worker.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. import os
  2. import threading
  3. from time import sleep, time
  4. from sentry_sdk._queue import Queue, FullError
  5. from sentry_sdk.utils import logger
  6. from sentry_sdk.consts import DEFAULT_QUEUE_SIZE
  7. from typing import TYPE_CHECKING
  8. if TYPE_CHECKING:
  9. from typing import Any
  10. from typing import Optional
  11. from typing import Callable
  12. _TERMINATOR = object()
  13. class BackgroundWorker:
  14. def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None:
  15. self._queue: "Queue" = Queue(queue_size)
  16. self._lock = threading.Lock()
  17. self._thread: "Optional[threading.Thread]" = None
  18. self._thread_for_pid: "Optional[int]" = None
  19. @property
  20. def is_alive(self) -> bool:
  21. if self._thread_for_pid != os.getpid():
  22. return False
  23. if not self._thread:
  24. return False
  25. return self._thread.is_alive()
  26. def _ensure_thread(self) -> None:
  27. if not self.is_alive:
  28. self.start()
  29. def _timed_queue_join(self, timeout: float) -> bool:
  30. deadline = time() + timeout
  31. queue = self._queue
  32. queue.all_tasks_done.acquire()
  33. try:
  34. while queue.unfinished_tasks:
  35. delay = deadline - time()
  36. if delay <= 0:
  37. return False
  38. queue.all_tasks_done.wait(timeout=delay)
  39. return True
  40. finally:
  41. queue.all_tasks_done.release()
  42. def start(self) -> None:
  43. with self._lock:
  44. if not self.is_alive:
  45. self._thread = threading.Thread(
  46. target=self._target, name="sentry-sdk.BackgroundWorker"
  47. )
  48. self._thread.daemon = True
  49. try:
  50. self._thread.start()
  51. self._thread_for_pid = os.getpid()
  52. except RuntimeError:
  53. # At this point we can no longer start because the interpreter
  54. # is already shutting down. Sadly at this point we can no longer
  55. # send out events.
  56. self._thread = None
  57. def kill(self) -> None:
  58. """
  59. Kill worker thread. Returns immediately. Not useful for
  60. waiting on shutdown for events, use `flush` for that.
  61. """
  62. logger.debug("background worker got kill request")
  63. with self._lock:
  64. if self._thread:
  65. try:
  66. self._queue.put_nowait(_TERMINATOR)
  67. except FullError:
  68. logger.debug("background worker queue full, kill failed")
  69. self._thread = None
  70. self._thread_for_pid = None
  71. def flush(self, timeout: float, callback: "Optional[Any]" = None) -> None:
  72. logger.debug("background worker got flush request")
  73. with self._lock:
  74. if self.is_alive and timeout > 0.0:
  75. self._wait_flush(timeout, callback)
  76. logger.debug("background worker flushed")
  77. def full(self) -> bool:
  78. return self._queue.full()
  79. def _wait_flush(self, timeout: float, callback: "Optional[Any]") -> None:
  80. initial_timeout = min(0.1, timeout)
  81. if not self._timed_queue_join(initial_timeout):
  82. pending = self._queue.qsize() + 1
  83. logger.debug("%d event(s) pending on flush", pending)
  84. if callback is not None:
  85. callback(pending, timeout)
  86. if not self._timed_queue_join(timeout - initial_timeout):
  87. pending = self._queue.qsize() + 1
  88. logger.error("flush timed out, dropped %s events", pending)
  89. def submit(self, callback: "Callable[[], None]") -> bool:
  90. self._ensure_thread()
  91. try:
  92. self._queue.put_nowait(callback)
  93. return True
  94. except FullError:
  95. return False
  96. def _target(self) -> None:
  97. while True:
  98. callback = self._queue.get()
  99. try:
  100. if callback is _TERMINATOR:
  101. break
  102. try:
  103. callback()
  104. except Exception:
  105. logger.error("Failed processing job", exc_info=True)
  106. finally:
  107. self._queue.task_done()
  108. sleep(0)