From a103cfa49120945b413f3d9362a9557a3e9f4675 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmik Date: Mon, 16 Jun 2025 17:20:58 +0200 Subject: [PATCH 1/7] Implement track_adk_agent_recursive, fix missing input bug for sub_agents --- .../src/opik/integrations/adk/__init__.py | 3 +- .../integrations/adk/callback_injector.py | 104 +++++++ .../src/opik/integrations/adk/opik_tracer.py | 1 + .../library_integration/adk/test_adk_sync.py | 281 +++++++++++++++++- 4 files changed, 385 insertions(+), 4 deletions(-) create mode 100644 sdks/python/src/opik/integrations/adk/callback_injector.py diff --git a/sdks/python/src/opik/integrations/adk/__init__.py b/sdks/python/src/opik/integrations/adk/__init__.py index 9e9bfebb98..e167530a39 100644 --- a/sdks/python/src/opik/integrations/adk/__init__.py +++ b/sdks/python/src/opik/integrations/adk/__init__.py @@ -1,3 +1,4 @@ from .opik_tracer import OpikTracer +from .callback_injector import track_adk_agent_recursive -__all__ = ["OpikTracer"] +__all__ = ["OpikTracer", "track_adk_agent_recursive"] diff --git a/sdks/python/src/opik/integrations/adk/callback_injector.py b/sdks/python/src/opik/integrations/adk/callback_injector.py new file mode 100644 index 0000000000..81f2a9bcfc --- /dev/null +++ b/sdks/python/src/opik/integrations/adk/callback_injector.py @@ -0,0 +1,104 @@ +from typing import TypeVar +from . import opik_tracer +import logging +from typing import Set + +from google.adk.tools import agent_tool +from google.adk import agents + +LOGGER = logging.getLogger(__name__) + +ADKAgent = TypeVar("ADKAgent", bound=agents.BaseAgent) + + +def track_adk_agent_recursive( + root_agent: ADKAgent, tracer: opik_tracer.OpikTracer +) -> ADKAgent: + """ + Recursively adds opik tracer callbacks to the agent, its subagents, and agent tools. + + Args: + root_agent: The root ADK agent to track + tracer: The OpikTracer instance to use for tracking + + Returns: + The modified root agent with tracking enabled + """ + LOGGER.info( + "track_adk_agent_recursive is experimental feature. Please let us know if something is not working as expected: https://github.com/comet-ml/opik/issues" + ) + processed_agent_instance_ids: Set[int] = set() + + _process_agent(root_agent, tracer, processed_agent_instance_ids) + + return root_agent + + +def _process_agent( + agent: ADKAgent, + tracer: opik_tracer.OpikTracer, + processed_agent_instance_ids: Set[int], +) -> None: + if id(agent) in processed_agent_instance_ids: + return + + _add_callbacks_to_agent(agent, tracer) + _process_sub_agents(agent, tracer, processed_agent_instance_ids) + _process_tools(agent, tracer, processed_agent_instance_ids) + + processed_agent_instance_ids.add(id(agent)) + + +def _add_callbacks_to_agent(agent: ADKAgent, tracer: opik_tracer.OpikTracer) -> None: + callback_fields = { + "before_agent_callback": tracer.before_agent_callback, + "after_agent_callback": tracer.after_agent_callback, + "before_model_callback": tracer.before_model_callback, + "after_model_callback": tracer.after_model_callback, + "before_tool_callback": tracer.before_tool_callback, + "after_tool_callback": tracer.after_tool_callback, + } + + for callback_field_name, callback_func in callback_fields.items(): + if hasattr(agent, callback_field_name): + current_callback_value = getattr(agent, callback_field_name) + if current_callback_value is None: + setattr(agent, callback_field_name, callback_func) + elif isinstance(current_callback_value, list): + current_callback_value.append(callback_func) + else: + setattr( + agent, callback_field_name, [current_callback_value, callback_func] + ) + + +def _process_sub_agents( + agent: ADKAgent, + tracer: opik_tracer.OpikTracer, + processed_agent_instance_ids: Set[int], +) -> None: + if not hasattr(agent, "sub_agents"): + return + + for sub_agent in agent.sub_agents: + try: + _process_agent(sub_agent, tracer, processed_agent_instance_ids) + except Exception as e: + LOGGER.warning(f"Failed to track subagent: {e}") + + +def _process_tools( + agent: ADKAgent, + tracer: opik_tracer.OpikTracer, + processed_agent_instance_ids: Set[int], +) -> None: + if not hasattr(agent, "tools"): + return + + for tool in agent.tools: + if not isinstance(tool, agent_tool.AgentTool): + continue + try: + _process_agent(tool.agent, tracer, processed_agent_instance_ids) + except Exception as e: + LOGGER.warning(f"Failed to track agent tool: {e}") diff --git a/sdks/python/src/opik/integrations/adk/opik_tracer.py b/sdks/python/src/opik/integrations/adk/opik_tracer.py index a1cf8c65fe..107c9197b1 100644 --- a/sdks/python/src/opik/integrations/adk/opik_tracer.py +++ b/sdks/python/src/opik/integrations/adk/opik_tracer.py @@ -139,6 +139,7 @@ def before_agent_callback( project_name=self.project_name, metadata=trace_metadata, tags=self.tags, + input=user_input, type="general", ) _, opik_span_data = ( diff --git a/sdks/python/tests/library_integration/adk/test_adk_sync.py b/sdks/python/tests/library_integration/adk/test_adk_sync.py index 7c0cd6d01b..5627fda727 100644 --- a/sdks/python/tests/library_integration/adk/test_adk_sync.py +++ b/sdks/python/tests/library_integration/adk/test_adk_sync.py @@ -6,9 +6,11 @@ from google.adk import sessions as adk_sessions from google.adk import events as adk_events from google.adk.models import lite_llm as adk_lite_llm +from google.adk.tools import agent_tool as adk_agent_tool + from google.genai import types as genai_types -from opik.integrations.adk import OpikTracer +from opik.integrations.adk import OpikTracer, track_adk_agent_recursive from ...testlib import ( ANY_BUT_NONE, ANY_DICT, @@ -474,7 +476,7 @@ def test_adk__sequential_agent_with_subagents__every_subagent_has_its_own_span( last_updated_at=ANY_BUT_NONE, metadata=ANY_DICT, type="general", - input=None, + input=ANY_DICT, output=ANY_DICT, spans=[ SpanModel( @@ -501,7 +503,7 @@ def test_adk__sequential_agent_with_subagents__every_subagent_has_its_own_span( last_updated_at=ANY_BUT_NONE, metadata=ANY_DICT, type="general", - input=None, + input=ANY_DICT, output=ANY_DICT, spans=[ SpanModel( @@ -796,3 +798,276 @@ def test_adk__litellm_used_for_openai_model__usage_logged_in_openai_format( assert_dict_has_keys( trace_tree.spans[2].usage, EXPECTED_USAGE_KEYS_IN_OPENAI_FORMAT ) + + +def test_adk__track_adk_agent_recursive__sequential_agent_with_subagent__every_subagent_is_tracked( + fake_backend, +): + opik_tracer = OpikTracer() + + translator_to_english = adk_agents.Agent( + name="Translator", + model=MODEL_NAME, + description="Translates text to English.", + ) + summarizer = adk_agents.Agent( + name="Summarizer", + model=MODEL_NAME, + description="Summarizes text to 1 sentence.", + ) + root_agent = adk_agents.SequentialAgent( + name="TextProcessingAssistant", + sub_agents=[translator_to_english, summarizer], + description="Runs translator to english then summarizer, in order.", + ) + + track_adk_agent_recursive(root_agent, opik_tracer) + + runner = _build_runner(root_agent) + + INPUT_GERMAN_TEXT = ( + "Wie große Sprachmodelle (LLMs) funktionieren\n\n" + "Große Sprachmodelle (LLMs) werden mit riesigen Mengen an Text trainiert,\n" + "um Muster in der Sprache zu erkennen. Sie verwenden eine Art neuronales Netzwerk,\n" + "das Transformer genannt wird. Dieses ermöglicht es ihnen, den Kontext und die Beziehungen\n" + "zwischen Wörtern zu verstehen.\n" + "Wenn man einem LLM eine Eingabe gibt, sagt es die wahrscheinlichsten nächsten Wörter\n" + "voraus – basierend auf allem, was es während des Trainings gelernt hat.\n" + "Es „versteht“ nicht im menschlichen Sinne, aber es erzeugt Antworten, die oft intelligent wirken,\n" + "weil es so viele Daten gesehen hat.\n" + "Je mehr Daten und Training ein Modell hat, desto besser kann es Aufgaben wie das Beantworten von Fragen,\n" + "das Schreiben von Texten oder das Zusammenfassen von Inhalten erfüllen.\n" + ) + + events = runner.run( + user_id=USER_ID, + session_id=SESSION_ID, + new_message=genai_types.Content( + role="user", parts=[genai_types.Part(text=INPUT_GERMAN_TEXT)] + ), + ) + final_response = _extract_final_response_text(events) + + opik.flush_tracker() + assert len(fake_backend.trace_trees) > 0 + trace_tree = fake_backend.trace_trees[0] + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_BUT_NONE, + name="TextProcessingAssistant", + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + metadata={ + "created_from": "google-adk", + "adk_invocation_id": ANY_STRING, + "app_name": APP_NAME, + "user_id": USER_ID, + }, + output=ANY_DICT.containing( + {"content": {"parts": [{"text": final_response}], "role": "model"}} + ), + input={ + "role": "user", + "parts": [{"text": INPUT_GERMAN_TEXT}], + }, + thread_id=SESSION_ID, + spans=[ + SpanModel( + id=ANY_BUT_NONE, + name="Translator", + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + metadata=ANY_DICT, + type="general", + input=ANY_DICT, + output=ANY_DICT, + spans=[ + SpanModel( + id=ANY_BUT_NONE, + name=MODEL_NAME, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + metadata=ANY_DICT, + type="llm", + input=ANY_DICT, + output=ANY_DICT, + provider=opik_adk_helpers.get_adk_provider(), + model=MODEL_NAME, + usage=ANY_DICT, + ) + ], + ), + SpanModel( + id=ANY_BUT_NONE, + name="Summarizer", + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + metadata=ANY_DICT, + type="general", + input=ANY_DICT, + output=ANY_DICT, + spans=[ + SpanModel( + id=ANY_BUT_NONE, + name=MODEL_NAME, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + metadata=ANY_DICT, + type="llm", + input=ANY_DICT, + output=ANY_DICT, + provider=opik_adk_helpers.get_adk_provider(), + model=MODEL_NAME, + usage=ANY_DICT, + ) + ], + ), + ], + ) + + assert_equal(EXPECTED_TRACE_TREE, trace_tree) + assert_dict_has_keys(trace_tree.spans[0].spans[0].usage, EXPECTED_USAGE_KEYS_GOOGLE) + assert_dict_has_keys(trace_tree.spans[1].spans[0].usage, EXPECTED_USAGE_KEYS_GOOGLE) + + +def test_adk__track_adk_agent_recursive__agent_tool_is_used__agent_tool_is_tracked( + fake_backend, +): + opik_tracer = OpikTracer() + + translator_to_english = adk_agents.Agent( + name="Translator", + model=MODEL_NAME, + description="Translates text to English.", + ) + + root_agent = adk_agents.Agent( + name="TextProcessingAssistant", + model=MODEL_NAME, + tools=[adk_agent_tool.AgentTool(agent=translator_to_english)], + description="Agent responsible for translating text to english by invoking a special tool for that.", + ) + + track_adk_agent_recursive(root_agent, opik_tracer) + + runner = _build_runner(root_agent) + + INPUT_GERMAN_TEXT = "Wie große Sprachmodelle (LLMs) funktionieren\n\n" + + events = runner.run( + user_id=USER_ID, + session_id=SESSION_ID, + new_message=genai_types.Content( + role="user", parts=[genai_types.Part(text=INPUT_GERMAN_TEXT)] + ), + ) + final_response = _extract_final_response_text(events) + + opik.flush_tracker() + assert len(fake_backend.trace_trees) > 0 + trace_tree = fake_backend.trace_trees[0] + + EXPECTED_TRACE_TREE = TraceModel( + id=ANY_BUT_NONE, + name="TextProcessingAssistant", + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + metadata={ + "created_from": "google-adk", + "adk_invocation_id": ANY_STRING, + "app_name": APP_NAME, + "user_id": USER_ID, + }, + output=ANY_DICT.containing( + {"content": {"parts": [{"text": final_response}], "role": "model"}} + ), + input={ + "role": "user", + "parts": [{"text": INPUT_GERMAN_TEXT}], + }, + thread_id=SESSION_ID, + spans=[ + SpanModel( + id=ANY_BUT_NONE, + name=MODEL_NAME, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + metadata=ANY_DICT, + type="llm", + input=ANY_DICT, + output=ANY_DICT, + provider=opik_adk_helpers.get_adk_provider(), + model=MODEL_NAME, + usage=ANY_DICT, + ), + SpanModel( # from tool callback + id=ANY_BUT_NONE, + name="Translator", + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + metadata=ANY_DICT, + type="tool", + input=ANY_DICT, + output=ANY_DICT, + spans=[ + SpanModel( # from agent callback + id=ANY_BUT_NONE, + name="Translator", + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + metadata=ANY_DICT, + type="general", + input=ANY_DICT, + output=ANY_DICT, + spans=[ + SpanModel( # from model callback inside the agent tool + id=ANY_BUT_NONE, + name=MODEL_NAME, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + metadata=ANY_DICT, + type="llm", + input=ANY_DICT, + output=ANY_DICT, + provider=opik_adk_helpers.get_adk_provider(), + model=MODEL_NAME, + usage=ANY_DICT, + ) + ], + ) + ], + ), + SpanModel( + id=ANY_BUT_NONE, + name=MODEL_NAME, + start_time=ANY_BUT_NONE, + end_time=ANY_BUT_NONE, + last_updated_at=ANY_BUT_NONE, + metadata=ANY_DICT, + type="llm", + input=ANY_DICT, + output=ANY_DICT, + provider=opik_adk_helpers.get_adk_provider(), + model=MODEL_NAME, + usage=ANY_DICT, + ), + ], + ) + + assert_equal(EXPECTED_TRACE_TREE, trace_tree) + + assert_dict_has_keys(trace_tree.spans[0].usage, EXPECTED_USAGE_KEYS_GOOGLE) + assert_dict_has_keys( + trace_tree.spans[1].spans[0].spans[0].usage, EXPECTED_USAGE_KEYS_GOOGLE + ) + assert_dict_has_keys(trace_tree.spans[2].usage, EXPECTED_USAGE_KEYS_GOOGLE) From b18c96102c0adbf0ba713fd1ed03a5456abeb092 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmik Date: Mon, 16 Jun 2025 17:23:04 +0200 Subject: [PATCH 2/7] update async test --- sdks/python/tests/library_integration/adk/test_adk_async.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/python/tests/library_integration/adk/test_adk_async.py b/sdks/python/tests/library_integration/adk/test_adk_async.py index 6b0cc3033d..6695c41bed 100644 --- a/sdks/python/tests/library_integration/adk/test_adk_async.py +++ b/sdks/python/tests/library_integration/adk/test_adk_async.py @@ -217,8 +217,6 @@ async def test_adk__sequential_agent_with_subagents__every_subagent_has_its_own_ description="Runs translator to english then summarizer, in order.", before_agent_callback=opik_tracer.before_agent_callback, after_agent_callback=opik_tracer.after_agent_callback, - # before_model_callback=opik_tracer.before_model_callback, - # after_model_callback=opik_tracer.after_model_callback, ) runner = await _async_build_runner(root_agent) @@ -279,7 +277,7 @@ async def test_adk__sequential_agent_with_subagents__every_subagent_has_its_own_ last_updated_at=ANY_BUT_NONE, metadata=ANY_DICT, type="general", - input=None, + input=ANY_DICT, output=ANY_DICT, spans=[ SpanModel( @@ -306,7 +304,7 @@ async def test_adk__sequential_agent_with_subagents__every_subagent_has_its_own_ last_updated_at=ANY_BUT_NONE, metadata=ANY_DICT, type="general", - input=None, + input=ANY_DICT, output=ANY_DICT, spans=[ SpanModel( From 7ea006121e80ed934bd92ae8b92506092f1d1967 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmik Date: Tue, 17 Jun 2025 12:13:06 +0200 Subject: [PATCH 3/7] Move callback injecting logic to the class, use weakset for handling cache of already processed agents --- .../src/opik/integrations/adk/__init__.py | 2 +- .../integrations/adk/callback_injector.py | 104 --------------- .../adk/recursive_callback_injector.py | 120 ++++++++++++++++++ 3 files changed, 121 insertions(+), 105 deletions(-) delete mode 100644 sdks/python/src/opik/integrations/adk/callback_injector.py create mode 100644 sdks/python/src/opik/integrations/adk/recursive_callback_injector.py diff --git a/sdks/python/src/opik/integrations/adk/__init__.py b/sdks/python/src/opik/integrations/adk/__init__.py index e167530a39..9e0e26e1a2 100644 --- a/sdks/python/src/opik/integrations/adk/__init__.py +++ b/sdks/python/src/opik/integrations/adk/__init__.py @@ -1,4 +1,4 @@ from .opik_tracer import OpikTracer -from .callback_injector import track_adk_agent_recursive +from .recursive_callback_injector import track_adk_agent_recursive __all__ = ["OpikTracer", "track_adk_agent_recursive"] diff --git a/sdks/python/src/opik/integrations/adk/callback_injector.py b/sdks/python/src/opik/integrations/adk/callback_injector.py deleted file mode 100644 index 81f2a9bcfc..0000000000 --- a/sdks/python/src/opik/integrations/adk/callback_injector.py +++ /dev/null @@ -1,104 +0,0 @@ -from typing import TypeVar -from . import opik_tracer -import logging -from typing import Set - -from google.adk.tools import agent_tool -from google.adk import agents - -LOGGER = logging.getLogger(__name__) - -ADKAgent = TypeVar("ADKAgent", bound=agents.BaseAgent) - - -def track_adk_agent_recursive( - root_agent: ADKAgent, tracer: opik_tracer.OpikTracer -) -> ADKAgent: - """ - Recursively adds opik tracer callbacks to the agent, its subagents, and agent tools. - - Args: - root_agent: The root ADK agent to track - tracer: The OpikTracer instance to use for tracking - - Returns: - The modified root agent with tracking enabled - """ - LOGGER.info( - "track_adk_agent_recursive is experimental feature. Please let us know if something is not working as expected: https://github.com/comet-ml/opik/issues" - ) - processed_agent_instance_ids: Set[int] = set() - - _process_agent(root_agent, tracer, processed_agent_instance_ids) - - return root_agent - - -def _process_agent( - agent: ADKAgent, - tracer: opik_tracer.OpikTracer, - processed_agent_instance_ids: Set[int], -) -> None: - if id(agent) in processed_agent_instance_ids: - return - - _add_callbacks_to_agent(agent, tracer) - _process_sub_agents(agent, tracer, processed_agent_instance_ids) - _process_tools(agent, tracer, processed_agent_instance_ids) - - processed_agent_instance_ids.add(id(agent)) - - -def _add_callbacks_to_agent(agent: ADKAgent, tracer: opik_tracer.OpikTracer) -> None: - callback_fields = { - "before_agent_callback": tracer.before_agent_callback, - "after_agent_callback": tracer.after_agent_callback, - "before_model_callback": tracer.before_model_callback, - "after_model_callback": tracer.after_model_callback, - "before_tool_callback": tracer.before_tool_callback, - "after_tool_callback": tracer.after_tool_callback, - } - - for callback_field_name, callback_func in callback_fields.items(): - if hasattr(agent, callback_field_name): - current_callback_value = getattr(agent, callback_field_name) - if current_callback_value is None: - setattr(agent, callback_field_name, callback_func) - elif isinstance(current_callback_value, list): - current_callback_value.append(callback_func) - else: - setattr( - agent, callback_field_name, [current_callback_value, callback_func] - ) - - -def _process_sub_agents( - agent: ADKAgent, - tracer: opik_tracer.OpikTracer, - processed_agent_instance_ids: Set[int], -) -> None: - if not hasattr(agent, "sub_agents"): - return - - for sub_agent in agent.sub_agents: - try: - _process_agent(sub_agent, tracer, processed_agent_instance_ids) - except Exception as e: - LOGGER.warning(f"Failed to track subagent: {e}") - - -def _process_tools( - agent: ADKAgent, - tracer: opik_tracer.OpikTracer, - processed_agent_instance_ids: Set[int], -) -> None: - if not hasattr(agent, "tools"): - return - - for tool in agent.tools: - if not isinstance(tool, agent_tool.AgentTool): - continue - try: - _process_agent(tool.agent, tracer, processed_agent_instance_ids) - except Exception as e: - LOGGER.warning(f"Failed to track agent tool: {e}") diff --git a/sdks/python/src/opik/integrations/adk/recursive_callback_injector.py b/sdks/python/src/opik/integrations/adk/recursive_callback_injector.py new file mode 100644 index 0000000000..0f844ba8c7 --- /dev/null +++ b/sdks/python/src/opik/integrations/adk/recursive_callback_injector.py @@ -0,0 +1,120 @@ +from typing import TypeVar +from . import opik_tracer +import logging +import weakref +from opik import _logging + +from google.adk.tools import agent_tool +from google.adk import agents + +LOGGER = logging.getLogger(__name__) + +ADKAgent = TypeVar("ADKAgent", bound=agents.BaseAgent) + + +class RecursiveCallbackInjector: + def __init__(self) -> None: + self._tracked_agents: weakref.WeakSet[agents.BaseAgent] = weakref.WeakSet() + + def inject( + self, + root_agent: agents.BaseAgent, + tracer: opik_tracer.OpikTracer, + ) -> None: + self._process_agent(root_agent, tracer) + + def _add_callbacks_to_agent( + self, agent: agents.BaseAgent, tracer: opik_tracer.OpikTracer + ) -> None: + callback_fields = { + "before_agent_callback": tracer.before_agent_callback, + "after_agent_callback": tracer.after_agent_callback, + "before_model_callback": tracer.before_model_callback, + "after_model_callback": tracer.after_model_callback, + "before_tool_callback": tracer.before_tool_callback, + "after_tool_callback": tracer.after_tool_callback, + } + + for callback_field_name, callback_func in callback_fields.items(): + if not hasattr(agent, callback_field_name): + continue + + current_callback_value = getattr(agent, callback_field_name) + if current_callback_value is None: + setattr(agent, callback_field_name, callback_func) + elif isinstance(current_callback_value, list): + current_callback_value.append(callback_func) + else: + setattr( + agent, callback_field_name, [current_callback_value, callback_func] + ) + + def _process_agent( + self, + agent: agents.BaseAgent, + tracer: opik_tracer.OpikTracer, + ) -> None: + if agent in self._tracked_agents: + return + + self._add_callbacks_to_agent(agent, tracer) + self._process_sub_agents(agent, tracer) + self._process_tools(agent, tracer) + + self._tracked_agents.add(agent) + + def _process_sub_agents( + self, + agent: agents.BaseAgent, + tracer: opik_tracer.OpikTracer, + ) -> None: + if not hasattr(agent, "sub_agents"): + return + + for sub_agent in agent.sub_agents: + try: + self._process_agent(sub_agent, tracer) + except Exception as e: + LOGGER.warning(f"Failed to track subagent: {e}") + + def _process_tools( + self, + agent: agents.BaseAgent, + tracer: opik_tracer.OpikTracer, + ) -> None: + if not hasattr(agent, "tools"): + return + + for tool in agent.tools: + if not isinstance(tool, agent_tool.AgentTool): + continue + try: + self._process_agent(tool.agent, tracer) + except Exception as e: + LOGGER.warning(f"Failed to track agent tool: {e}") + + +_recursive_callback_injector = RecursiveCallbackInjector() + + +def track_adk_agent_recursive( + root_agent: ADKAgent, tracer: opik_tracer.OpikTracer +) -> ADKAgent: + """ + Recursively adds opik tracer callbacks to the agent, its subagents, and agent tools. + + Args: + root_agent: The root ADK agent to track + tracer: The OpikTracer instance to use for tracking + + Returns: + The modified root agent with tracking enabled + """ + _logging.log_once_at_level( + logging.INFO, + "`track_adk_agent_recursive` is experimental feature. Please let us know if something is not working as expected: https://github.com/comet-ml/opik/issues", + logger=LOGGER, + ) + _recursive_callback_injector.inject(root_agent, tracer) + + return root_agent From 1f7709150ffbfa696ec21225e9ec5ffabc5e82e7 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmik Date: Tue, 17 Jun 2025 13:28:51 +0200 Subject: [PATCH 4/7] Replace weakref logic with set of ids and checks for already injected opik callbacks --- .../src/opik/integrations/adk/opik_tracer.py | 1 + .../adk/recursive_callback_injector.py | 67 +++++++++++-------- 2 files changed, 39 insertions(+), 29 deletions(-) diff --git a/sdks/python/src/opik/integrations/adk/opik_tracer.py b/sdks/python/src/opik/integrations/adk/opik_tracer.py index 107c9197b1..bf00a53722 100644 --- a/sdks/python/src/opik/integrations/adk/opik_tracer.py +++ b/sdks/python/src/opik/integrations/adk/opik_tracer.py @@ -67,6 +67,7 @@ def __init__( _patch_adk() + def _end_current_trace(self) -> None: trace_data = self._context_storage.pop_trace_data() assert trace_data is not None diff --git a/sdks/python/src/opik/integrations/adk/recursive_callback_injector.py b/sdks/python/src/opik/integrations/adk/recursive_callback_injector.py index 0f844ba8c7..6e71b31e1b 100644 --- a/sdks/python/src/opik/integrations/adk/recursive_callback_injector.py +++ b/sdks/python/src/opik/integrations/adk/recursive_callback_injector.py @@ -1,7 +1,7 @@ -from typing import TypeVar +import types +from typing import TypeVar, List, Callable, Any, Set from . import opik_tracer import logging -import weakref from opik import _logging from google.adk.tools import agent_tool @@ -13,26 +13,24 @@ class RecursiveCallbackInjector: - def __init__(self) -> None: - self._tracked_agents: weakref.WeakSet[agents.BaseAgent] = weakref.WeakSet() + def __init__(self, tracer: opik_tracer.OpikTracer) -> None: + self._opik_tracer = tracer + self._seen_instance_ids: Set[int] = set() def inject( self, root_agent: agents.BaseAgent, - tracer: opik_tracer.OpikTracer, ) -> None: - self._process_agent(root_agent, tracer) + self._process_agent(root_agent) - def _add_callbacks_to_agent( - self, agent: agents.BaseAgent, tracer: opik_tracer.OpikTracer - ) -> None: + def _add_callbacks_to_agent(self, agent: agents.BaseAgent) -> None: callback_fields = { - "before_agent_callback": tracer.before_agent_callback, - "after_agent_callback": tracer.after_agent_callback, - "before_model_callback": tracer.before_model_callback, - "after_model_callback": tracer.after_model_callback, - "before_tool_callback": tracer.before_tool_callback, - "after_tool_callback": tracer.after_tool_callback, + "before_agent_callback": self._opik_tracer.before_agent_callback, + "after_agent_callback": self._opik_tracer.after_agent_callback, + "before_model_callback": self._opik_tracer.before_model_callback, + "after_model_callback": self._opik_tracer.after_model_callback, + "before_tool_callback": self._opik_tracer.before_tool_callback, + "after_tool_callback": self._opik_tracer.after_tool_callback, } for callback_field_name, callback_func in callback_fields.items(): @@ -42,9 +40,11 @@ def _add_callbacks_to_agent( current_callback_value = getattr(agent, callback_field_name) if current_callback_value is None: setattr(agent, callback_field_name, callback_func) - elif isinstance(current_callback_value, list): + elif isinstance( + current_callback_value, list + ) and not _contains_opik_tracer_callback(callbacks=current_callback_value): current_callback_value.append(callback_func) - else: + elif not _is_opik_callback_function(current_callback_value): setattr( agent, callback_field_name, [current_callback_value, callback_func] ) @@ -52,35 +52,32 @@ def _add_callbacks_to_agent( def _process_agent( self, agent: agents.BaseAgent, - tracer: opik_tracer.OpikTracer, ) -> None: - if agent in self._tracked_agents: + if id(agent) in self._seen_instance_ids: return - self._add_callbacks_to_agent(agent, tracer) - self._process_sub_agents(agent, tracer) - self._process_tools(agent, tracer) + self._add_callbacks_to_agent(agent) + self._process_sub_agents(agent) + self._process_tools(agent) - self._tracked_agents.add(agent) + self._seen_instance_ids.add(id(agent)) def _process_sub_agents( self, agent: agents.BaseAgent, - tracer: opik_tracer.OpikTracer, ) -> None: if not hasattr(agent, "sub_agents"): return for sub_agent in agent.sub_agents: try: - self._process_agent(sub_agent, tracer) + self._process_agent(sub_agent) except Exception as e: LOGGER.warning(f"Failed to track subagent: {e}") def _process_tools( self, agent: agents.BaseAgent, - tracer: opik_tracer.OpikTracer, ) -> None: if not hasattr(agent, "tools"): return @@ -89,12 +86,23 @@ def _process_tools( if not isinstance(tool, agent_tool.AgentTool): continue try: - self._process_agent(tool.agent, tracer) + self._process_agent(tool.agent) except Exception as e: LOGGER.warning(f"Failed to track agent tool: {e}") -_recursive_callback_injector = RecursiveCallbackInjector() +def _is_opik_callback_function(obj: Any) -> bool: + if not callable(obj): + return False + + if isinstance(obj, types.MethodType): + return isinstance(obj.__self__, opik_tracer.OpikTracer) + + return False + + +def _contains_opik_tracer_callback(callbacks: List) -> bool: + return any(_is_opik_callback_function(callback) for callback in callbacks) def track_adk_agent_recursive( @@ -115,6 +123,7 @@ def track_adk_agent_recursive( "`track_adk_agent_recursive` is experimental feature. Please let us know if something is not working as expected: https://github.com/comet-ml/opik/issues", logger=LOGGER, ) - _recursive_callback_injector.inject(root_agent, tracer) + recursive_callback_injector = RecursiveCallbackInjector(tracer) + recursive_callback_injector.inject(root_agent) return root_agent From aa828a432555c71be29ad076a40b002d6aad4b19 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmik Date: Tue, 17 Jun 2025 13:29:08 +0200 Subject: [PATCH 5/7] Fix lint errors --- sdks/python/src/opik/integrations/adk/opik_tracer.py | 1 - .../src/opik/integrations/adk/recursive_callback_injector.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/src/opik/integrations/adk/opik_tracer.py b/sdks/python/src/opik/integrations/adk/opik_tracer.py index bf00a53722..107c9197b1 100644 --- a/sdks/python/src/opik/integrations/adk/opik_tracer.py +++ b/sdks/python/src/opik/integrations/adk/opik_tracer.py @@ -67,7 +67,6 @@ def __init__( _patch_adk() - def _end_current_trace(self) -> None: trace_data = self._context_storage.pop_trace_data() assert trace_data is not None diff --git a/sdks/python/src/opik/integrations/adk/recursive_callback_injector.py b/sdks/python/src/opik/integrations/adk/recursive_callback_injector.py index 6e71b31e1b..9315793f4e 100644 --- a/sdks/python/src/opik/integrations/adk/recursive_callback_injector.py +++ b/sdks/python/src/opik/integrations/adk/recursive_callback_injector.py @@ -1,5 +1,5 @@ import types -from typing import TypeVar, List, Callable, Any, Set +from typing import TypeVar, List, Any, Set from . import opik_tracer import logging from opik import _logging From acfbf61f818f711fed86f38430d94ab46b9aae00 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmik Date: Tue, 17 Jun 2025 13:51:22 +0200 Subject: [PATCH 6/7] Add new test for idempotency --- .../library_integration/adk/test_adk_sync.py | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/sdks/python/tests/library_integration/adk/test_adk_sync.py b/sdks/python/tests/library_integration/adk/test_adk_sync.py index 5627fda727..da8eddeb90 100644 --- a/sdks/python/tests/library_integration/adk/test_adk_sync.py +++ b/sdks/python/tests/library_integration/adk/test_adk_sync.py @@ -1071,3 +1071,52 @@ def test_adk__track_adk_agent_recursive__agent_tool_is_used__agent_tool_is_track trace_tree.spans[1].spans[0].spans[0].usage, EXPECTED_USAGE_KEYS_GOOGLE ) assert_dict_has_keys(trace_tree.spans[2].usage, EXPECTED_USAGE_KEYS_GOOGLE) + + +def test_adk__track_adk_agent_recursive__idempotent_calls_make_no_duplicated_callbacks(): + opik_tracer = OpikTracer() + + translator_to_english = adk_agents.Agent( + name="Translator", + model=MODEL_NAME, + description="Translates text to English.", + ) + + root_agent = adk_agents.Agent( + name="TextProcessingAssistant", + model=MODEL_NAME, + tools=[adk_agent_tool.AgentTool(agent=translator_to_english)], + description="Agent responsible for translating text to english by invoking a special tool for that.", + ) + + track_adk_agent_recursive(root_agent, opik_tracer) + + first_translator_after_agent_callback = translator_to_english.after_agent_callback + first_translator_before_agent_callback = translator_to_english.before_agent_callback + first_translator_after_tool_callback = translator_to_english.after_tool_callback + first_translator_before_tool_callback = translator_to_english.before_tool_callback + first_translator_after_model_callback = translator_to_english.after_model_callback + first_translator_before_model_callback = translator_to_english.before_model_callback + + first_root_after_agent_callback = root_agent.after_agent_callback + first_root_before_agent_callback = root_agent.before_agent_callback + first_root_after_tool_callback = root_agent.after_tool_callback + first_root_before_tool_callback = root_agent.before_tool_callback + first_root_after_model_callback = root_agent.after_model_callback + first_root_before_model_callback = root_agent.before_model_callback + + track_adk_agent_recursive(root_agent, opik_tracer) + + assert translator_to_english.after_agent_callback is first_translator_after_agent_callback + assert translator_to_english.before_agent_callback is first_translator_before_agent_callback + assert translator_to_english.after_tool_callback is first_translator_after_tool_callback + assert translator_to_english.before_tool_callback is first_translator_before_tool_callback + assert translator_to_english.after_model_callback is first_translator_after_model_callback + assert translator_to_english.before_model_callback is first_translator_before_model_callback + + assert root_agent.after_agent_callback is first_root_after_agent_callback + assert root_agent.before_agent_callback is first_root_before_agent_callback + assert root_agent.after_tool_callback is first_root_after_tool_callback + assert root_agent.before_tool_callback is first_root_before_tool_callback + assert root_agent.after_model_callback is first_root_after_model_callback + assert root_agent.before_model_callback is first_root_before_model_callback From 8963a6f81a44e2a72908bb7c61aafb27e7c31a25 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmik Date: Tue, 17 Jun 2025 13:51:40 +0200 Subject: [PATCH 7/7] Fix lint errors --- .../library_integration/adk/test_adk_sync.py | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/sdks/python/tests/library_integration/adk/test_adk_sync.py b/sdks/python/tests/library_integration/adk/test_adk_sync.py index da8eddeb90..5b1897c3cc 100644 --- a/sdks/python/tests/library_integration/adk/test_adk_sync.py +++ b/sdks/python/tests/library_integration/adk/test_adk_sync.py @@ -1104,16 +1104,34 @@ def test_adk__track_adk_agent_recursive__idempotent_calls_make_no_duplicated_cal first_root_before_tool_callback = root_agent.before_tool_callback first_root_after_model_callback = root_agent.after_model_callback first_root_before_model_callback = root_agent.before_model_callback - + track_adk_agent_recursive(root_agent, opik_tracer) - assert translator_to_english.after_agent_callback is first_translator_after_agent_callback - assert translator_to_english.before_agent_callback is first_translator_before_agent_callback - assert translator_to_english.after_tool_callback is first_translator_after_tool_callback - assert translator_to_english.before_tool_callback is first_translator_before_tool_callback - assert translator_to_english.after_model_callback is first_translator_after_model_callback - assert translator_to_english.before_model_callback is first_translator_before_model_callback - + assert ( + translator_to_english.after_agent_callback + is first_translator_after_agent_callback + ) + assert ( + translator_to_english.before_agent_callback + is first_translator_before_agent_callback + ) + assert ( + translator_to_english.after_tool_callback + is first_translator_after_tool_callback + ) + assert ( + translator_to_english.before_tool_callback + is first_translator_before_tool_callback + ) + assert ( + translator_to_english.after_model_callback + is first_translator_after_model_callback + ) + assert ( + translator_to_english.before_model_callback + is first_translator_before_model_callback + ) + assert root_agent.after_agent_callback is first_root_after_agent_callback assert root_agent.before_agent_callback is first_root_before_agent_callback assert root_agent.after_tool_callback is first_root_after_tool_callback