diff --git a/src/google/adk/runners.py b/src/google/adk/runners.py
index 990e8c567e..7461c0bed9 100644
--- a/src/google/adk/runners.py
+++ b/src/google/adk/runners.py
@@ -611,10 +611,6 @@ async def _consume_event_queue(
     if event_or_done is done_sentinel:
       break
     event: Event = event_or_done
-    if not event.partial:
-      if event.node_info.message_as_output and event.content is not None:
-        event = event.model_copy()
-        event.output = None
     _apply_run_config_custom_metadata(event, ic.run_config)
 
     modified_event = await ic.plugin_manager.run_on_event_callback(
diff --git a/tests/unittests/workflow/test_workflow_dynamic_nodes.py b/tests/unittests/workflow/test_workflow_dynamic_nodes.py
index 129c2b7812..5091d1e34b 100644
--- a/tests/unittests/workflow/test_workflow_dynamic_nodes.py
+++ b/tests/unittests/workflow/test_workflow_dynamic_nodes.py
@@ -1153,8 +1153,9 @@ async def parent(*, ctx, node_input):
 
   assert len(agent_events) > 0
   agent_event = agent_events[-1]
-  # Verify that runners.py cleared the output
-  assert agent_event.output is None
+  # The persisted event keeps the LlmAgent's output so resume can replay
+  # it without re-firing the model
+  assert agent_event.output == 'LLM output content'
 
   # When the workflow is resumed by resolving the interrupt
   resume_events = await _resume(
diff --git a/tests/unittests/workflow/test_workflow_hitl.py b/tests/unittests/workflow/test_workflow_hitl.py
index 5eb38642dd..d4e176fe46 100644
--- a/tests/unittests/workflow/test_workflow_hitl.py
+++ b/tests/unittests/workflow/test_workflow_hitl.py
@@ -2051,3 +2051,52 @@ def node_b():
   )
   outputs3 = workflow_testing_utils.get_outputs(events3)
   assert outputs3 == [f'c_from_b_response 2']
+
+
+@pytest.mark.asyncio
+async def test_cached_llm_agent_output_remains_subscriptable_after_resume(
+    request: pytest.FixtureRequest,
+):
+  """Cached LlmAgent output must stay a dict on resume."""
+
+  class Greeting(BaseModel):
+    text: str
+
+  greeter = LlmAgent(
+      name='greeter',
+      model=testing_utils.MockModel.create(
+          responses=[types.Part.from_text(text='{"text": "hi"}')],
+      ),
+      mode='single_turn',
+      output_schema=Greeting,
+      instruction='Return JSON.',
+  )
+
+  @node(rerun_on_resume=True)
+  async def orchestrator(ctx: Context, node_input: Any):
+    out = await ctx.run_node(greeter, 'go')
+    if 'confirm' not in ctx.resume_inputs:
+      yield RequestInput(message='ok?', interrupt_id='confirm')
+      return
+    yield Event(output={'echoed': out['text']})
+
+  agent = Workflow(name='wf', edges=[(START, orchestrator)])
+  app = App(
+      name=request.function.__name__,
+      root_agent=agent,
+      resumability_config=ResumabilityConfig(is_resumable=True),
+  )
+  runner = testing_utils.InMemoryRunner(app=app)
+
+  events1 = await runner.run_async(testing_utils.get_user_content('start'))
+  req = workflow_testing_utils.get_request_input_events(events1)[0]
+  interrupt_id = get_request_input_interrupt_ids(req)[0]
+  invocation_id = events1[0].invocation_id
+
+  events2 = await runner.run_async(
+      new_message=testing_utils.UserContent(
+          create_request_input_response(interrupt_id, {'approved': True})
+      ),
+      invocation_id=invocation_id,
+  )
+  assert {'echoed': 'hi'} in workflow_testing_utils.get_outputs(events2)