# anthropic.py (20 KB)
  1. import sys
  2. from collections.abc import Iterable
  3. from functools import wraps
  4. from typing import TYPE_CHECKING
  5. import sentry_sdk
  6. from sentry_sdk.ai.monitoring import record_token_usage
  7. from sentry_sdk.ai.utils import (
  8. GEN_AI_ALLOWED_MESSAGE_ROLES,
  9. set_data_normalized,
  10. normalize_message_roles,
  11. truncate_and_annotate_messages,
  12. get_start_span_function,
  13. transform_anthropic_content_part,
  14. )
  15. from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS
  16. from sentry_sdk.integrations import _check_minimum_version, DidNotEnable, Integration
  17. from sentry_sdk.scope import should_send_default_pii
  18. from sentry_sdk.tracing_utils import set_span_errored
  19. from sentry_sdk.utils import (
  20. capture_internal_exceptions,
  21. event_from_exception,
  22. package_version,
  23. safe_serialize,
  24. reraise,
  25. )
  26. try:
  27. try:
  28. from anthropic import NotGiven
  29. except ImportError:
  30. NotGiven = None
  31. try:
  32. from anthropic import Omit
  33. except ImportError:
  34. Omit = None
  35. from anthropic.resources import AsyncMessages, Messages
  36. if TYPE_CHECKING:
  37. from anthropic.types import MessageStreamEvent
  38. except ImportError:
  39. raise DidNotEnable("Anthropic not installed")
  40. if TYPE_CHECKING:
  41. from typing import Any, AsyncIterator, Iterator, List, Optional, Union
  42. from sentry_sdk.tracing import Span
  43. class AnthropicIntegration(Integration):
  44. identifier = "anthropic"
  45. origin = f"auto.ai.{identifier}"
  46. def __init__(self: "AnthropicIntegration", include_prompts: bool = True) -> None:
  47. self.include_prompts = include_prompts
  48. @staticmethod
  49. def setup_once() -> None:
  50. version = package_version("anthropic")
  51. _check_minimum_version(AnthropicIntegration, version)
  52. Messages.create = _wrap_message_create(Messages.create)
  53. AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create)
  54. def _capture_exception(exc: "Any") -> None:
  55. set_span_errored()
  56. event, hint = event_from_exception(
  57. exc,
  58. client_options=sentry_sdk.get_client().options,
  59. mechanism={"type": "anthropic", "handled": False},
  60. )
  61. sentry_sdk.capture_event(event, hint=hint)
  62. def _get_token_usage(result: "Messages") -> "tuple[int, int, int, int]":
  63. """
  64. Get token usage from the Anthropic response.
  65. Returns: (input_tokens, output_tokens, cache_read_input_tokens, cache_write_input_tokens)
  66. """
  67. input_tokens = 0
  68. output_tokens = 0
  69. cache_read_input_tokens = 0
  70. cache_write_input_tokens = 0
  71. if hasattr(result, "usage"):
  72. usage = result.usage
  73. if hasattr(usage, "input_tokens") and isinstance(usage.input_tokens, int):
  74. input_tokens = usage.input_tokens
  75. if hasattr(usage, "output_tokens") and isinstance(usage.output_tokens, int):
  76. output_tokens = usage.output_tokens
  77. if hasattr(usage, "cache_read_input_tokens") and isinstance(
  78. usage.cache_read_input_tokens, int
  79. ):
  80. cache_read_input_tokens = usage.cache_read_input_tokens
  81. if hasattr(usage, "cache_creation_input_tokens") and isinstance(
  82. usage.cache_creation_input_tokens, int
  83. ):
  84. cache_write_input_tokens = usage.cache_creation_input_tokens
  85. return (
  86. input_tokens,
  87. output_tokens,
  88. cache_read_input_tokens,
  89. cache_write_input_tokens,
  90. )
  91. def _collect_ai_data(
  92. event: "MessageStreamEvent",
  93. model: "str | None",
  94. input_tokens: int,
  95. output_tokens: int,
  96. cache_read_input_tokens: int,
  97. cache_write_input_tokens: int,
  98. content_blocks: "list[str]",
  99. ) -> "tuple[str | None, int, int, int, int, list[str]]":
  100. """
  101. Collect model information, token usage, and collect content blocks from the AI streaming response.
  102. """
  103. with capture_internal_exceptions():
  104. if hasattr(event, "type"):
  105. if event.type == "message_start":
  106. usage = event.message.usage
  107. input_tokens += usage.input_tokens
  108. output_tokens += usage.output_tokens
  109. if hasattr(usage, "cache_read_input_tokens") and isinstance(
  110. usage.cache_read_input_tokens, int
  111. ):
  112. cache_read_input_tokens += usage.cache_read_input_tokens
  113. if hasattr(usage, "cache_creation_input_tokens") and isinstance(
  114. usage.cache_creation_input_tokens, int
  115. ):
  116. cache_write_input_tokens += usage.cache_creation_input_tokens
  117. model = event.message.model or model
  118. elif event.type == "content_block_start":
  119. pass
  120. elif event.type == "content_block_delta":
  121. if hasattr(event.delta, "text"):
  122. content_blocks.append(event.delta.text)
  123. elif hasattr(event.delta, "partial_json"):
  124. content_blocks.append(event.delta.partial_json)
  125. elif event.type == "content_block_stop":
  126. pass
  127. elif event.type == "message_delta":
  128. output_tokens += event.usage.output_tokens
  129. return (
  130. model,
  131. input_tokens,
  132. output_tokens,
  133. cache_read_input_tokens,
  134. cache_write_input_tokens,
  135. content_blocks,
  136. )
  137. def _transform_anthropic_content_block(
  138. content_block: "dict[str, Any]",
  139. ) -> "dict[str, Any]":
  140. """
  141. Transform an Anthropic content block using the Anthropic-specific transformer,
  142. with special handling for Anthropic's text-type documents.
  143. """
  144. # Handle Anthropic's text-type documents specially (not covered by shared function)
  145. if content_block.get("type") == "document":
  146. source = content_block.get("source")
  147. if isinstance(source, dict) and source.get("type") == "text":
  148. return {
  149. "type": "text",
  150. "text": source.get("data", ""),
  151. }
  152. # Use Anthropic-specific transformation
  153. result = transform_anthropic_content_part(content_block)
  154. return result if result is not None else content_block
  155. def _set_input_data(
  156. span: "Span", kwargs: "dict[str, Any]", integration: "AnthropicIntegration"
  157. ) -> None:
  158. """
  159. Set input data for the span based on the provided keyword arguments for the anthropic message creation.
  160. """
  161. set_data_normalized(span, SPANDATA.GEN_AI_OPERATION_NAME, "chat")
  162. system_prompt = kwargs.get("system")
  163. messages = kwargs.get("messages")
  164. if (
  165. messages is not None
  166. and len(messages) > 0
  167. and should_send_default_pii()
  168. and integration.include_prompts
  169. ):
  170. normalized_messages = []
  171. if system_prompt:
  172. system_prompt_content: "Optional[Union[str, List[dict[str, Any]]]]" = None
  173. if isinstance(system_prompt, str):
  174. system_prompt_content = system_prompt
  175. elif isinstance(system_prompt, Iterable):
  176. system_prompt_content = []
  177. for item in system_prompt:
  178. if (
  179. isinstance(item, dict)
  180. and item.get("type") == "text"
  181. and item.get("text")
  182. ):
  183. system_prompt_content.append(item.copy())
  184. if system_prompt_content:
  185. normalized_messages.append(
  186. {
  187. "role": GEN_AI_ALLOWED_MESSAGE_ROLES.SYSTEM,
  188. "content": system_prompt_content,
  189. }
  190. )
  191. for message in messages:
  192. if (
  193. message.get("role") == GEN_AI_ALLOWED_MESSAGE_ROLES.USER
  194. and "content" in message
  195. and isinstance(message["content"], (list, tuple))
  196. ):
  197. transformed_content = []
  198. for item in message["content"]:
  199. # Skip tool_result items - they can contain images/documents
  200. # with nested structures that are difficult to redact properly
  201. if isinstance(item, dict) and item.get("type") == "tool_result":
  202. continue
  203. # Transform content blocks (images, documents, etc.)
  204. transformed_content.append(
  205. _transform_anthropic_content_block(item)
  206. if isinstance(item, dict)
  207. else item
  208. )
  209. # If there are non-tool-result items, add them as a message
  210. if transformed_content:
  211. normalized_messages.append(
  212. {
  213. "role": message.get("role"),
  214. "content": transformed_content,
  215. }
  216. )
  217. else:
  218. # Transform content for non-list messages or assistant messages
  219. transformed_message = message.copy()
  220. if "content" in transformed_message:
  221. content = transformed_message["content"]
  222. if isinstance(content, (list, tuple)):
  223. transformed_message["content"] = [
  224. _transform_anthropic_content_block(item)
  225. if isinstance(item, dict)
  226. else item
  227. for item in content
  228. ]
  229. normalized_messages.append(transformed_message)
  230. role_normalized_messages = normalize_message_roles(normalized_messages)
  231. scope = sentry_sdk.get_current_scope()
  232. messages_data = truncate_and_annotate_messages(
  233. role_normalized_messages, span, scope
  234. )
  235. if messages_data is not None:
  236. set_data_normalized(
  237. span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False
  238. )
  239. set_data_normalized(
  240. span, SPANDATA.GEN_AI_RESPONSE_STREAMING, kwargs.get("stream", False)
  241. )
  242. kwargs_keys_to_attributes = {
  243. "max_tokens": SPANDATA.GEN_AI_REQUEST_MAX_TOKENS,
  244. "model": SPANDATA.GEN_AI_REQUEST_MODEL,
  245. "temperature": SPANDATA.GEN_AI_REQUEST_TEMPERATURE,
  246. "top_k": SPANDATA.GEN_AI_REQUEST_TOP_K,
  247. "top_p": SPANDATA.GEN_AI_REQUEST_TOP_P,
  248. }
  249. for key, attribute in kwargs_keys_to_attributes.items():
  250. value = kwargs.get(key)
  251. if value is not None and _is_given(value):
  252. set_data_normalized(span, attribute, value)
  253. # Input attributes: Tools
  254. tools = kwargs.get("tools")
  255. if tools is not None and _is_given(tools) and len(tools) > 0:
  256. set_data_normalized(
  257. span, SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools)
  258. )
  259. def _set_output_data(
  260. span: "Span",
  261. integration: "AnthropicIntegration",
  262. model: "str | None",
  263. input_tokens: "int | None",
  264. output_tokens: "int | None",
  265. cache_read_input_tokens: "int | None",
  266. cache_write_input_tokens: "int | None",
  267. content_blocks: "list[Any]",
  268. finish_span: bool = False,
  269. ) -> None:
  270. """
  271. Set output data for the span based on the AI response."""
  272. span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, model)
  273. if should_send_default_pii() and integration.include_prompts:
  274. output_messages: "dict[str, list[Any]]" = {
  275. "response": [],
  276. "tool": [],
  277. }
  278. for output in content_blocks:
  279. if output["type"] == "text":
  280. output_messages["response"].append(output["text"])
  281. elif output["type"] == "tool_use":
  282. output_messages["tool"].append(output)
  283. if len(output_messages["tool"]) > 0:
  284. set_data_normalized(
  285. span,
  286. SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
  287. output_messages["tool"],
  288. unpack=False,
  289. )
  290. if len(output_messages["response"]) > 0:
  291. set_data_normalized(
  292. span, SPANDATA.GEN_AI_RESPONSE_TEXT, output_messages["response"]
  293. )
  294. record_token_usage(
  295. span,
  296. input_tokens=input_tokens,
  297. output_tokens=output_tokens,
  298. input_tokens_cached=cache_read_input_tokens,
  299. input_tokens_cache_write=cache_write_input_tokens,
  300. )
  301. if finish_span:
  302. span.__exit__(None, None, None)
  303. def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
  304. integration = kwargs.pop("integration")
  305. if integration is None:
  306. return f(*args, **kwargs)
  307. if "messages" not in kwargs:
  308. return f(*args, **kwargs)
  309. try:
  310. iter(kwargs["messages"])
  311. except TypeError:
  312. return f(*args, **kwargs)
  313. model = kwargs.get("model", "")
  314. span = get_start_span_function()(
  315. op=OP.GEN_AI_CHAT,
  316. name=f"chat {model}".strip(),
  317. origin=AnthropicIntegration.origin,
  318. )
  319. span.__enter__()
  320. _set_input_data(span, kwargs, integration)
  321. result = yield f, args, kwargs
  322. with capture_internal_exceptions():
  323. if hasattr(result, "content"):
  324. (
  325. input_tokens,
  326. output_tokens,
  327. cache_read_input_tokens,
  328. cache_write_input_tokens,
  329. ) = _get_token_usage(result)
  330. content_blocks = []
  331. for content_block in result.content:
  332. if hasattr(content_block, "to_dict"):
  333. content_blocks.append(content_block.to_dict())
  334. elif hasattr(content_block, "model_dump"):
  335. content_blocks.append(content_block.model_dump())
  336. elif hasattr(content_block, "text"):
  337. content_blocks.append({"type": "text", "text": content_block.text})
  338. _set_output_data(
  339. span=span,
  340. integration=integration,
  341. model=getattr(result, "model", None),
  342. input_tokens=input_tokens,
  343. output_tokens=output_tokens,
  344. cache_read_input_tokens=cache_read_input_tokens,
  345. cache_write_input_tokens=cache_write_input_tokens,
  346. content_blocks=content_blocks,
  347. finish_span=True,
  348. )
  349. # Streaming response
  350. elif hasattr(result, "_iterator"):
  351. old_iterator = result._iterator
  352. def new_iterator() -> "Iterator[MessageStreamEvent]":
  353. model = None
  354. input_tokens = 0
  355. output_tokens = 0
  356. cache_read_input_tokens = 0
  357. cache_write_input_tokens = 0
  358. content_blocks: "list[str]" = []
  359. for event in old_iterator:
  360. (
  361. model,
  362. input_tokens,
  363. output_tokens,
  364. cache_read_input_tokens,
  365. cache_write_input_tokens,
  366. content_blocks,
  367. ) = _collect_ai_data(
  368. event,
  369. model,
  370. input_tokens,
  371. output_tokens,
  372. cache_read_input_tokens,
  373. cache_write_input_tokens,
  374. content_blocks,
  375. )
  376. yield event
  377. _set_output_data(
  378. span=span,
  379. integration=integration,
  380. model=model,
  381. input_tokens=input_tokens,
  382. output_tokens=output_tokens,
  383. cache_read_input_tokens=cache_read_input_tokens,
  384. cache_write_input_tokens=cache_write_input_tokens,
  385. content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
  386. finish_span=True,
  387. )
  388. async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]":
  389. model = None
  390. input_tokens = 0
  391. output_tokens = 0
  392. cache_read_input_tokens = 0
  393. cache_write_input_tokens = 0
  394. content_blocks: "list[str]" = []
  395. async for event in old_iterator:
  396. (
  397. model,
  398. input_tokens,
  399. output_tokens,
  400. cache_read_input_tokens,
  401. cache_write_input_tokens,
  402. content_blocks,
  403. ) = _collect_ai_data(
  404. event,
  405. model,
  406. input_tokens,
  407. output_tokens,
  408. cache_read_input_tokens,
  409. cache_write_input_tokens,
  410. content_blocks,
  411. )
  412. yield event
  413. _set_output_data(
  414. span=span,
  415. integration=integration,
  416. model=model,
  417. input_tokens=input_tokens,
  418. output_tokens=output_tokens,
  419. cache_read_input_tokens=cache_read_input_tokens,
  420. cache_write_input_tokens=cache_write_input_tokens,
  421. content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
  422. finish_span=True,
  423. )
  424. if str(type(result._iterator)) == "<class 'async_generator'>":
  425. result._iterator = new_iterator_async()
  426. else:
  427. result._iterator = new_iterator()
  428. else:
  429. span.set_data("unknown_response", True)
  430. span.__exit__(None, None, None)
  431. return result
  432. def _wrap_message_create(f: "Any") -> "Any":
  433. def _execute_sync(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
  434. gen = _sentry_patched_create_common(f, *args, **kwargs)
  435. try:
  436. f, args, kwargs = next(gen)
  437. except StopIteration as e:
  438. return e.value
  439. try:
  440. try:
  441. result = f(*args, **kwargs)
  442. except Exception as exc:
  443. exc_info = sys.exc_info()
  444. with capture_internal_exceptions():
  445. _capture_exception(exc)
  446. reraise(*exc_info)
  447. return gen.send(result)
  448. except StopIteration as e:
  449. return e.value
  450. @wraps(f)
  451. def _sentry_patched_create_sync(*args: "Any", **kwargs: "Any") -> "Any":
  452. integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
  453. kwargs["integration"] = integration
  454. try:
  455. return _execute_sync(f, *args, **kwargs)
  456. finally:
  457. span = sentry_sdk.get_current_span()
  458. if span is not None and span.status == SPANSTATUS.INTERNAL_ERROR:
  459. with capture_internal_exceptions():
  460. span.__exit__(None, None, None)
  461. return _sentry_patched_create_sync
  462. def _wrap_message_create_async(f: "Any") -> "Any":
  463. async def _execute_async(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
  464. gen = _sentry_patched_create_common(f, *args, **kwargs)
  465. try:
  466. f, args, kwargs = next(gen)
  467. except StopIteration as e:
  468. return await e.value
  469. try:
  470. try:
  471. result = await f(*args, **kwargs)
  472. except Exception as exc:
  473. exc_info = sys.exc_info()
  474. with capture_internal_exceptions():
  475. _capture_exception(exc)
  476. reraise(*exc_info)
  477. return gen.send(result)
  478. except StopIteration as e:
  479. return e.value
  480. @wraps(f)
  481. async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any":
  482. integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)
  483. kwargs["integration"] = integration
  484. try:
  485. return await _execute_async(f, *args, **kwargs)
  486. finally:
  487. span = sentry_sdk.get_current_span()
  488. if span is not None and span.status == SPANSTATUS.INTERNAL_ERROR:
  489. with capture_internal_exceptions():
  490. span.__exit__(None, None, None)
  491. return _sentry_patched_create_async
  492. def _is_given(obj: "Any") -> bool:
  493. """
  494. Check for givenness safely across different anthropic versions.
  495. """
  496. if NotGiven is not None and isinstance(obj, NotGiven):
  497. return False
  498. if Omit is not None and isinstance(obj, Omit):
  499. return False
  500. return True