reader.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. # Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import copy
  15. import logging
  16. import multiprocessing
  17. import sys
  18. import time
  19. import warnings
  20. import paddle
  21. from ..base.framework import (
  22. _current_expected_place,
  23. _get_paddle_place,
  24. _get_paddle_place_list,
  25. )
  26. from ..framework import core, in_dynamic_mode
  27. from .dataloader import BatchSampler, IterableDataset, Subset
  28. from .dataloader.batch_sampler import _InfiniteIterableSampler
  29. from .dataloader.dataloader_iter import (
  30. _DataLoaderIterMultiProcess,
  31. _DataLoaderIterSingleProcess,
  32. _DatasetKind,
  33. )
  # NOTE: [ avoid hanging & fail quickly ]
  # This value is used when getting data from another process
  # (unit presumably seconds -- TODO confirm at the place it is consumed).
  QUEUE_GET_TIMEOUT = 60
  # Module-wide pinned-memory switch, read and written through
  # use_pinned_memory(). None means no explicit choice has been made.
  USE_PINNED_MEMORY = None
  # AutoTune flags, both overwritten by set_autotune_config().
  # USE_AUTOTUNE: AuToTune only searches for num_workers when this is truthy.
  USE_AUTOTUNE = False
  # TUNING_STEPS: the tuning subset holds at most batch_size * TUNING_STEPS
  # samples (see AuToTune.get_sub_dataset).
  TUNING_STEPS = 500
  def set_autotune_config(use_autotune, tuning_steps=500):
      """Update the module-wide DataLoader auto-tuning switches.

      Args:
          use_autotune: value stored in ``USE_AUTOTUNE``.
          tuning_steps: value stored in ``TUNING_STEPS``. Default 500.
      """
      global USE_AUTOTUNE, TUNING_STEPS
      USE_AUTOTUNE, TUNING_STEPS = use_autotune, tuning_steps
  def use_pinned_memory(*args):
      """Read or write the module-wide ``USE_PINNED_MEMORY`` flag.

      Called with no argument, return the current value of the flag.
      Called with exactly one ``bool``, store it in the flag and return None.

      Raises:
          AssertionError: if more than one argument is passed, or the single
              argument is not a ``bool``.
      """
      global USE_PINNED_MEMORY
      # Getter form: nothing to validate, just report the current setting.
      if not args:
          return USE_PINNED_MEMORY
      # Setter form: exactly one bool is accepted.
      assert len(args) == 1 and isinstance(args[0], bool)
      USE_PINNED_MEMORY = args[0]
  def _convert_places(places):
      """Return ``places`` as a list of ``core.Place`` objects.

      Args:
          places: a single place, or a list/tuple of places. Items that are
              not already ``core.Place`` instances are wrapped in a new
              ``core.Place`` via ``set_place``.

      Returns:
          list: one ``core.Place`` per input item, in the original order.
      """

      def _as_core_place(item):
          # Already the generic Place type: pass it through untouched.
          if isinstance(item, core.Place):
              return item
          wrapped = core.Place()
          wrapped.set_place(item)
          return wrapped

      # A lone place is treated as a one-element sequence.
      candidates = places if isinstance(places, (list, tuple)) else [places]
      return [_as_core_place(item) for item in candidates]
  class AuToTune:
      """Pick a ``num_workers`` value for a DataLoader by timing a data subset.

      The tuner copies the given loader, swaps in a batch sampler over at most
      ``batch_size * TUNING_STEPS`` samples, and measures the average time per
      batch for several ``num_workers`` candidates. Calling the instance
      returns the chosen value; the caller is responsible for applying it.

      Args:
          loader: the DataLoader whose ``num_workers`` should be tuned.
      """

      def __init__(self, loader):
          self.loader = loader
          # Exclusive upper bound of the search range. True division keeps it
          # a float; it is only used in comparisons and log output.
          self.max_num_worker = multiprocessing.cpu_count() / 2

      def __call__(self):
          """Run the search and return the selected ``num_workers``.

          Returns:
              The loader's own ``num_workers`` when tuning is switched off,
              unsupported on this platform, or impossible for the loader's
              batch sampler; otherwise the best value found.
          """
          # use default loader
          if (not USE_AUTOTUNE) or (not self.need_autotune()):
              return self.loader.num_workers

          # get autotune loader
          auto_tune_loader = self.get_autotune_loader()
          if auto_tune_loader is None:
              return self.loader.num_workers

          # pick the best num_workers
          auto_tune_start = time.time()
          logging.debug("========= DataLoader Auto Tune =========")
          logging.debug(
              "User config for DataLoader: %s", self.loader.num_workers
          )
          best_num_workers = 0
          min_cost = float("inf")
          logging.debug(
              "Tuning Range for num_workers: 0 ~ %s", self.max_num_worker
          )
          num_workers = 0
          while num_workers < self.max_num_worker:
              auto_tune_loader.num_workers = num_workers
              avg_cost = self.evaluate_reader_cost(auto_tune_loader)
              if min_cost * 0.75 > avg_cost:
                  # At least 25% faster than the best so far: accept it.
                  min_cost = avg_cost
                  best_num_workers = num_workers
              else:
                  # No clear win at this step; probe the values just above
                  # the current best before giving up.
                  update_num = self.is_best(
                      auto_tune_loader,
                      best_num_workers,
                      min_cost,
                      self.max_num_worker,
                  )
                  if update_num == best_num_workers:
                      break
                  best_num_workers = update_num
              logging.debug(
                  "num_workers: %s avg_cost: %s", num_workers, avg_cost
              )
              num_workers += 2
          logging.info(
              "auto_tune dataLoader best_num_workers: %s", best_num_workers
          )
          logging.debug(
              "AutoTuning Cost for DataLoader: %s seconds",
              time.time() - auto_tune_start,
          )

          # tune the default loader's num_workers
          return best_num_workers

      def need_autotune(self):
          """Return False on macOS and Windows, True on every other platform."""
          return sys.platform not in ('darwin', 'win32')

      def get_sub_dataset(self, dataset, batch_size):
          """Return a ``Subset`` with the leading samples of ``dataset``.

          The subset holds ``batch_size * TUNING_STEPS`` samples, or the whole
          dataset when it is smaller than that.
          """
          num_samples = min(batch_size * TUNING_STEPS, len(dataset))
          return Subset(dataset, indices=list(range(num_samples)))

      def get_autotune_loader(self):
          """Build a shallow copy of the loader that iterates a tuning subset.

          Returns:
              The copied loader with a replaced ``batch_sampler``, or None when
              the loader has no batch sampler or one of an unsupported type.
          """
          sampler = self.loader.batch_sampler
          # Automatic batching disabled: there is nothing to rebuild, so tell
          # the caller to keep the user's configuration.
          if sampler is None:
              return None

          # DistributedBatchSampler is tested before the plain BatchSampler,
          # as in the original ordering of these checks.
          if isinstance(sampler, paddle.io.DistributedBatchSampler):
              batch_size = sampler.batch_size
              sub_dataset = self.get_sub_dataset(sampler.dataset, batch_size)
              tuned_sampler = paddle.io.DistributedBatchSampler(
                  dataset=sub_dataset,
                  batch_size=batch_size,
                  num_replicas=sampler.nranks,
                  rank=sampler.local_rank,
                  shuffle=sampler.shuffle,
                  drop_last=sampler.drop_last,
              )
          elif isinstance(sampler, paddle.io.BatchSampler):
              batch_size = sampler.batch_size
              sub_dataset = self.get_sub_dataset(
                  sampler.sampler.data_source, batch_size
              )
              tuned_sampler = paddle.io.BatchSampler(
                  dataset=sub_dataset,
                  batch_size=batch_size,
                  drop_last=sampler.drop_last,
              )
          else:
              # Custom samplers cannot be rebuilt over a subset.
              return None

          loader = copy.copy(self.loader)
          loader.batch_sampler = tuned_sampler
          return loader

      def evaluate_reader_cost(self, reader):
          """Iterate ``reader`` once and return the average time per batch.

          Args:
              reader: any iterable of batches.

          Returns:
              Average wall-clock seconds between consecutive batches. When
              more than two batches were produced the first two are left out
              of the average (presumably to skip start-up cost). Returns 0
              when ``reader`` yields nothing.
          """
          costs = []
          start = time.time()
          for _ in reader:
              now = time.time()
              costs.append(now - start)
              start = now
          # An empty reader has no measurable cost; avoid dividing by zero.
          if not costs:
              return 0
          measured = costs[2:] if len(costs) > 2 else costs
          return sum(measured) / len(measured)

      def is_best(self, reader, best_workers, best_time, num_work_boundary):
          """Probe worker counts just above ``best_workers`` for a faster one.

          At most five candidates are tried, starting at ``best_workers + 1``
          and staying below ``num_work_boundary``. The speed-up a candidate
          must reach gets stricter with every failed attempt.

          Args:
              reader: the loader being timed; its ``num_workers`` attribute is
                  overwritten with each candidate.
              best_workers: the best worker count found so far.
              best_time: average batch cost measured for ``best_workers``.
              num_work_boundary: exclusive upper bound for candidates.

          Returns:
              The first candidate that is fast enough, else ``best_workers``.
          """
          step = 0
          num_workers = best_workers + 1
          boundary = 1
          while num_workers < num_work_boundary and step < 5:
              # Configure the loader that is actually timed below, so each
              # trial measures its own candidate and the user's loader is
              # left alone.
              reader.num_workers = num_workers
              cost = self.evaluate_reader_cost(reader)
              logging.debug(
                  "for back num_workers: %s avg_cost: %s", num_workers, cost
              )
              step += 1
              if cost < best_time * 0.70 * boundary:
                  return num_workers
              num_workers += 1
              boundary *= 0.80
          return best_workers
  class DataLoader:
      """
      DataLoader provides an iterator which iterates given dataset
      once by the batch_sampler.

      DataLoader supports single-process and multi-process data loading,
      multi-process workers will be used to load data asynchronously if
      :attr:`num_workers` is set as a positive number.

      DataLoader supports map-style dataset and iterable-style dataset.

      For map-style dataset(can get a sample from dataset with a given
      index), please see :code:`paddle.io.Dataset`.

      For iterable-style dataset(get samples from dataset iteratively,
      like a Python iterator), please see :code:`paddle.io.IterableDataset`.

      For :code:`batch_sampler` please see :code:`paddle.io.BatchSampler`

      Notes:
          GPU tensor operation is not supported in subprocess currently,
          please don't use GPU tensor operations in pipeline which will
          be performed in subprocess, such as dataset transforms, collate_fn,
          etc. Numpy array and CPU tensor operation is supported.

      **Disable automatic batching**

      In certain cases such as some NLP tasks, instead of automatic batching,
      handling batching manually in dataset is needed by users. For these
      cases, automatic batching is disabled if both :attr:`batch_size` and
      :attr:`batch_sampler` are set as None, each data got from :attr:`dataset`
      should be batched data and will be processed with the function defined by
      :attr:`collate_fn` or :attr:`default_collate_fn`.

      Notes:
          When automatic batching is disabled, :attr:`default_collate_fn` will
          do nothing to data from dataset.

      Args:
          dataset(Dataset): the dataset to load data from, should be an
              instance of subclass of :code:`paddle.io.Dataset` or
              :code:`paddle.io.IterableDataset`.
          feed_list (list(Tensor)|tuple(Tensor), optional): feed Tensor list.
              The Tensors should be created by :code:`paddle.static.data()`.
              :attr:`feed_list` must be set if :attr:`return_list` is
              False. Default None.
          places(list(Place)|tuple(Place)|list(str), optional): a list of Place,
              to put data onto, :attr:`places` can be None, if
              :attr:`places` is None, default place(CPUPlace or CUDAPlace(0))
              will be used. Default None. If ``places`` is list of string,
              the string in the list can be ``cpu``, ``gpu:x`` and ``gpu_pinned``,
              where ``x`` is the index of the GPUs.
          return_list (bool, optional): whether the return value on each device is
              presented as a list. If :attr:`return_list=False`, the return
              value on each device would be a dict of str -> Tensor, where
              the key of the dict is the name of each fed Tensors. If
              :attr:`return_list=True`, the return value on each device would
              be a list(Tensor). :attr:`return_list` can only be True
              in dynamic graph mode. Default True.
          batch_sampler(BatchSampler, optional): an instance of `paddle.io.BatchSampler`
              to generate batch indices to draw samples from :attr:`dataset`
              and combine a batch. Default None.
          batch_size(int|None, optional): sample number in a mini-batch, a substitution
              parameter for :attr:`batch_sampler`, if :attr:`batch_sampler`
              is not set, a default `paddle.io.BatchSampler` will be used
              and initialized by :attr:`batch_size`, :attr:`shuffle` and
              :attr:`drop_last`. Default 1.
          shuffle(bool, optional): whether to shuffle indices order before generating
              batch indices, a substitution parameter for :attr:`batch_sampler`
              see :attr:`batch_size`. Default False.
          drop_last(bool, optional): whether to drop the last incomplete batch when the
              dataset size is not divisible by the batch size, a substitution parameter
              for :attr:`batch_sampler`, see :attr:`batch_size`. Default False
          collate_fn(callable, optional): function to generate mini-batch data by merging
              the sample list, None for only stack each fields of sample in axis
              0(same as :code:`np.stack(..., axis=0)`). Default None
          num_workers(int, optional): the number of subprocess to load data, 0 for no
              subprocess used and loading data in main process. Default 0
          use_buffer_reader (bool, optional): whether to use buffered reader.
              If use_buffer_reader=True, the DataLoader would prefetch
              batch data asynchronously, so it would speed up data feeding
              and occupies a little more CPU or GPU memory, i.e., the memory
              of one batch input data. Default True.
          prefetch_factor (int, optional): Number of batch data the DataLoader would prefetch
              if use_buffer_reader=True. Default 2.
          use_shared_memory (bool, optional): whether to use shared memory to speed up
              putting data into inter-process queue, set :attr:`use_shared_memory`
              as True only when the shared memory space on your machine(e.g.
              space of '/dev/shm' on Linux operating system) is large enough.
              Shared memory will only be enabled in multi-process mode(num_workers
              > 0). Default True.
          timeout(int, optional): the timeout value for getting data from output queue
              of subprocesses. Default 0.
          worker_init_fn(callable, optional): init function which will be called with
              worker id on each subprocess starting if not set as None. Default
              None.
          persistent_workers(bool, optional): if True and :attr:`num_workers` is
              positive, the multi-process iterator is created on the first
              iteration and reset, rather than rebuilt, on every later one.
              Default False.

      Returns:
          DataLoader: an iterable object for data iterating, each element of the generated data is a Tensor.

      Examples:

          .. code-block:: python

              >>> # doctest: +SOLO('can not use multiprocessing testing `paddle.io.DataLoader`')
              >>> import numpy as np

              >>> import paddle
              >>> import paddle.nn as nn
              >>> import paddle.nn.functional as F
              >>> from paddle.io import Dataset, BatchSampler, DataLoader

              >>> BATCH_NUM = 20
              >>> BATCH_SIZE = 16
              >>> EPOCH_NUM = 4

              >>> IMAGE_SIZE = 784
              >>> CLASS_NUM = 10

              >>> # define a random dataset
              >>> class RandomDataset(Dataset):
              ...     def __init__(self, num_samples):
              ...         self.num_samples = num_samples
              ...
              ...     def __getitem__(self, idx):
              ...         image = np.random.random([IMAGE_SIZE]).astype('float32')
              ...         label = np.random.randint(0, CLASS_NUM - 1, (1, )).astype('int64')
              ...         return image, label
              ...
              ...     def __len__(self):
              ...         return self.num_samples
              ...
              >>> dataset = RandomDataset(BATCH_NUM * BATCH_SIZE)

              >>> class SimpleNet(nn.Layer):
              ...     def __init__(self):
              ...         super().__init__()
              ...         self.fc = nn.Linear(IMAGE_SIZE, CLASS_NUM)
              ...
              ...     def forward(self, image, label=None):
              ...         return self.fc(image)
              ...
              >>> simple_net = SimpleNet()
              >>> opt = paddle.optimizer.SGD(learning_rate=1e-3,
              ...                            parameters=simple_net.parameters())
              ...
              >>> loader = DataLoader(dataset,
              ...                     batch_size=BATCH_SIZE,
              ...                     shuffle=True,
              ...                     drop_last=True,
              ...                     num_workers=2)
              ...
              >>> for e in range(EPOCH_NUM):
              ...     for i, (image, label) in enumerate(loader()):
              ...         out = simple_net(image)
              ...         loss = F.cross_entropy(out, label)
              ...         avg_loss = paddle.mean(loss)
              ...         avg_loss.backward()
              ...         opt.minimize(avg_loss)
              ...         simple_net.clear_gradients()
              ...         print("Epoch {} batch {}: loss = {}".format(e, i, np.mean(loss.numpy())))

      Notes:
          For reading iterable dataset with multiprocess Dataloader,
          please see :code:`paddle.io.IterableDataset`
      """

      def __init__(
          self,
          dataset,
          feed_list=None,
          places=None,
          return_list=True,
          batch_sampler=None,
          batch_size=1,
          shuffle=False,
          drop_last=False,
          collate_fn=None,
          num_workers=0,
          use_buffer_reader=True,
          prefetch_factor=2,
          use_shared_memory=True,
          timeout=0,
          worker_init_fn=None,
          persistent_workers=False,
      ):
          # Settings stored exactly as given.
          self.dataset = dataset
          self.return_list = return_list
          self.collate_fn = collate_fn
          self.worker_init_fn = worker_init_fn
          self.use_buffer_reader = use_buffer_reader
          self.prefetch_factor = prefetch_factor

          # A feed list is only mandatory for dict-style output outside
          # dynamic mode.
          if not (return_list or in_dynamic_mode()):
              assert (
                  feed_list is not None
              ), "feed_list should be set when return_list=False"
          self.feed_list = feed_list

          # Resolve the target places, falling back to the expected place.
          if places is None:
              places = _current_expected_place()
          to_paddle_place = (
              _get_paddle_place_list
              if isinstance(places, (list, tuple))
              else _get_paddle_place
          )
          self.places = _convert_places(to_paddle_place(places))

          assert num_workers >= 0, "num_workers should be a non-negative value"
          if num_workers > 0 and sys.platform in ('darwin', 'win32'):
              warnings.warn(
                  "DataLoader with multi-process mode is not supported on MacOs and Windows currently."
                  " Please use single-process mode with num_workers = 0 instead"
              )
              num_workers = 0
          self.num_workers = num_workers

          assert prefetch_factor > 0, "prefetch_factor should be a positive value"

          # Shared memory is switched off when no worker process is used.
          self.use_shared_memory = (
              False
              if (use_shared_memory and num_workers == 0)
              else use_shared_memory
          )

          assert timeout >= 0, "timeout should be a non-negative value"
          self.timeout = timeout

          is_iterable_dataset = isinstance(dataset, IterableDataset)
          if is_iterable_dataset:
              self.dataset_kind = _DatasetKind.ITER
              if shuffle:
                  raise ValueError(
                      f"IterableDataset not support shuffle, but got shuffle={shuffle}"
                  )
              if batch_sampler is not None:
                  raise ValueError(
                      "IterableDataset expect unspecified batch_sampler"
                  )
          else:
              self.dataset_kind = _DatasetKind.MAP

          if batch_sampler is not None:
              # A user-supplied sampler owns batching; the substitute
              # parameters must keep their defaults.
              assert batch_size == 1 and not shuffle and not drop_last, (
                  "batch_size/shuffle/drop_last should not be set when "
                  "batch_sampler is given"
              )
              self.batch_sampler, self.batch_size = batch_sampler, None
          elif batch_size is None:
              # Automatic batching disabled.
              self.batch_sampler = self.batch_size = None
          else:
              assert batch_size > 0, (
                  "batch_size should be None or a positive value when "
                  "batch_sampler is not given"
              )
              self.batch_size = batch_size
              if is_iterable_dataset:
                  self.batch_sampler = _InfiniteIterableSampler(
                      dataset, batch_size
                  )
              else:
                  self.batch_sampler = BatchSampler(
                      dataset=dataset,
                      batch_size=batch_size,
                      shuffle=shuffle,
                      drop_last=drop_last,
                  )

          self.drop_last = drop_last
          self.auto_collate_batch = self.batch_sampler is not None

          # Pinned memory applies in dynamic mode only, and is on unless
          # use_pinned_memory() reports an explicit setting.
          self.pin_memory = False
          if in_dynamic_mode():
              pinned = use_pinned_memory()
              self.pin_memory = True if pinned is None else pinned

          self._persistent_workers = persistent_workers
          self._iterator = None

          # AuToTune returns the value to use; it is the user's own setting
          # whenever tuning does not run.
          self.num_workers = AuToTune(self)()

      def __len__(self):
          """Return the number of batches, or of samples without auto batching.

          Raises:
              ValueError: if the dataset is an ``IterableDataset``.
          """
          if self.dataset_kind == _DatasetKind.ITER:
              raise ValueError("length of IterableDataset not supported")
          sized = self.batch_sampler if self.auto_collate_batch else self.dataset
          return len(sized)

      def __iter__(self):
          """Return a single-process or multi-process iterator over the data."""
          if self.num_workers == 0:
              return _DataLoaderIterSingleProcess(self)
          if not self._persistent_workers:
              return _DataLoaderIterMultiProcess(self)
          # Persistent workers: build the iterator once, then reset it.
          if self._iterator is None:
              self._iterator = _DataLoaderIterMultiProcess(self)
          else:
              self._iterator._reset()
          return self._iterator

      def __call__(self):
          """Return the same iterator as ``__iter__``."""
          return self.__iter__()