diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 3fb83803c4..d8813d2d43 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -213,6 +213,31 @@ def run( ) return self._run_impl(messages, response_id, session, checkpoint_id, checkpoint_storage, **kwargs) + def _filter_messages(self, chat_messages: list[Message]) -> list[Message]: + """Return only the last meaningful non-user message from a list of messages. + + Args: + chat_messages: The conversation history or workflow output to filter. + + Returns: + A single-element list containing the last non-user message with non-blank text, + or, failing that, the last non-user message of any kind; empty if there is none. + """ + if not chat_messages: + return [] + + for msg in reversed(chat_messages): + if msg.role != "user" and msg.text and msg.text.strip(): + return [msg] + # Fallback: no non-user message has non-blank text, so take the last non-user message of any kind. + non_user = [m for m in reversed(chat_messages) if m.role != "user"][:1] + if not non_user: + logger.warning( + "_filter_messages: no non-user messages found in list[Message] output. " + "Returning empty list — this likely indicates an unexpected workflow termination state." + ) + return non_user + async def _run_impl( self, messages: AgentRunInputs, @@ -476,8 +501,10 @@ def _convert_workflow_events_to_agent_response( messages.append(data) raw_representations.append(data.raw_representation) elif is_instance_of(data, list[Message]): - chat_messages = cast(list[Message], data) + chat_messages = self._filter_messages(cast(list[Message], data)) messages.extend(chat_messages) + # raw_representations intentionally stores the original unfiltered list — + # it records what the workflow emitted, not what was surfaced to the caller. 
raw_representations.append(data) else: contents = self._extract_contents(data) @@ -593,7 +620,7 @@ def _convert_workflow_event_to_agent_response_updates( ] if is_instance_of(data, list[Message]): # Convert each Message to an AgentResponseUpdate - chat_messages = cast(list[Message], data) + chat_messages = self._filter_messages(cast(list[Message], data)) updates = [] for msg in chat_messages: updates.append( diff --git a/python/packages/core/tests/workflow/test_workflow_agent.py b/python/packages/core/tests/workflow/test_workflow_agent.py index b2fbded39b..abe4b26d08 100644 --- a/python/packages/core/tests/workflow/test_workflow_agent.py +++ b/python/packages/core/tests/workflow/test_workflow_agent.py @@ -469,15 +469,20 @@ async def raw_yielding_executor( assert updates[2].raw_representation.value == 42 async def test_workflow_as_agent_yield_output_with_list_of_chat_messages(self) -> None: - """Test that yield_output with list[Message] extracts contents from all messages. + """Test that yield_output with list[Message] surfaces only the last assistant message. - Note: Content items are coalesced by _finalize_response, so multiple text contents - become a single merged Content in the final response. + When a workflow executor yields a list[Message] (as GroupChat orchestrators + do with self._full_conversation on termination), _filter_messages returns + only the last meaningful assistant message to avoid re-emitting user input + and replaying the full conversation history across turns. See #4261. + + Users who need intermediate agent responses can opt in via + intermediate_outputs=True in GroupChatBuilder. 
""" @executor async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[Never, list[Message]]) -> None: - # Yield a list of Messages (as SequentialBuilder does) + # Yield a list of Messages (as GroupChat orchestrator does with _full_conversation) msg_list = [ Message(role="user", text="first message"), Message(role="assistant", text="second message"), @@ -491,25 +496,24 @@ async def list_yielding_executor(messages: list[Message], ctx: WorkflowContext[N workflow = WorkflowBuilder(start_executor=list_yielding_executor).build() agent = workflow.as_agent("list-msg-agent") - # Verify streaming returns the update with all 4 contents before coalescing + # Streaming: _filter_messages returns only the last meaningful assistant message updates: list[AgentResponseUpdate] = [] async for update in agent.run("test", stream=True): updates.append(update) - assert len(updates) == 3 + # Only the last assistant message should be surfaced (user messages filtered, + # earlier assistant messages treated as conversation history replay) + assert len(updates) == 1 full_response = AgentResponse.from_updates(updates) - assert len(full_response.messages) == 3 - texts = [message.text for message in full_response.messages] - # Note: `from_agent_run_response_updates` coalesces multiple text contents into one content - assert texts == ["first message", "second message", "thirdfourth"] + assert len(full_response.messages) == 1 + assert full_response.messages[0].text == "thirdfourth" - # Verify run() + # Non-streaming: same filtering applies result = await agent.run("test") assert isinstance(result, AgentResponse) - assert len(result.messages) == 3 - texts = [message.text for message in result.messages] - assert texts == ["first message", "second message", "third fourth"] + assert len(result.messages) == 1 + assert result.messages[0].text == "third fourth" async def test_session_conversation_history_included_in_workflow_run(self) -> None: """Test that messages provided to 
agent.run() are passed through to the workflow.""" @@ -1296,3 +1300,213 @@ def test_merge_updates_function_result_no_matching_call(self): # Order: text (user), text (assistant), function_result (orphan at end) assert content_types == ["text", "text", "function_result"] + + +class TestWorkflowAgentUserInputFilteringRegression: + """Regression tests for #4261: user input must not compound across successive turns. + + When a GroupChat orchestrator terminates, it yields self._full_conversation via + ctx.yield_output(self._full_conversation). This is a list[Message] containing the + entire conversation history — both user inputs and all prior assistant responses. + + Without filtering, WorkflowAgent's _convert_workflow_event_to_agent_response_updates + (streaming) and _convert_workflow_events_to_agent_response (non-streaming) forward + all messages verbatim, causing user inputs and earlier assistant responses to + accumulate in the output on every successive turn. + + _filter_messages fixes this by returning only the last meaningful assistant message + from list[Message] output, aligning with GroupChatBuilder's default behavior of + intermediate_outputs=False where only the orchestrator's final summary is surfaced. + Users who need intermediate agent responses can opt in via intermediate_outputs=True. + + These tests use a class-based Executor (rather than the @executor decorator) to + ensure generic type annotations on WorkflowContext[Never, list[Message]] resolve + correctly at runtime, so is_instance_of(data, list[Message]) hits the right branch. + """ + + async def test_streaming_compounding_not_observed_across_turns(self): + """Regression: turn 1's user input must not appear in turn 2's streamed response.""" + + class GroupChatLikeExecutor(Executor): + """Simulates a GroupChat orchestrator's termination behavior. 
+ + On termination, BaseGroupChatOrchestrator yields self._full_conversation + via ctx.yield_output(self._full_conversation), which is a list[Message] + containing the entire conversation history (both user and assistant messages). + This executor replicates that exact pattern to exercise the list[Message] + branch in _filter_messages and verify that user inputs do not compound + across successive turns (see #4261). + """ + + @handler + async def handle_messages( + self, + messages: list[Message], + ctx: WorkflowContext[Never, list[Message]], + ) -> None: + input_text = messages[-1].text or "" + full_conversation: list[Message] = [ + Message(role="user", text=input_text), + Message( + role="assistant", + contents=[Content.from_text(text=f"Answer to: {input_text}")], + author_name="Principal", + ), + ] + await ctx.yield_output(full_conversation) + + groupchat_executor = GroupChatLikeExecutor(id="groupchat") + workflow = WorkflowBuilder(start_executor=groupchat_executor).build() + agent = workflow.as_agent("groupchat-agent") + session = AgentSession() + + # Turn 1 + updates1: list[AgentResponseUpdate] = [] + async for chunk in agent.run("first_question", stream=True, session=session): + updates1.append(chunk) + + # Turn 2: "first_question" must NOT bleed into turn 2's streamed output + updates2: list[AgentResponseUpdate] = [] + async for chunk in agent.run("second_question", stream=True, session=session): + updates2.append(chunk) + + text2 = " ".join(u.text or "" for u in updates2 if u.text) + assert "first_question" not in text2, ( + "Turn 1 user input should not appear in turn 2 streaming output (compounding regression)" + ) + assert "Answer to: second_question" in text2 + + async def test_nonstreaming_compounding_not_observed_across_turns(self): + """Regression: turn 1's user input must not appear in turn 2's response.""" + + class GroupChatLikeExecutor(Executor): + """Simulates a GroupChat orchestrator's termination behavior. 
+ + On termination, BaseGroupChatOrchestrator yields self._full_conversation + via ctx.yield_output(self._full_conversation), which is a list[Message] + containing the entire conversation history (both user and assistant messages). + This executor replicates that exact pattern to exercise the list[Message] + branch in _filter_messages and verify that user inputs do not compound + across successive turns (see #4261). + """ + + @handler + async def handle_messages( + self, + messages: list[Message], + ctx: WorkflowContext[Never, list[Message]], + ) -> None: + input_text = messages[-1].text or "" + full_conversation: list[Message] = [ + Message(role="user", text=input_text), + Message( + role="assistant", + contents=[Content.from_text(text=f"Answer to: {input_text}")], + author_name="Principal", + ), + ] + await ctx.yield_output(full_conversation) + + groupchat_executor = GroupChatLikeExecutor(id="groupchat") + workflow = WorkflowBuilder(start_executor=groupchat_executor).build() + agent = workflow.as_agent("groupchat-agent") + session = AgentSession() + + # Turn 1 + await agent.run("first_question", session=session) + + # Turn 2: "first_question" must NOT bleed into turn 2's response + result2 = await agent.run("second_question", session=session) + text2 = " ".join(m.text or "" for m in result2.messages) + assert "first_question" not in text2, ( + "Turn 1 user input should not appear in turn 2 response (compounding regression)" + ) + assert "Answer to: second_question" in text2 + + +class TestFilterMessages: + """Direct unit tests for WorkflowAgent._filter_messages edge cases. + + Covers empty input, all-user messages, assistant messages with no/whitespace text, + mixed roles, and ordering. Empty and all-user inputs both yield an empty list: + empty input returns early, and all-user input has no non-user message to surface + (a warning is logged). See moonbox3's review on #4268 for background. 
+ """ + + def _make_agent(self) -> WorkflowAgent: + @executor + async def _e(messages: list[Message], ctx: WorkflowContext[Never, str]) -> None: + await ctx.yield_output("x") + + workflow = WorkflowBuilder(start_executor=_e).build() + return WorkflowAgent(workflow=workflow) + + def test_empty_list_returns_empty(self): + agent = self._make_agent() + assert agent._filter_messages([]) == [] + + def test_single_assistant_message_empty_text(self): + """Return the single assistant message as-is when it's the only message, even if it has no text""" + agent = self._make_agent() + msg = Message(role="assistant", text="") + assert agent._filter_messages([msg]) == [msg] + + def test_single_assistant_message_whitespace_text(self): + """Return the single assistant message as-is when it's the only message, even if it has only whitespace.""" + agent = self._make_agent() + msg = Message(role="assistant", text=" ") + assert agent._filter_messages([msg]) == [msg] + + def test_single_assistant_message_none_text(self): + """Return the single assistant message as-is when it's the only message, even if it has None text.""" + agent = self._make_agent() + msg = Message(role="assistant", text=None) + assert agent._filter_messages([msg]) == [msg] + + def test_single_assistant_message_returned(self): + """Return the single assistant message as-is when it's the only message""" + agent = self._make_agent() + msg = Message(role="assistant", text="Hello") + assert agent._filter_messages([msg]) == [msg] + + def test_all_user_messages_returns_empty_list(self): + """All-user input: no assistant content exists to surface, returns empty list.""" + + agent = self._make_agent() + msgs = [Message(role="user", text="hi"), Message(role="user", text="hello")] + result = agent._filter_messages(msgs) + assert result == [] + + def test_mixed_roles_returns_last_assistant(self): + agent = self._make_agent() + msgs = [ + Message(role="user", text="q1"), + Message(role="assistant", text="a1"), + 
Message(role="user", text="q2"), + Message(role="assistant", text="a2"), # should be returned + ] + result = agent._filter_messages(msgs) + assert len(result) == 1 + assert result[0].text == "a2" + + def test_assistant_with_none_text_falls_through_to_next(self): + agent = self._make_agent() + msgs = [ + Message(role="assistant", text="a1"), + Message(role="assistant", text=None), + Message(role="assistant", text=" "), + ] + # Trailing None/whitespace-only messages are skipped, so the earlier "a1" is returned (no fallback needed) + result = agent._filter_messages(msgs) + assert [m.text for m in result] == ["a1"] # pins both the count and which message was chosen + + def test_returns_last_not_first_assistant(self): + agent = self._make_agent() + msgs = [ + Message(role="assistant", text="First response"), + Message(role="user", text="follow up"), + Message(role="assistant", text="Second response"), + ] + result = agent._filter_messages(msgs) + assert len(result) == 1 + assert result[0].text == "Second response"