_apply_pages.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. import multiprocessing
  2. import os
  3. import time
  4. import pymupdf
  5. # Support for concurrent processing of document pages.
  6. #
  7. class _worker_State:
  8. pass
  9. _worker_state = _worker_State()
  10. def _worker_init(
  11. path,
  12. initfn,
  13. initfn_args,
  14. initfn_kwargs,
  15. pagefn,
  16. pagefn_args,
  17. pagefn_kwargs,
  18. stats,
  19. ):
  20. # pylint: disable=attribute-defined-outside-init
  21. _worker_state.path = path
  22. _worker_state.pagefn = pagefn
  23. _worker_state.pagefn_args = pagefn_args
  24. _worker_state.pagefn_kwargs = pagefn_kwargs
  25. _worker_state.stats = stats
  26. _worker_state.document = None
  27. if initfn:
  28. initfn(*initfn_args, **initfn_kwargs)
  29. def _stats_write(t, label):
  30. t = time.time() - t
  31. if t >= 10:
  32. pymupdf.log(f'{os.getpid()=}: {t:2f}s: {label}.')
  33. def _worker_fn(page_number):
  34. # Create Document from filename if we haven't already done so.
  35. if not _worker_state.document:
  36. if _worker_state.stats:
  37. t = time.time()
  38. _worker_state.document = pymupdf.Document(_worker_state.path) # pylint: disable=attribute-defined-outside-init
  39. if _worker_state.stats:
  40. _stats_write(t, 'pymupdf.Document()')
  41. if _worker_state.stats:
  42. t = time.time()
  43. page = _worker_state.document[page_number]
  44. if _worker_state.stats:
  45. _stats_write(t, '_worker_state.document[page_number]')
  46. if _worker_state.stats:
  47. t = time.time()
  48. ret = _worker_state.pagefn(
  49. page,
  50. *_worker_state.pagefn_args,
  51. **_worker_state.pagefn_kwargs,
  52. )
  53. if _worker_state.stats:
  54. _stats_write(t, '_worker_state.pagefn()')
  55. return ret
  56. def _multiprocessing(
  57. path,
  58. pages,
  59. pagefn,
  60. pagefn_args,
  61. pagefn_kwargs,
  62. initfn,
  63. initfn_args,
  64. initfn_kwargs,
  65. concurrency,
  66. stats,
  67. ):
  68. #print(f'_worker_mp(): {concurrency=}', flush=1)
  69. with multiprocessing.Pool(
  70. concurrency,
  71. _worker_init,
  72. (
  73. path,
  74. initfn, initfn_args, initfn_kwargs,
  75. pagefn, pagefn_args, pagefn_kwargs,
  76. stats,
  77. ),
  78. ) as pool:
  79. result = pool.map_async(_worker_fn, pages)
  80. return result.get()
  81. def _fork(
  82. path,
  83. pages,
  84. pagefn,
  85. pagefn_args,
  86. pagefn_kwargs,
  87. initfn,
  88. initfn_args,
  89. initfn_kwargs,
  90. concurrency,
  91. stats,
  92. ):
  93. verbose = 0
  94. if concurrency is None:
  95. concurrency = multiprocessing.cpu_count()
  96. # We write page numbers to `queue_down` and read `(page_num, text)` from
  97. # `queue_up`. Workers each repeatedly read the next available page number
  98. # from `queue_down`, extract the text and write it onto `queue_up`.
  99. #
  100. # This is better than pre-allocating a subset of pages to each worker
  101. # because it ensures there will never be idle workers until we are near the
  102. # end with fewer pages left than workers.
  103. #
  104. queue_down = multiprocessing.Queue()
  105. queue_up = multiprocessing.Queue()
  106. def childfn():
  107. document = None
  108. if verbose:
  109. pymupdf.log(f'{os.getpid()=}: {initfn=} {initfn_args=}')
  110. _worker_init(
  111. path,
  112. initfn,
  113. initfn_args,
  114. initfn_kwargs,
  115. pagefn,
  116. pagefn_args,
  117. pagefn_kwargs,
  118. stats,
  119. )
  120. while 1:
  121. if verbose:
  122. pymupdf.log(f'{os.getpid()=}: calling get().')
  123. page_num = queue_down.get()
  124. if verbose:
  125. pymupdf.log(f'{os.getpid()=}: {page_num=}.')
  126. if page_num is None:
  127. break
  128. try:
  129. if not document:
  130. if stats:
  131. t = time.time()
  132. document = pymupdf.Document(path)
  133. if stats:
  134. _stats_write(t, 'pymupdf.Document(path)')
  135. if stats:
  136. t = time.time()
  137. page = document[page_num]
  138. if stats:
  139. _stats_write(t, 'document[page_num]')
  140. if verbose:
  141. pymupdf.log(f'{os.getpid()=}: {_worker_state=}')
  142. if stats:
  143. t = time.time()
  144. ret = pagefn(
  145. page,
  146. *_worker_state.pagefn_args,
  147. **_worker_state.pagefn_kwargs,
  148. )
  149. if stats:
  150. _stats_write(t, f'{page_num=} pagefn()')
  151. except Exception as e:
  152. if verbose: pymupdf.log(f'{os.getpid()=}: exception {e=}')
  153. ret = e
  154. if verbose:
  155. pymupdf.log(f'{os.getpid()=}: sending {page_num=} {ret=}')
  156. queue_up.put( (page_num, ret) )
  157. error = None
  158. pids = list()
  159. try:
  160. # Start child processes.
  161. if stats:
  162. t = time.time()
  163. for i in range(concurrency):
  164. p = os.fork() # pylint: disable=no-member
  165. if p == 0:
  166. # Child process.
  167. try:
  168. try:
  169. childfn()
  170. except Exception as e:
  171. pymupdf.log(f'{os.getpid()=}: childfn() => {e=}')
  172. raise
  173. finally:
  174. if verbose:
  175. pymupdf.log(f'{os.getpid()=}: calling os._exit(0)')
  176. os._exit(0)
  177. pids.append(p)
  178. if stats:
  179. _stats_write(t, 'create child processes')
  180. # Send page numbers.
  181. if stats:
  182. t = time.time()
  183. if verbose:
  184. pymupdf.log(f'Sending page numbers.')
  185. for page_num in range(len(pages)):
  186. queue_down.put(page_num)
  187. if stats:
  188. _stats_write(t, 'Send page numbers')
  189. # Collect results. We give up if any worker sends an exception instead
  190. # of text, but this hasn't been tested.
  191. ret = [None] * len(pages)
  192. for i in range(len(pages)):
  193. page_num, text = queue_up.get()
  194. if verbose:
  195. pymupdf.log(f'{page_num=} {len(text)=}')
  196. assert ret[page_num] is None
  197. if isinstance(text, Exception):
  198. if not error:
  199. error = text
  200. break
  201. ret[page_num] = text
  202. # Close queue. This should cause exception in workers and terminate
  203. # them, but on macos-arm64 this does not seem to happen, so we also
  204. # send None, which makes workers terminate.
  205. for i in range(concurrency):
  206. queue_down.put(None)
  207. if verbose: pymupdf.log(f'Closing queues.')
  208. queue_down.close()
  209. if error:
  210. raise error
  211. if verbose:
  212. pymupdf.log(f'After concurrent, returning {len(ret)=}')
  213. return ret
  214. finally:
  215. # Join all child processes.
  216. if stats:
  217. t = time.time()
  218. for pid in pids:
  219. if verbose:
  220. pymupdf.log(f'waiting for {pid=}.')
  221. e = os.waitpid(pid, 0)
  222. if verbose:
  223. pymupdf.log(f'{pid=} => {e=}')
  224. if stats:
  225. _stats_write(t, 'Join all child proceses')