# _batcher.py 3.9 KB
  1. import os
  2. import random
  3. import threading
  4. from datetime import datetime, timezone
  5. from typing import TYPE_CHECKING, TypeVar, Generic
  6. from sentry_sdk.utils import format_timestamp, safe_repr, serialize_attribute
  7. from sentry_sdk.envelope import Envelope, Item, PayloadRef
  8. if TYPE_CHECKING:
  9. from typing import Optional, Callable, Any
  T = TypeVar("T")


  class Batcher(Generic[T]):
      """Collect items in memory and hand them off in batches as envelopes.

      Items are appended with :meth:`add`. A daemon thread wakes up every
      ``FLUSH_WAIT_TIME`` seconds (plus up to one second of random jitter), or
      as soon as :meth:`add` signals that the buffer holds ``MAX_BEFORE_FLUSH``
      items, and passes one envelope containing the buffered items to
      ``capture_func``. Once the buffer holds ``MAX_BEFORE_DROP`` items, new
      items are given to :meth:`_record_lost` instead of being buffered.

      The base class leaves ``TYPE`` and ``CONTENT_TYPE`` empty and ships
      placeholder implementations of :meth:`_record_lost` (does nothing) and
      :meth:`_to_transport_format` (returns ``None``); presumably concrete
      batchers are expected to override all four.
      """

      # Buffer length at which add() wakes the flusher thread early.
      MAX_BEFORE_FLUSH = 100
      # Buffer length at which add() stops buffering and drops new items.
      MAX_BEFORE_DROP = 1_000
      # Base number of seconds the flusher thread waits between flushes.
      FLUSH_WAIT_TIME = 5.0
      # Envelope item type and content type used by _add_to_envelope().
      TYPE = ""
      CONTENT_TYPE = ""

      def __init__(
          self,
          capture_func: "Callable[[Envelope], None]",
          record_lost_func: "Callable[..., None]",
      ) -> None:
          """Create an empty batcher; no thread is started until :meth:`add`.

          :param capture_func: called with every non-empty envelope produced by
              a flush.
          :param record_lost_func: stored on the instance but not called by this
              base class; presumably meant for ``_record_lost`` overrides.
          """
          self._buffer: "list[T]" = []
          self._capture_func = capture_func
          self._record_lost_func = record_lost_func
          self._running = True
          # Guards the buffer and the creation of the flusher thread.
          self._lock = threading.Lock()
          # Set to wake the flusher thread before its timeout elapses.
          self._flush_event: "threading.Event" = threading.Event()
          self._flusher: "Optional[threading.Thread]" = None
          # PID of the process for which the flusher thread was started.
          self._flusher_pid: "Optional[int]" = None

      def _ensure_thread(self) -> bool:
          """For forking processes we might need to restart this thread.
          This ensures that our process actually has that thread running.

          :returns: ``True`` if a flusher thread has been started for the
              current PID, ``False`` if the batcher is no longer running or
              the thread could not be started.
          """
          if not self._running:
              return False

          pid = os.getpid()
          # Cheap check without the lock; repeated under the lock below.
          if self._flusher_pid == pid:
              return True

          with self._lock:
              # Recheck to make sure another thread didn't get here and start
              # the flusher in the meantime
              if self._flusher_pid == pid:
                  return True

              self._flusher_pid = pid

              self._flusher = threading.Thread(target=self._flush_loop)
              self._flusher.daemon = True

              try:
                  self._flusher.start()
              except RuntimeError:
                  # Unfortunately at this point the interpreter is in a state
                  # that no longer allows us to spawn a thread and we have to
                  # bail.
                  self._running = False
                  return False

          return True

      def _flush_loop(self) -> None:
          """Body of the flusher thread: wait, then flush, until stopped."""
          while self._running:
              # Wake up on the timeout (with jitter) or when the event is set
              # by add() / kill().
              self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
              self._flush_event.clear()
              self._flush()

      def add(self, item: "T") -> None:
          """Buffer ``item`` for the next flush.

          The item is silently ignored when the flusher thread is not
          available (batcher killed, or thread start failed). When the buffer
          already holds ``MAX_BEFORE_DROP`` items the item is passed to
          :meth:`_record_lost` instead of being buffered.
          """
          if not self._ensure_thread() or self._flusher is None:
              return None

          with self._lock:
              if len(self._buffer) >= self.MAX_BEFORE_DROP:
                  self._record_lost(item)
                  return None

              self._buffer.append(item)
              if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
                  # Enough items queued: wake the flusher without waiting for
                  # its timeout.
                  self._flush_event.set()

      def kill(self) -> None:
          """Stop the batcher and wake the flusher thread so it can exit.

          Does nothing when no flusher thread is registered. After this call
          :meth:`add` ignores new items because ``_running`` is ``False``.
          """
          if self._flusher is None:
              return

          self._running = False
          self._flush_event.set()
          self._flusher = None

      def flush(self) -> None:
          """Flush the buffer synchronously in the calling thread."""
          self._flush()

      def _add_to_envelope(self, envelope: "Envelope") -> None:
          """Add one item holding the whole current buffer to ``envelope``.

          Called by :meth:`_flush` while it holds the lock; this method reads
          ``self._buffer`` without taking the lock itself.
          """
          envelope.add_item(
              Item(
                  type=self.TYPE,
                  content_type=self.CONTENT_TYPE,
                  headers={
                      "item_count": len(self._buffer),
                  },
                  payload=PayloadRef(
                      json={
                          "items": [
                              self._to_transport_format(item)
                              for item in self._buffer
                          ]
                      }
                  ),
              )
          )

      def _flush(self) -> "Optional[Envelope]":
          """Drain the buffer into a new envelope and pass it to ``capture_func``.

          :returns: the envelope that was captured, or ``None`` when the buffer
              was empty (in which case nothing is built or captured).
          """
          with self._lock:
              # Check for data first so that idle wake-ups of the flusher
              # thread do not build an envelope just to throw it away.
              if not self._buffer:
                  return None

              envelope = Envelope(
                  headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
              )
              self._add_to_envelope(envelope)
              self._buffer.clear()

          # Capture outside the lock so add() is not blocked while the
          # envelope is being handed off.
          self._capture_func(envelope)
          return envelope

      def _record_lost(self, item: "T") -> None:
          """Hook called by :meth:`add` for an item dropped because the buffer
          is full. The base implementation does nothing.
          """
          pass

      @staticmethod
      def _to_transport_format(item: "T") -> "Any":
          """Hook converting a buffered item into its JSON payload entry.

          The base implementation returns ``None``.
          """
          pass