# langchain.py (40 KB)
  1. import contextvars
  2. import itertools
  3. import sys
  4. import warnings
  5. from collections import OrderedDict
  6. from functools import wraps
  7. from typing import TYPE_CHECKING
  8. import sentry_sdk
  9. from sentry_sdk.ai.monitoring import set_ai_pipeline_name
  10. from sentry_sdk.ai.utils import (
  11. GEN_AI_ALLOWED_MESSAGE_ROLES,
  12. get_start_span_function,
  13. normalize_message_roles,
  14. set_data_normalized,
  15. truncate_and_annotate_messages,
  16. transform_content_part,
  17. )
  18. from sentry_sdk.consts import OP, SPANDATA
  19. from sentry_sdk.integrations import DidNotEnable, Integration
  20. from sentry_sdk.scope import should_send_default_pii
  21. from sentry_sdk.tracing_utils import _get_value, set_span_errored
  22. from sentry_sdk.utils import capture_internal_exceptions, logger
  23. if TYPE_CHECKING:
  24. from typing import (
  25. Any,
  26. AsyncIterator,
  27. Callable,
  28. Dict,
  29. Iterator,
  30. List,
  31. Optional,
  32. Union,
  33. )
  34. from uuid import UUID
  35. from sentry_sdk.tracing import Span
  36. try:
  37. from langchain_core.agents import AgentFinish
  38. from langchain_core.callbacks import (
  39. BaseCallbackHandler,
  40. BaseCallbackManager,
  41. Callbacks,
  42. manager,
  43. )
  44. from langchain_core.messages import BaseMessage
  45. from langchain_core.outputs import LLMResult
  46. except ImportError:
  47. raise DidNotEnable("langchain not installed")
  48. try:
  49. # >=v1
  50. from langchain_classic.agents import AgentExecutor # type: ignore[import-not-found]
  51. except ImportError:
  52. try:
  53. # <v1
  54. from langchain.agents import AgentExecutor
  55. except ImportError:
  56. AgentExecutor = None
  57. # Conditional imports for embeddings providers
  58. try:
  59. from langchain_openai import OpenAIEmbeddings # type: ignore[import-not-found]
  60. except ImportError:
  61. OpenAIEmbeddings = None
  62. try:
  63. from langchain_openai import AzureOpenAIEmbeddings
  64. except ImportError:
  65. AzureOpenAIEmbeddings = None
  66. try:
  67. from langchain_google_vertexai import VertexAIEmbeddings # type: ignore[import-not-found]
  68. except ImportError:
  69. VertexAIEmbeddings = None
  70. try:
  71. from langchain_aws import BedrockEmbeddings # type: ignore[import-not-found]
  72. except ImportError:
  73. BedrockEmbeddings = None
  74. try:
  75. from langchain_cohere import CohereEmbeddings # type: ignore[import-not-found]
  76. except ImportError:
  77. CohereEmbeddings = None
  78. try:
  79. from langchain_mistralai import MistralAIEmbeddings # type: ignore[import-not-found]
  80. except ImportError:
  81. MistralAIEmbeddings = None
  82. try:
  83. from langchain_huggingface import HuggingFaceEmbeddings # type: ignore[import-not-found]
  84. except ImportError:
  85. HuggingFaceEmbeddings = None
  86. try:
  87. from langchain_ollama import OllamaEmbeddings # type: ignore[import-not-found]
  88. except ImportError:
  89. OllamaEmbeddings = None
  90. DATA_FIELDS = {
  91. "frequency_penalty": SPANDATA.GEN_AI_REQUEST_FREQUENCY_PENALTY,
  92. "function_call": SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
  93. "max_tokens": SPANDATA.GEN_AI_REQUEST_MAX_TOKENS,
  94. "presence_penalty": SPANDATA.GEN_AI_REQUEST_PRESENCE_PENALTY,
  95. "temperature": SPANDATA.GEN_AI_REQUEST_TEMPERATURE,
  96. "tool_calls": SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
  97. "top_k": SPANDATA.GEN_AI_REQUEST_TOP_K,
  98. "top_p": SPANDATA.GEN_AI_REQUEST_TOP_P,
  99. }
  100. def _transform_langchain_content_block(
  101. content_block: "Dict[str, Any]",
  102. ) -> "Dict[str, Any]":
  103. """
  104. Transform a LangChain content block using the shared transform_content_part function.
  105. Returns the original content block if transformation is not applicable
  106. (e.g., for text blocks or unrecognized formats).
  107. """
  108. result = transform_content_part(content_block)
  109. return result if result is not None else content_block
  110. def _transform_langchain_message_content(content: "Any") -> "Any":
  111. """
  112. Transform LangChain message content, handling both string content and
  113. list of content blocks.
  114. """
  115. if isinstance(content, str):
  116. return content
  117. if isinstance(content, (list, tuple)):
  118. transformed = []
  119. for block in content:
  120. if isinstance(block, dict):
  121. transformed.append(_transform_langchain_content_block(block))
  122. else:
  123. transformed.append(block)
  124. return transformed
  125. return content
  126. # Contextvar to track agent names in a stack for re-entrant agent support
  127. _agent_stack: "contextvars.ContextVar[Optional[List[Optional[str]]]]" = (
  128. contextvars.ContextVar("langchain_agent_stack", default=None)
  129. )
  130. def _push_agent(agent_name: "Optional[str]") -> None:
  131. """Push an agent name onto the stack."""
  132. stack = _agent_stack.get()
  133. if stack is None:
  134. stack = []
  135. else:
  136. # Copy the list to maintain contextvar isolation across async contexts
  137. stack = stack.copy()
  138. stack.append(agent_name)
  139. _agent_stack.set(stack)
  140. def _pop_agent() -> "Optional[str]":
  141. """Pop an agent name from the stack and return it."""
  142. stack = _agent_stack.get()
  143. if stack:
  144. # Copy the list to maintain contextvar isolation across async contexts
  145. stack = stack.copy()
  146. agent_name = stack.pop()
  147. _agent_stack.set(stack)
  148. return agent_name
  149. return None
  150. def _get_current_agent() -> "Optional[str]":
  151. """Get the current agent name (top of stack) without removing it."""
  152. stack = _agent_stack.get()
  153. if stack:
  154. return stack[-1]
  155. return None
  156. class LangchainIntegration(Integration):
  157. identifier = "langchain"
  158. origin = f"auto.ai.{identifier}"
  159. def __init__(
  160. self: "LangchainIntegration",
  161. include_prompts: bool = True,
  162. max_spans: "Optional[int]" = None,
  163. ) -> None:
  164. self.include_prompts = include_prompts
  165. self.max_spans = max_spans
  166. if max_spans is not None:
  167. warnings.warn(
  168. "The `max_spans` parameter of `LangchainIntegration` is "
  169. "deprecated and will be removed in version 3.0 of sentry-sdk.",
  170. DeprecationWarning,
  171. stacklevel=2,
  172. )
  173. @staticmethod
  174. def setup_once() -> None:
  175. manager._configure = _wrap_configure(manager._configure)
  176. if AgentExecutor is not None:
  177. AgentExecutor.invoke = _wrap_agent_executor_invoke(AgentExecutor.invoke)
  178. AgentExecutor.stream = _wrap_agent_executor_stream(AgentExecutor.stream)
  179. # Patch embeddings providers
  180. _patch_embeddings_provider(OpenAIEmbeddings)
  181. _patch_embeddings_provider(AzureOpenAIEmbeddings)
  182. _patch_embeddings_provider(VertexAIEmbeddings)
  183. _patch_embeddings_provider(BedrockEmbeddings)
  184. _patch_embeddings_provider(CohereEmbeddings)
  185. _patch_embeddings_provider(MistralAIEmbeddings)
  186. _patch_embeddings_provider(HuggingFaceEmbeddings)
  187. _patch_embeddings_provider(OllamaEmbeddings)
  188. class WatchedSpan:
  189. span: "Span" = None # type: ignore[assignment]
  190. children: "List[WatchedSpan]" = []
  191. is_pipeline: bool = False
  192. def __init__(self, span: "Span") -> None:
  193. self.span = span
  194. class SentryLangchainCallback(BaseCallbackHandler): # type: ignore[misc]
  195. """Callback handler that creates Sentry spans."""
  196. def __init__(
  197. self, max_span_map_size: "Optional[int]", include_prompts: bool
  198. ) -> None:
  199. self.span_map: "OrderedDict[UUID, WatchedSpan]" = OrderedDict()
  200. self.max_span_map_size = max_span_map_size
  201. self.include_prompts = include_prompts
  202. def gc_span_map(self) -> None:
  203. if self.max_span_map_size is not None:
  204. while len(self.span_map) > self.max_span_map_size:
  205. run_id, watched_span = self.span_map.popitem(last=False)
  206. self._exit_span(watched_span, run_id)
  207. def _handle_error(self, run_id: "UUID", error: "Any") -> None:
  208. with capture_internal_exceptions():
  209. if not run_id or run_id not in self.span_map:
  210. return
  211. span_data = self.span_map[run_id]
  212. span = span_data.span
  213. set_span_errored(span)
  214. sentry_sdk.capture_exception(error, span.scope)
  215. span.__exit__(None, None, None)
  216. del self.span_map[run_id]
  217. def _normalize_langchain_message(self, message: "BaseMessage") -> "Any":
  218. # Transform content to handle multimodal data (images, audio, video, files)
  219. transformed_content = _transform_langchain_message_content(message.content)
  220. parsed = {"role": message.type, "content": transformed_content}
  221. parsed.update(message.additional_kwargs)
  222. return parsed
  223. def _create_span(
  224. self: "SentryLangchainCallback",
  225. run_id: "UUID",
  226. parent_id: "Optional[Any]",
  227. **kwargs: "Any",
  228. ) -> "WatchedSpan":
  229. watched_span: "Optional[WatchedSpan]" = None
  230. if parent_id:
  231. parent_span: "Optional[WatchedSpan]" = self.span_map.get(parent_id)
  232. if parent_span:
  233. watched_span = WatchedSpan(parent_span.span.start_child(**kwargs))
  234. parent_span.children.append(watched_span)
  235. if watched_span is None:
  236. watched_span = WatchedSpan(sentry_sdk.start_span(**kwargs))
  237. watched_span.span.__enter__()
  238. self.span_map[run_id] = watched_span
  239. self.gc_span_map()
  240. return watched_span
  241. def _exit_span(
  242. self: "SentryLangchainCallback", span_data: "WatchedSpan", run_id: "UUID"
  243. ) -> None:
  244. if span_data.is_pipeline:
  245. set_ai_pipeline_name(None)
  246. span_data.span.__exit__(None, None, None)
  247. del self.span_map[run_id]
  248. def on_llm_start(
  249. self: "SentryLangchainCallback",
  250. serialized: "Dict[str, Any]",
  251. prompts: "List[str]",
  252. *,
  253. run_id: "UUID",
  254. tags: "Optional[List[str]]" = None,
  255. parent_run_id: "Optional[UUID]" = None,
  256. metadata: "Optional[Dict[str, Any]]" = None,
  257. **kwargs: "Any",
  258. ) -> "Any":
  259. """Run when LLM starts running."""
  260. with capture_internal_exceptions():
  261. if not run_id:
  262. return
  263. all_params = kwargs.get("invocation_params", {})
  264. all_params.update(serialized.get("kwargs", {}))
  265. model = (
  266. all_params.get("model")
  267. or all_params.get("model_name")
  268. or all_params.get("model_id")
  269. or ""
  270. )
  271. watched_span = self._create_span(
  272. run_id,
  273. parent_run_id,
  274. op=OP.GEN_AI_PIPELINE,
  275. name=kwargs.get("name") or "Langchain LLM call",
  276. origin=LangchainIntegration.origin,
  277. )
  278. span = watched_span.span
  279. if model:
  280. span.set_data(
  281. SPANDATA.GEN_AI_REQUEST_MODEL,
  282. model,
  283. )
  284. ai_type = all_params.get("_type", "")
  285. if "anthropic" in ai_type:
  286. span.set_data(SPANDATA.GEN_AI_SYSTEM, "anthropic")
  287. elif "openai" in ai_type:
  288. span.set_data(SPANDATA.GEN_AI_SYSTEM, "openai")
  289. for key, attribute in DATA_FIELDS.items():
  290. if key in all_params and all_params[key] is not None:
  291. set_data_normalized(span, attribute, all_params[key], unpack=False)
  292. _set_tools_on_span(span, all_params.get("tools"))
  293. if should_send_default_pii() and self.include_prompts:
  294. normalized_messages = [
  295. {
  296. "role": GEN_AI_ALLOWED_MESSAGE_ROLES.USER,
  297. "content": {"type": "text", "text": prompt},
  298. }
  299. for prompt in prompts
  300. ]
  301. scope = sentry_sdk.get_current_scope()
  302. messages_data = truncate_and_annotate_messages(
  303. normalized_messages, span, scope
  304. )
  305. if messages_data is not None:
  306. set_data_normalized(
  307. span,
  308. SPANDATA.GEN_AI_REQUEST_MESSAGES,
  309. messages_data,
  310. unpack=False,
  311. )
  312. def on_chat_model_start(
  313. self: "SentryLangchainCallback",
  314. serialized: "Dict[str, Any]",
  315. messages: "List[List[BaseMessage]]",
  316. *,
  317. run_id: "UUID",
  318. **kwargs: "Any",
  319. ) -> "Any":
  320. """Run when Chat Model starts running."""
  321. with capture_internal_exceptions():
  322. if not run_id:
  323. return
  324. all_params = kwargs.get("invocation_params", {})
  325. all_params.update(serialized.get("kwargs", {}))
  326. model = (
  327. all_params.get("model")
  328. or all_params.get("model_name")
  329. or all_params.get("model_id")
  330. or ""
  331. )
  332. watched_span = self._create_span(
  333. run_id,
  334. kwargs.get("parent_run_id"),
  335. op=OP.GEN_AI_CHAT,
  336. name=f"chat {model}".strip(),
  337. origin=LangchainIntegration.origin,
  338. )
  339. span = watched_span.span
  340. span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "chat")
  341. if model:
  342. span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model)
  343. ai_type = all_params.get("_type", "")
  344. if "anthropic" in ai_type:
  345. span.set_data(SPANDATA.GEN_AI_SYSTEM, "anthropic")
  346. elif "openai" in ai_type:
  347. span.set_data(SPANDATA.GEN_AI_SYSTEM, "openai")
  348. agent_name = _get_current_agent()
  349. if agent_name:
  350. span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name)
  351. for key, attribute in DATA_FIELDS.items():
  352. if key in all_params and all_params[key] is not None:
  353. set_data_normalized(span, attribute, all_params[key], unpack=False)
  354. _set_tools_on_span(span, all_params.get("tools"))
  355. if should_send_default_pii() and self.include_prompts:
  356. normalized_messages = []
  357. for list_ in messages:
  358. for message in list_:
  359. normalized_messages.append(
  360. self._normalize_langchain_message(message)
  361. )
  362. normalized_messages = normalize_message_roles(normalized_messages)
  363. scope = sentry_sdk.get_current_scope()
  364. messages_data = truncate_and_annotate_messages(
  365. normalized_messages, span, scope
  366. )
  367. if messages_data is not None:
  368. set_data_normalized(
  369. span,
  370. SPANDATA.GEN_AI_REQUEST_MESSAGES,
  371. messages_data,
  372. unpack=False,
  373. )
  374. def on_chat_model_end(
  375. self: "SentryLangchainCallback",
  376. response: "LLMResult",
  377. *,
  378. run_id: "UUID",
  379. **kwargs: "Any",
  380. ) -> "Any":
  381. """Run when Chat Model ends running."""
  382. with capture_internal_exceptions():
  383. if not run_id or run_id not in self.span_map:
  384. return
  385. span_data = self.span_map[run_id]
  386. span = span_data.span
  387. if should_send_default_pii() and self.include_prompts:
  388. set_data_normalized(
  389. span,
  390. SPANDATA.GEN_AI_RESPONSE_TEXT,
  391. [[x.text for x in list_] for list_ in response.generations],
  392. )
  393. _record_token_usage(span, response)
  394. self._exit_span(span_data, run_id)
  395. def on_llm_end(
  396. self: "SentryLangchainCallback",
  397. response: "LLMResult",
  398. *,
  399. run_id: "UUID",
  400. **kwargs: "Any",
  401. ) -> "Any":
  402. """Run when LLM ends running."""
  403. with capture_internal_exceptions():
  404. if not run_id or run_id not in self.span_map:
  405. return
  406. span_data = self.span_map[run_id]
  407. span = span_data.span
  408. try:
  409. generation = response.generations[0][0]
  410. except IndexError:
  411. generation = None
  412. if generation is not None:
  413. try:
  414. response_model = generation.message.response_metadata.get(
  415. "model_name"
  416. )
  417. if response_model is not None:
  418. span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model)
  419. except AttributeError:
  420. pass
  421. try:
  422. finish_reason = generation.generation_info.get("finish_reason")
  423. if finish_reason is not None:
  424. span.set_data(
  425. SPANDATA.GEN_AI_RESPONSE_FINISH_REASONS, finish_reason
  426. )
  427. except AttributeError:
  428. pass
  429. try:
  430. if should_send_default_pii() and self.include_prompts:
  431. tool_calls = getattr(generation.message, "tool_calls", None)
  432. if tool_calls is not None and tool_calls != []:
  433. set_data_normalized(
  434. span,
  435. SPANDATA.GEN_AI_RESPONSE_TOOL_CALLS,
  436. tool_calls,
  437. unpack=False,
  438. )
  439. except AttributeError:
  440. pass
  441. if should_send_default_pii() and self.include_prompts:
  442. set_data_normalized(
  443. span,
  444. SPANDATA.GEN_AI_RESPONSE_TEXT,
  445. [[x.text for x in list_] for list_ in response.generations],
  446. )
  447. _record_token_usage(span, response)
  448. self._exit_span(span_data, run_id)
  449. def on_llm_error(
  450. self: "SentryLangchainCallback",
  451. error: "Union[Exception, KeyboardInterrupt]",
  452. *,
  453. run_id: "UUID",
  454. **kwargs: "Any",
  455. ) -> "Any":
  456. """Run when LLM errors."""
  457. self._handle_error(run_id, error)
  458. def on_chat_model_error(
  459. self: "SentryLangchainCallback",
  460. error: "Union[Exception, KeyboardInterrupt]",
  461. *,
  462. run_id: "UUID",
  463. **kwargs: "Any",
  464. ) -> "Any":
  465. """Run when Chat Model errors."""
  466. self._handle_error(run_id, error)
  467. def on_agent_finish(
  468. self: "SentryLangchainCallback",
  469. finish: "AgentFinish",
  470. *,
  471. run_id: "UUID",
  472. **kwargs: "Any",
  473. ) -> "Any":
  474. with capture_internal_exceptions():
  475. if not run_id or run_id not in self.span_map:
  476. return
  477. span_data = self.span_map[run_id]
  478. span = span_data.span
  479. if should_send_default_pii() and self.include_prompts:
  480. set_data_normalized(
  481. span, SPANDATA.GEN_AI_RESPONSE_TEXT, finish.return_values.items()
  482. )
  483. self._exit_span(span_data, run_id)
  484. def on_tool_start(
  485. self: "SentryLangchainCallback",
  486. serialized: "Dict[str, Any]",
  487. input_str: str,
  488. *,
  489. run_id: "UUID",
  490. **kwargs: "Any",
  491. ) -> "Any":
  492. """Run when tool starts running."""
  493. with capture_internal_exceptions():
  494. if not run_id:
  495. return
  496. tool_name = serialized.get("name") or kwargs.get("name") or ""
  497. watched_span = self._create_span(
  498. run_id,
  499. kwargs.get("parent_run_id"),
  500. op=OP.GEN_AI_EXECUTE_TOOL,
  501. name=f"execute_tool {tool_name}".strip(),
  502. origin=LangchainIntegration.origin,
  503. )
  504. span = watched_span.span
  505. span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "execute_tool")
  506. span.set_data(SPANDATA.GEN_AI_TOOL_NAME, tool_name)
  507. tool_description = serialized.get("description")
  508. if tool_description is not None:
  509. span.set_data(SPANDATA.GEN_AI_TOOL_DESCRIPTION, tool_description)
  510. agent_name = _get_current_agent()
  511. if agent_name:
  512. span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name)
  513. if should_send_default_pii() and self.include_prompts:
  514. set_data_normalized(
  515. span,
  516. SPANDATA.GEN_AI_TOOL_INPUT,
  517. kwargs.get("inputs", [input_str]),
  518. )
  519. def on_tool_end(
  520. self: "SentryLangchainCallback", output: str, *, run_id: "UUID", **kwargs: "Any"
  521. ) -> "Any":
  522. """Run when tool ends running."""
  523. with capture_internal_exceptions():
  524. if not run_id or run_id not in self.span_map:
  525. return
  526. span_data = self.span_map[run_id]
  527. span = span_data.span
  528. if should_send_default_pii() and self.include_prompts:
  529. set_data_normalized(span, SPANDATA.GEN_AI_TOOL_OUTPUT, output)
  530. self._exit_span(span_data, run_id)
  531. def on_tool_error(
  532. self,
  533. error: "SentryLangchainCallback",
  534. *args: "Union[Exception, KeyboardInterrupt]",
  535. run_id: "UUID",
  536. **kwargs: "Any",
  537. ) -> "Any":
  538. """Run when tool errors."""
  539. self._handle_error(run_id, error)
  540. def _extract_tokens(
  541. token_usage: "Any",
  542. ) -> "tuple[Optional[int], Optional[int], Optional[int]]":
  543. if not token_usage:
  544. return None, None, None
  545. input_tokens = _get_value(token_usage, "prompt_tokens") or _get_value(
  546. token_usage, "input_tokens"
  547. )
  548. output_tokens = _get_value(token_usage, "completion_tokens") or _get_value(
  549. token_usage, "output_tokens"
  550. )
  551. total_tokens = _get_value(token_usage, "total_tokens")
  552. return input_tokens, output_tokens, total_tokens
  553. def _extract_tokens_from_generations(
  554. generations: "Any",
  555. ) -> "tuple[Optional[int], Optional[int], Optional[int]]":
  556. """Extract token usage from response.generations structure."""
  557. if not generations:
  558. return None, None, None
  559. total_input = 0
  560. total_output = 0
  561. total_total = 0
  562. for gen_list in generations:
  563. for gen in gen_list:
  564. token_usage = _get_token_usage(gen)
  565. input_tokens, output_tokens, total_tokens = _extract_tokens(token_usage)
  566. total_input += input_tokens if input_tokens is not None else 0
  567. total_output += output_tokens if output_tokens is not None else 0
  568. total_total += total_tokens if total_tokens is not None else 0
  569. return (
  570. total_input if total_input > 0 else None,
  571. total_output if total_output > 0 else None,
  572. total_total if total_total > 0 else None,
  573. )
  574. def _get_token_usage(obj: "Any") -> "Optional[Dict[str, Any]]":
  575. """
  576. Check multiple paths to extract token usage from different objects.
  577. """
  578. possible_names = ("usage", "token_usage", "usage_metadata")
  579. message = _get_value(obj, "message")
  580. if message is not None:
  581. for name in possible_names:
  582. usage = _get_value(message, name)
  583. if usage is not None:
  584. return usage
  585. llm_output = _get_value(obj, "llm_output")
  586. if llm_output is not None:
  587. for name in possible_names:
  588. usage = _get_value(llm_output, name)
  589. if usage is not None:
  590. return usage
  591. for name in possible_names:
  592. usage = _get_value(obj, name)
  593. if usage is not None:
  594. return usage
  595. return None
  596. def _record_token_usage(span: "Span", response: "Any") -> None:
  597. token_usage = _get_token_usage(response)
  598. if token_usage:
  599. input_tokens, output_tokens, total_tokens = _extract_tokens(token_usage)
  600. else:
  601. input_tokens, output_tokens, total_tokens = _extract_tokens_from_generations(
  602. response.generations
  603. )
  604. if input_tokens is not None:
  605. span.set_data(SPANDATA.GEN_AI_USAGE_INPUT_TOKENS, input_tokens)
  606. if output_tokens is not None:
  607. span.set_data(SPANDATA.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens)
  608. if total_tokens is not None:
  609. span.set_data(SPANDATA.GEN_AI_USAGE_TOTAL_TOKENS, total_tokens)
  610. def _get_request_data(
  611. obj: "Any", args: "Any", kwargs: "Any"
  612. ) -> "tuple[Optional[str], Optional[List[Any]]]":
  613. """
  614. Get the agent name and available tools for the agent.
  615. """
  616. agent = getattr(obj, "agent", None)
  617. runnable = getattr(agent, "runnable", None)
  618. runnable_config = getattr(runnable, "config", {})
  619. tools = (
  620. getattr(obj, "tools", None)
  621. or getattr(agent, "tools", None)
  622. or runnable_config.get("tools")
  623. or runnable_config.get("available_tools")
  624. )
  625. tools = tools if tools and len(tools) > 0 else None
  626. try:
  627. agent_name = None
  628. if len(args) > 1:
  629. agent_name = args[1].get("run_name")
  630. if agent_name is None:
  631. agent_name = runnable_config.get("run_name")
  632. except Exception:
  633. pass
  634. return (agent_name, tools)
  635. def _simplify_langchain_tools(tools: "Any") -> "Optional[List[Any]]":
  636. """Parse and simplify tools into a cleaner format."""
  637. if not tools:
  638. return None
  639. if not isinstance(tools, (list, tuple)):
  640. return None
  641. simplified_tools = []
  642. for tool in tools:
  643. try:
  644. if isinstance(tool, dict):
  645. if "function" in tool and isinstance(tool["function"], dict):
  646. func = tool["function"]
  647. simplified_tool = {
  648. "name": func.get("name"),
  649. "description": func.get("description"),
  650. }
  651. if simplified_tool["name"]:
  652. simplified_tools.append(simplified_tool)
  653. elif "name" in tool:
  654. simplified_tool = {
  655. "name": tool.get("name"),
  656. "description": tool.get("description"),
  657. }
  658. simplified_tools.append(simplified_tool)
  659. else:
  660. name = (
  661. tool.get("name")
  662. or tool.get("tool_name")
  663. or tool.get("function_name")
  664. )
  665. if name:
  666. simplified_tools.append(
  667. {
  668. "name": name,
  669. "description": tool.get("description")
  670. or tool.get("desc"),
  671. }
  672. )
  673. elif hasattr(tool, "name"):
  674. simplified_tool = {
  675. "name": getattr(tool, "name", None),
  676. "description": getattr(tool, "description", None)
  677. or getattr(tool, "desc", None),
  678. }
  679. if simplified_tool["name"]:
  680. simplified_tools.append(simplified_tool)
  681. elif hasattr(tool, "__name__"):
  682. simplified_tools.append(
  683. {
  684. "name": tool.__name__,
  685. "description": getattr(tool, "__doc__", None),
  686. }
  687. )
  688. else:
  689. tool_str = str(tool)
  690. if tool_str and tool_str != "":
  691. simplified_tools.append({"name": tool_str, "description": None})
  692. except Exception:
  693. continue
  694. return simplified_tools if simplified_tools else None
  695. def _set_tools_on_span(span: "Span", tools: "Any") -> None:
  696. """Set available tools data on a span if tools are provided."""
  697. if tools is not None:
  698. simplified_tools = _simplify_langchain_tools(tools)
  699. if simplified_tools:
  700. set_data_normalized(
  701. span,
  702. SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS,
  703. simplified_tools,
  704. unpack=False,
  705. )
  706. def _wrap_configure(f: "Callable[..., Any]") -> "Callable[..., Any]":
  707. @wraps(f)
  708. def new_configure(
  709. callback_manager_cls: type,
  710. inheritable_callbacks: "Callbacks" = None,
  711. local_callbacks: "Callbacks" = None,
  712. *args: "Any",
  713. **kwargs: "Any",
  714. ) -> "Any":
  715. integration = sentry_sdk.get_client().get_integration(LangchainIntegration)
  716. if integration is None:
  717. return f(
  718. callback_manager_cls,
  719. inheritable_callbacks,
  720. local_callbacks,
  721. *args,
  722. **kwargs,
  723. )
  724. local_callbacks = local_callbacks or []
  725. # Handle each possible type of local_callbacks. For each type, we
  726. # extract the list of callbacks to check for SentryLangchainCallback,
  727. # and define a function that would add the SentryLangchainCallback
  728. # to the existing callbacks list.
  729. if isinstance(local_callbacks, BaseCallbackManager):
  730. callbacks_list = local_callbacks.handlers
  731. elif isinstance(local_callbacks, BaseCallbackHandler):
  732. callbacks_list = [local_callbacks]
  733. elif isinstance(local_callbacks, list):
  734. callbacks_list = local_callbacks
  735. else:
  736. logger.debug("Unknown callback type: %s", local_callbacks)
  737. # Just proceed with original function call
  738. return f(
  739. callback_manager_cls,
  740. inheritable_callbacks,
  741. local_callbacks,
  742. *args,
  743. **kwargs,
  744. )
  745. # Handle each possible type of inheritable_callbacks.
  746. if isinstance(inheritable_callbacks, BaseCallbackManager):
  747. inheritable_callbacks_list = inheritable_callbacks.handlers
  748. elif isinstance(inheritable_callbacks, list):
  749. inheritable_callbacks_list = inheritable_callbacks
  750. else:
  751. inheritable_callbacks_list = []
  752. if not any(
  753. isinstance(cb, SentryLangchainCallback)
  754. for cb in itertools.chain(callbacks_list, inheritable_callbacks_list)
  755. ):
  756. sentry_handler = SentryLangchainCallback(
  757. integration.max_spans,
  758. integration.include_prompts,
  759. )
  760. if isinstance(local_callbacks, BaseCallbackManager):
  761. local_callbacks = local_callbacks.copy()
  762. local_callbacks.handlers = [
  763. *local_callbacks.handlers,
  764. sentry_handler,
  765. ]
  766. elif isinstance(local_callbacks, BaseCallbackHandler):
  767. local_callbacks = [local_callbacks, sentry_handler]
  768. else:
  769. local_callbacks = [*local_callbacks, sentry_handler]
  770. return f(
  771. callback_manager_cls,
  772. inheritable_callbacks,
  773. local_callbacks,
  774. *args,
  775. **kwargs,
  776. )
  777. return new_configure
  778. def _wrap_agent_executor_invoke(f: "Callable[..., Any]") -> "Callable[..., Any]":
  779. @wraps(f)
  780. def new_invoke(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
  781. integration = sentry_sdk.get_client().get_integration(LangchainIntegration)
  782. if integration is None:
  783. return f(self, *args, **kwargs)
  784. agent_name, tools = _get_request_data(self, args, kwargs)
  785. start_span_function = get_start_span_function()
  786. with start_span_function(
  787. op=OP.GEN_AI_INVOKE_AGENT,
  788. name=f"invoke_agent {agent_name}" if agent_name else "invoke_agent",
  789. origin=LangchainIntegration.origin,
  790. ) as span:
  791. _push_agent(agent_name)
  792. try:
  793. if agent_name:
  794. span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name)
  795. span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent")
  796. span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, False)
  797. _set_tools_on_span(span, tools)
  798. # Run the agent
  799. result = f(self, *args, **kwargs)
  800. input = result.get("input")
  801. if (
  802. input is not None
  803. and should_send_default_pii()
  804. and integration.include_prompts
  805. ):
  806. normalized_messages = normalize_message_roles([input])
  807. scope = sentry_sdk.get_current_scope()
  808. messages_data = truncate_and_annotate_messages(
  809. normalized_messages, span, scope
  810. )
  811. if messages_data is not None:
  812. set_data_normalized(
  813. span,
  814. SPANDATA.GEN_AI_REQUEST_MESSAGES,
  815. messages_data,
  816. unpack=False,
  817. )
  818. output = result.get("output")
  819. if (
  820. output is not None
  821. and should_send_default_pii()
  822. and integration.include_prompts
  823. ):
  824. set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output)
  825. return result
  826. finally:
  827. # Ensure agent is popped even if an exception occurs
  828. _pop_agent()
  829. return new_invoke
  830. def _wrap_agent_executor_stream(f: "Callable[..., Any]") -> "Callable[..., Any]":
  831. @wraps(f)
  832. def new_stream(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
  833. integration = sentry_sdk.get_client().get_integration(LangchainIntegration)
  834. if integration is None:
  835. return f(self, *args, **kwargs)
  836. agent_name, tools = _get_request_data(self, args, kwargs)
  837. start_span_function = get_start_span_function()
  838. span = start_span_function(
  839. op=OP.GEN_AI_INVOKE_AGENT,
  840. name=f"invoke_agent {agent_name}" if agent_name else "invoke_agent",
  841. origin=LangchainIntegration.origin,
  842. )
  843. span.__enter__()
  844. _push_agent(agent_name)
  845. if agent_name:
  846. span.set_data(SPANDATA.GEN_AI_AGENT_NAME, agent_name)
  847. span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "invoke_agent")
  848. span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
  849. _set_tools_on_span(span, tools)
  850. input = args[0].get("input") if len(args) >= 1 else None
  851. if (
  852. input is not None
  853. and should_send_default_pii()
  854. and integration.include_prompts
  855. ):
  856. normalized_messages = normalize_message_roles([input])
  857. scope = sentry_sdk.get_current_scope()
  858. messages_data = truncate_and_annotate_messages(
  859. normalized_messages, span, scope
  860. )
  861. if messages_data is not None:
  862. set_data_normalized(
  863. span,
  864. SPANDATA.GEN_AI_REQUEST_MESSAGES,
  865. messages_data,
  866. unpack=False,
  867. )
  868. # Run the agent
  869. result = f(self, *args, **kwargs)
  870. old_iterator = result
  871. def new_iterator() -> "Iterator[Any]":
  872. exc_info: "tuple[Any, Any, Any]" = (None, None, None)
  873. try:
  874. for event in old_iterator:
  875. yield event
  876. try:
  877. output = event.get("output")
  878. except Exception:
  879. output = None
  880. if (
  881. output is not None
  882. and should_send_default_pii()
  883. and integration.include_prompts
  884. ):
  885. set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output)
  886. except Exception:
  887. exc_info = sys.exc_info()
  888. set_span_errored(span)
  889. raise
  890. finally:
  891. # Ensure cleanup happens even if iterator is abandoned or fails
  892. _pop_agent()
  893. span.__exit__(*exc_info)
  894. async def new_iterator_async() -> "AsyncIterator[Any]":
  895. exc_info: "tuple[Any, Any, Any]" = (None, None, None)
  896. try:
  897. async for event in old_iterator:
  898. yield event
  899. try:
  900. output = event.get("output")
  901. except Exception:
  902. output = None
  903. if (
  904. output is not None
  905. and should_send_default_pii()
  906. and integration.include_prompts
  907. ):
  908. set_data_normalized(span, SPANDATA.GEN_AI_RESPONSE_TEXT, output)
  909. except Exception:
  910. exc_info = sys.exc_info()
  911. set_span_errored(span)
  912. raise
  913. finally:
  914. # Ensure cleanup happens even if iterator is abandoned or fails
  915. _pop_agent()
  916. span.__exit__(*exc_info)
  917. if str(type(result)) == "<class 'async_generator'>":
  918. result = new_iterator_async()
  919. else:
  920. result = new_iterator()
  921. return result
  922. return new_stream
  923. def _patch_embeddings_provider(provider_class: "Any") -> None:
  924. """Patch an embeddings provider class with monitoring wrappers."""
  925. if provider_class is None:
  926. return
  927. if hasattr(provider_class, "embed_documents"):
  928. provider_class.embed_documents = _wrap_embedding_method(
  929. provider_class.embed_documents
  930. )
  931. if hasattr(provider_class, "embed_query"):
  932. provider_class.embed_query = _wrap_embedding_method(provider_class.embed_query)
  933. if hasattr(provider_class, "aembed_documents"):
  934. provider_class.aembed_documents = _wrap_async_embedding_method(
  935. provider_class.aembed_documents
  936. )
  937. if hasattr(provider_class, "aembed_query"):
  938. provider_class.aembed_query = _wrap_async_embedding_method(
  939. provider_class.aembed_query
  940. )
  941. def _wrap_embedding_method(f: "Callable[..., Any]") -> "Callable[..., Any]":
  942. """Wrap sync embedding methods (embed_documents and embed_query)."""
  943. @wraps(f)
  944. def new_embedding_method(self: "Any", *args: "Any", **kwargs: "Any") -> "Any":
  945. integration = sentry_sdk.get_client().get_integration(LangchainIntegration)
  946. if integration is None:
  947. return f(self, *args, **kwargs)
  948. model_name = getattr(self, "model", None) or getattr(self, "model_name", None)
  949. with sentry_sdk.start_span(
  950. op=OP.GEN_AI_EMBEDDINGS,
  951. name=f"embeddings {model_name}" if model_name else "embeddings",
  952. origin=LangchainIntegration.origin,
  953. ) as span:
  954. span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings")
  955. if model_name:
  956. span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name)
  957. # Capture input if PII is allowed
  958. if (
  959. should_send_default_pii()
  960. and integration.include_prompts
  961. and len(args) > 0
  962. ):
  963. input_data = args[0]
  964. # Normalize to list format
  965. texts = input_data if isinstance(input_data, list) else [input_data]
  966. set_data_normalized(
  967. span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False
  968. )
  969. result = f(self, *args, **kwargs)
  970. return result
  971. return new_embedding_method
  972. def _wrap_async_embedding_method(f: "Callable[..., Any]") -> "Callable[..., Any]":
  973. """Wrap async embedding methods (aembed_documents and aembed_query)."""
  974. @wraps(f)
  975. async def new_async_embedding_method(
  976. self: "Any", *args: "Any", **kwargs: "Any"
  977. ) -> "Any":
  978. integration = sentry_sdk.get_client().get_integration(LangchainIntegration)
  979. if integration is None:
  980. return await f(self, *args, **kwargs)
  981. model_name = getattr(self, "model", None) or getattr(self, "model_name", None)
  982. with sentry_sdk.start_span(
  983. op=OP.GEN_AI_EMBEDDINGS,
  984. name=f"embeddings {model_name}" if model_name else "embeddings",
  985. origin=LangchainIntegration.origin,
  986. ) as span:
  987. span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "embeddings")
  988. if model_name:
  989. span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name)
  990. # Capture input if PII is allowed
  991. if (
  992. should_send_default_pii()
  993. and integration.include_prompts
  994. and len(args) > 0
  995. ):
  996. input_data = args[0]
  997. # Normalize to list format
  998. texts = input_data if isinstance(input_data, list) else [input_data]
  999. set_data_normalized(
  1000. span, SPANDATA.GEN_AI_EMBEDDINGS_INPUT, texts, unpack=False
  1001. )
  1002. result = await f(self, *args, **kwargs)
  1003. return result
  1004. return new_async_embedding_method