| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- import multiprocessing
- import os
- import time
- import pymupdf
- # Support for concurrent processing of document pages.
- #
- class _worker_State:
- pass
- _worker_state = _worker_State()
- def _worker_init(
- path,
- initfn,
- initfn_args,
- initfn_kwargs,
- pagefn,
- pagefn_args,
- pagefn_kwargs,
- stats,
- ):
- # pylint: disable=attribute-defined-outside-init
- _worker_state.path = path
- _worker_state.pagefn = pagefn
- _worker_state.pagefn_args = pagefn_args
- _worker_state.pagefn_kwargs = pagefn_kwargs
- _worker_state.stats = stats
- _worker_state.document = None
- if initfn:
- initfn(*initfn_args, **initfn_kwargs)
- def _stats_write(t, label):
- t = time.time() - t
- if t >= 10:
- pymupdf.log(f'{os.getpid()=}: {t:2f}s: {label}.')
- def _worker_fn(page_number):
- # Create Document from filename if we haven't already done so.
- if not _worker_state.document:
- if _worker_state.stats:
- t = time.time()
- _worker_state.document = pymupdf.Document(_worker_state.path) # pylint: disable=attribute-defined-outside-init
- if _worker_state.stats:
- _stats_write(t, 'pymupdf.Document()')
-
- if _worker_state.stats:
- t = time.time()
- page = _worker_state.document[page_number]
- if _worker_state.stats:
- _stats_write(t, '_worker_state.document[page_number]')
-
- if _worker_state.stats:
- t = time.time()
- ret = _worker_state.pagefn(
- page,
- *_worker_state.pagefn_args,
- **_worker_state.pagefn_kwargs,
- )
- if _worker_state.stats:
- _stats_write(t, '_worker_state.pagefn()')
-
- return ret
-
- def _multiprocessing(
- path,
- pages,
- pagefn,
- pagefn_args,
- pagefn_kwargs,
- initfn,
- initfn_args,
- initfn_kwargs,
- concurrency,
- stats,
- ):
- #print(f'_worker_mp(): {concurrency=}', flush=1)
- with multiprocessing.Pool(
- concurrency,
- _worker_init,
- (
- path,
- initfn, initfn_args, initfn_kwargs,
- pagefn, pagefn_args, pagefn_kwargs,
- stats,
- ),
- ) as pool:
- result = pool.map_async(_worker_fn, pages)
- return result.get()
-
- def _fork(
- path,
- pages,
- pagefn,
- pagefn_args,
- pagefn_kwargs,
- initfn,
- initfn_args,
- initfn_kwargs,
- concurrency,
- stats,
- ):
- verbose = 0
- if concurrency is None:
- concurrency = multiprocessing.cpu_count()
- # We write page numbers to `queue_down` and read `(page_num, text)` from
- # `queue_up`. Workers each repeatedly read the next available page number
- # from `queue_down`, extract the text and write it onto `queue_up`.
- #
- # This is better than pre-allocating a subset of pages to each worker
- # because it ensures there will never be idle workers until we are near the
- # end with fewer pages left than workers.
- #
- queue_down = multiprocessing.Queue()
- queue_up = multiprocessing.Queue()
- def childfn():
- document = None
- if verbose:
- pymupdf.log(f'{os.getpid()=}: {initfn=} {initfn_args=}')
- _worker_init(
- path,
- initfn,
- initfn_args,
- initfn_kwargs,
- pagefn,
- pagefn_args,
- pagefn_kwargs,
- stats,
- )
- while 1:
- if verbose:
- pymupdf.log(f'{os.getpid()=}: calling get().')
- page_num = queue_down.get()
- if verbose:
- pymupdf.log(f'{os.getpid()=}: {page_num=}.')
- if page_num is None:
- break
- try:
- if not document:
- if stats:
- t = time.time()
- document = pymupdf.Document(path)
- if stats:
- _stats_write(t, 'pymupdf.Document(path)')
-
- if stats:
- t = time.time()
- page = document[page_num]
- if stats:
- _stats_write(t, 'document[page_num]')
-
- if verbose:
- pymupdf.log(f'{os.getpid()=}: {_worker_state=}')
-
- if stats:
- t = time.time()
- ret = pagefn(
- page,
- *_worker_state.pagefn_args,
- **_worker_state.pagefn_kwargs,
- )
- if stats:
- _stats_write(t, f'{page_num=} pagefn()')
- except Exception as e:
- if verbose: pymupdf.log(f'{os.getpid()=}: exception {e=}')
- ret = e
- if verbose:
- pymupdf.log(f'{os.getpid()=}: sending {page_num=} {ret=}')
-
- queue_up.put( (page_num, ret) )
- error = None
- pids = list()
- try:
- # Start child processes.
- if stats:
- t = time.time()
- for i in range(concurrency):
- p = os.fork() # pylint: disable=no-member
- if p == 0:
- # Child process.
- try:
- try:
- childfn()
- except Exception as e:
- pymupdf.log(f'{os.getpid()=}: childfn() => {e=}')
- raise
- finally:
- if verbose:
- pymupdf.log(f'{os.getpid()=}: calling os._exit(0)')
- os._exit(0)
- pids.append(p)
- if stats:
- _stats_write(t, 'create child processes')
- # Send page numbers.
- if stats:
- t = time.time()
- if verbose:
- pymupdf.log(f'Sending page numbers.')
- for page_num in range(len(pages)):
- queue_down.put(page_num)
- if stats:
- _stats_write(t, 'Send page numbers')
- # Collect results. We give up if any worker sends an exception instead
- # of text, but this hasn't been tested.
- ret = [None] * len(pages)
- for i in range(len(pages)):
- page_num, text = queue_up.get()
- if verbose:
- pymupdf.log(f'{page_num=} {len(text)=}')
- assert ret[page_num] is None
- if isinstance(text, Exception):
- if not error:
- error = text
- break
- ret[page_num] = text
- # Close queue. This should cause exception in workers and terminate
- # them, but on macos-arm64 this does not seem to happen, so we also
- # send None, which makes workers terminate.
- for i in range(concurrency):
- queue_down.put(None)
- if verbose: pymupdf.log(f'Closing queues.')
- queue_down.close()
- if error:
- raise error
- if verbose:
- pymupdf.log(f'After concurrent, returning {len(ret)=}')
- return ret
-
- finally:
- # Join all child processes.
- if stats:
- t = time.time()
- for pid in pids:
- if verbose:
- pymupdf.log(f'waiting for {pid=}.')
- e = os.waitpid(pid, 0)
- if verbose:
- pymupdf.log(f'{pid=} => {e=}')
- if stats:
- _stats_write(t, 'Join all child proceses')
|