parallel.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. from tqdm import tqdm
  2. from concurrent.futures import ProcessPoolExecutor, as_completed
  3. def parallel_process(array, function, n_jobs=16, use_kwargs=False, front_num=0):
  4. """
  5. A parallel version of the map function with a progress bar.
  6. Args:
  7. array (array-like): An array to iterate over.
  8. function (function): A python function to apply to the elements of array
  9. n_jobs (int, default=16): The number of cores to use
  10. use_kwargs (boolean, default=False): Whether to consider the elements of array as dictionaries of
  11. keyword arguments to function
  12. front_num (int, default=3): The number of iterations to run serially before kicking off the parallel job.
  13. Useful for catching bugs
  14. Returns:
  15. [function(array[0]), function(array[1]), ...]
  16. """
  17. # We run the first few iterations serially to catch bugs
  18. if front_num > 0:
  19. front = [
  20. function(**a) if use_kwargs else function(a) for a in array[:front_num]
  21. ]
  22. else:
  23. front = []
  24. # If we set n_jobs to 1, just run a list comprehension. This is useful for benchmarking and debugging.
  25. if n_jobs == 1:
  26. return front + [
  27. function(**a) if use_kwargs else function(a)
  28. for a in tqdm(array[front_num:])
  29. ]
  30. # Assemble the workers
  31. with ProcessPoolExecutor(max_workers=n_jobs) as pool:
  32. # Pass the elements of array into function
  33. if use_kwargs:
  34. futures = [pool.submit(function, **a) for a in array[front_num:]]
  35. else:
  36. futures = [pool.submit(function, a) for a in array[front_num:]]
  37. kwargs = {
  38. "total": len(futures),
  39. "unit": "it",
  40. "unit_scale": True,
  41. "leave": True,
  42. }
  43. # Print out the progress as tasks complete
  44. for f in tqdm(as_completed(futures), **kwargs):
  45. pass
  46. out = []
  47. # Get the results from the futures.
  48. for i, future in tqdm(enumerate(futures)):
  49. try:
  50. out.append(future.result())
  51. except Exception as e:
  52. out.append(e)
  53. return front + out