From 7acd5e8119fa8e8bc0c13ba7b82fe0ed286ef9a8 Mon Sep 17 00:00:00 2001 From: Faridun Mirzoev Date: Tue, 31 Mar 2026 13:41:06 -0700 Subject: [PATCH] feat: add AG2 multi-agent framework as native agent client AG2 (formerly AutoGen) is an open-source multi-agent framework with 500K+ monthly PyPI downloads. This adds two AgentClient implementations: - AG2Agent: single AssistantAgent + UserProxyAgent pair - AG2GroupChatAgent: multi-agent GroupChat with Planner + Executor AG2 requires Python >=3.10 and pydantic >=2.6.1, which conflicts with AgentBench's core deps (Python 3.9, pydantic v1). A separate requirements-ag2.txt is provided for an isolated venv setup. Includes 23 tests (unit, integration, e2e with fake task server). --- configs/agents/ag2-gpt4.yaml | 10 + configs/agents/ag2-groupchat.yaml | 9 + configs/agents/ag2-single.yaml | 9 + configs/agents/ag2_agents.yaml | 8 + requirements-ag2.txt | 33 ++ src/client/agents/__init__.py | 5 + src/client/agents/ag2_agent.py | 278 ++++++++++++++ src/client/agents/test_ag2_agent.py | 565 ++++++++++++++++++++++++++++ 8 files changed, 917 insertions(+) create mode 100644 configs/agents/ag2-gpt4.yaml create mode 100644 configs/agents/ag2-groupchat.yaml create mode 100644 configs/agents/ag2-single.yaml create mode 100644 configs/agents/ag2_agents.yaml create mode 100644 requirements-ag2.txt create mode 100644 src/client/agents/ag2_agent.py create mode 100644 src/client/agents/test_ag2_agent.py diff --git a/configs/agents/ag2-gpt4.yaml b/configs/agents/ag2-gpt4.yaml new file mode 100644 index 00000000..5c181bec --- /dev/null +++ b/configs/agents/ag2-gpt4.yaml @@ -0,0 +1,10 @@ +module: src.client.agents.AG2Agent + +parameters: + model: "gpt-4" + api_key: <% PUT-YOUR-OPENAI-KEY-HERE %> + api_type: "openai" + system_message: "You are a helpful AI assistant that solves tasks step by step. Be precise and concise in your responses." 
+ max_tokens: 1024 + temperature: 0 + max_turns: 1 diff --git a/configs/agents/ag2-groupchat.yaml b/configs/agents/ag2-groupchat.yaml new file mode 100644 index 00000000..979e3eeb --- /dev/null +++ b/configs/agents/ag2-groupchat.yaml @@ -0,0 +1,9 @@ +module: src.client.agents.AG2GroupChatAgent + +parameters: + model: "gpt-4o-mini" + api_key: <% PUT-YOUR-OPENAI-KEY-HERE %> + api_type: "openai" + max_tokens: 512 + temperature: 0 + max_rounds: 4 diff --git a/configs/agents/ag2-single.yaml b/configs/agents/ag2-single.yaml new file mode 100644 index 00000000..4a2bee60 --- /dev/null +++ b/configs/agents/ag2-single.yaml @@ -0,0 +1,9 @@ +module: src.client.agents.AG2Agent + +parameters: + model: "gpt-4o-mini" + api_type: "openai" + system_message: "You are a helpful AI assistant that solves tasks step by step. Be precise and concise in your responses." + max_tokens: 512 + temperature: 0 + max_turns: 1 diff --git a/configs/agents/ag2_agents.yaml b/configs/agents/ag2_agents.yaml new file mode 100644 index 00000000..6aa6a9b8 --- /dev/null +++ b/configs/agents/ag2_agents.yaml @@ -0,0 +1,8 @@ +ag2-single: + import: "./ag2-single.yaml" + +ag2-groupchat: + import: "./ag2-groupchat.yaml" + +ag2-gpt4: + import: "./ag2-gpt4.yaml" diff --git a/requirements-ag2.txt b/requirements-ag2.txt new file mode 100644 index 00000000..30a521b7 --- /dev/null +++ b/requirements-ag2.txt @@ -0,0 +1,33 @@ +# AG2 multi-agent framework integration for AgentBench. +# +# AG2 (v0.11.4+) requires Python >=3.10 and pydantic >=2.6.1, which +# conflicts with AgentBench's core dependencies (Python 3.9, pydantic ~=1.10). +# Install these into a SEPARATE virtual environment from the main +# AgentBench requirements. 
+# +# Setup: +# python3.10 -m venv .venv-ag2 +# source .venv-ag2/bin/activate +# pip install -r requirements-ag2.txt +# +# Then run: +# python -m src.client.agent_test --config configs/agents/ag2-single.yaml + +# --- AgentBench core (versions relaxed for Python 3.10+ / pydantic v2) --- +pydantic>=2.6.1,<3 +requests>=2.28,<3 +tqdm>=4.65 +pyyaml>=6.0 +jsonlines>=3.1 +aiohttp>=3.8 +uvicorn>=0.22 +fastapi>=0.101 +urllib3>=1.26,<3 + +# --- AgentBench agents that are imported via __init__.py --- +fschat>=0.2.31 +transformers>=4.34 +accelerate>=0.23 + +# --- AG2 --- +ag2[openai]>=0.11.4,<1.0 diff --git a/src/client/agents/__init__.py b/src/client/agents/__init__.py index 8d0fe88c..3eaaf0f7 100644 --- a/src/client/agents/__init__.py +++ b/src/client/agents/__init__.py @@ -1,2 +1,7 @@ from .fastchat_client import FastChatAgent from .http_agent import HTTPAgent + +try: + from .ag2_agent import AG2Agent, AG2GroupChatAgent +except ImportError: + pass # AG2 not installed; agents available via InstanceFactory when using requirements-ag2.txt diff --git a/src/client/agents/ag2_agent.py b/src/client/agents/ag2_agent.py new file mode 100644 index 00000000..b14660bb --- /dev/null +++ b/src/client/agents/ag2_agent.py @@ -0,0 +1,278 @@ +"""AG2 multi-agent client for AgentBench evaluation. + +Uses AG2 (formerly AutoGen) framework to handle benchmark tasks +via multi-agent conversation with tool-augmented agents. + +AG2: https://ag2.ai — 500K+ monthly PyPI downloads, 4,300+ GitHub stars. +""" + +import os +import logging +from typing import List + +try: + from autogen import AssistantAgent, UserProxyAgent, LLMConfig +except ImportError: + raise ImportError( + "AG2 is not installed. It requires a separate environment " + "(Python >=3.10, pydantic >=2.6.1). Install with:\n" + " pip install -r requirements-ag2.txt\n" + "See requirements-ag2.txt for details." 
"""AG2 multi-agent client for AgentBench evaluation.

Uses the AG2 (formerly AutoGen) framework to handle benchmark tasks
via multi-agent conversation.

AG2: https://ag2.ai
"""

import logging
import os
from typing import List, Optional

try:
    from autogen import AssistantAgent, UserProxyAgent, LLMConfig
except ImportError:
    raise ImportError(
        "AG2 is not installed. It requires a separate environment "
        "(Python >=3.10, pydantic >=2.6.1). Install with:\n"
        "    pip install -r requirements-ag2.txt\n"
        "See requirements-ag2.txt for details."
    )

from src.client.agent import AgentClient

logger = logging.getLogger(__name__)


def _history_to_prompt(history: List[dict]) -> str:
    """Flatten an AgentBench message history into one prompt string.

    System messages are moved to the front, preserving their relative
    order (the previous in-class implementation inserted each at index 0,
    which reversed multiple system messages); all other messages follow
    in their original order, separated by blank lines.

    Args:
        history: List of ``{"role": ..., "content": ...}`` dicts.

    Returns:
        A single newline-joined prompt string.
    """
    system_parts: List[str] = []
    other_parts: List[str] = []
    for msg in history:
        role = msg.get("role", "user")
        # "content" may be present but None (e.g. tool-call messages);
        # normalize to "" so string operations stay safe.
        content = msg.get("content") or ""
        if role == "system":
            system_parts.append(f"[System]: {content}")
        else:
            other_parts.append(content)
    return "\n\n".join(system_parts + other_parts)


class AG2Agent(AgentClient):
    """AgentBench client powered by the AG2 multi-agent framework.

    Wraps AG2's AssistantAgent + UserProxyAgent into the AgentClient
    interface expected by AgentBench. Each call to ``inference()`` resets
    both agents, runs one conversation, and returns the assistant's first
    non-empty reply.

    Args:
        model: Model name (e.g., "gpt-4o-mini", "gpt-4").
        api_key: API key for the model provider; falls back to the
            OPENAI_API_KEY environment variable when omitted.
        api_type: Provider type ("openai", "anthropic", "google", etc.).
        system_message: System prompt for the assistant agent.
        max_tokens: Maximum tokens for model responses.
        temperature: Sampling temperature.
        max_turns: Maximum conversation turns per inference call.
            NOTE(review): stored but not currently forwarded to AG2;
            the proxy's max_consecutive_auto_reply=0 is what limits the
            exchange to a single turn — confirm before relying on it.
    """

    def __init__(
        self,
        model: str = "gpt-4o-mini",
        api_key: Optional[str] = None,
        api_type: str = "openai",
        system_message: str = "You are a helpful AI assistant that solves tasks step by step.",
        max_tokens: int = 512,
        temperature: float = 0,
        max_turns: int = 1,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.model = model
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY", "")
        self.api_type = api_type
        self.system_message = system_message
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.max_turns = max_turns

        # Build LLM config — AG2 0.11.4 accepts config dicts as positional args
        config_entry = {
            "model": self.model,
            "api_key": self.api_key,
            "api_type": self.api_type,
        }
        self._llm_config = LLMConfig(
            config_entry,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
        )

        self._assistant = AssistantAgent(
            name="bench_assistant",
            system_message=self.system_message,
            llm_config=self._llm_config,
        )

        # max_consecutive_auto_reply=0: the proxy never auto-replies, so
        # one run() produces a single assistant turn.
        self._user_proxy = UserProxyAgent(
            name="bench_evaluator",
            human_input_mode="NEVER",
            max_consecutive_auto_reply=0,
            code_execution_config=False,
        )

    def inference(self, history: List[dict]) -> str:
        """Run AG2 agent inference on the given conversation history.

        Converts AgentBench history into a single prompt, runs the AG2
        agent pair, and extracts the assistant's response.

        Args:
            history: List of message dicts from the benchmark environment.
                Each dict typically has "role" and "content" keys.

        Returns:
            The assistant's first non-empty reply, "" if the chat log is
            empty, or an "Error: ..." string if AG2 raised.
        """
        # Reset agents so no conversational state leaks between samples.
        self._assistant.reset()
        self._user_proxy.reset()

        prompt = _history_to_prompt(history)

        try:
            run_response = self._user_proxy.run(
                self._assistant,
                message=prompt,
            )
            # process() drives the conversation but returns None;
            # the actual chat transcript lives on the agent objects.
            run_response.process()

            # chat_messages is a defaultdict keyed by the peer agent.
            chat_messages = self._assistant.chat_messages
            if chat_messages:
                # Bound to a new name: the original rebound the `history`
                # parameter here, shadowing the method argument.
                transcript = list(chat_messages.values())[0]
                for msg in transcript:
                    if msg.get("role") == "assistant":
                        # content may be None (tool-call messages) — guard
                        # before .strip() instead of relying on the broad
                        # except below.
                        content = (msg.get("content") or "").strip()
                        if content:
                            return content

            logger.warning("AG2 agent returned empty chat history")
            return ""

        except Exception as e:
            logger.error(f"AG2 inference error: {e}")
            return f"Error: {str(e)}"


class AG2GroupChatAgent(AgentClient):
    """AgentBench client using AG2 GroupChat with multiple specialized agents.

    Demonstrates AG2's multi-agent capability within the AgentBench framework.
    Uses a Planner + Executor pattern for complex reasoning tasks.

    Args:
        model: Model name.
        api_key: API key; falls back to OPENAI_API_KEY when omitted.
        api_type: Provider type.
        max_tokens: Maximum tokens per response.
        temperature: Sampling temperature.
        max_rounds: Maximum GroupChat rounds.
    """

    def __init__(
        self,
        model: str = "gpt-4o-mini",
        api_key: Optional[str] = None,
        api_type: str = "openai",
        max_tokens: int = 512,
        temperature: float = 0,
        max_rounds: int = 4,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY", "")

        config_entry = {
            "model": model,
            "api_key": self.api_key,
            "api_type": api_type,
        }
        self._llm_config = LLMConfig(
            config_entry,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        self._max_rounds = max_rounds

        # Deliberately a deferred import: the tests patch autogen.GroupChat /
        # GroupChatManager at call time, which a module-level from-import
        # would bypass.
        from autogen import GroupChat, GroupChatManager

        # Specialized agents
        self._planner = AssistantAgent(
            name="Planner",
            system_message=(
                "You are a planning specialist. Analyze the task, break it down "
                "into steps, and propose a clear action plan. Be concise."
            ),
            llm_config=self._llm_config,
        )

        self._executor = AssistantAgent(
            name="Executor",
            system_message=(
                "You are an execution specialist. Follow the plan and produce "
                "the final answer. Output only the answer, no explanation."
            ),
            llm_config=self._llm_config,
        )

        self._user_proxy = UserProxyAgent(
            name="Evaluator",
            human_input_mode="NEVER",
            max_consecutive_auto_reply=0,
            code_execution_config=False,
        )

        self._group_chat = GroupChat(
            agents=[self._user_proxy, self._planner, self._executor],
            messages=[],
            max_round=self._max_rounds,
            speaker_selection_method="auto",
        )

        self._manager = GroupChatManager(
            groupchat=self._group_chat,
            llm_config=self._llm_config,
        )

    def inference(self, history: List[dict]) -> str:
        """Run GroupChat inference on the given conversation history.

        Args:
            history: List of message dicts from the benchmark environment.

        Returns:
            The last non-user message content, "" if none, or an
            "Error: ..." string if AG2 raised.
        """
        # Reset all agents and clear the shared GroupChat transcript.
        self._planner.reset()
        self._executor.reset()
        self._user_proxy.reset()
        self._group_chat.messages.clear()

        prompt = _history_to_prompt(history)

        try:
            run_response = self._user_proxy.run(
                self._manager,
                message=prompt,
            )
            run_response.process()

            # GroupChat history is stored in group_chat.messages; scan from
            # the end for the last substantive (non-user) message.
            for msg in reversed(self._group_chat.messages):
                role = msg.get("role", "")
                # content may be None — normalize before .strip().
                content = (msg.get("content") or "").strip()
                if role != "user" and content:
                    return content

            return ""

        except Exception as e:
            logger.error(f"AG2 GroupChat inference error: {e}")
            return f"Error: {str(e)}"
+""" + +import json +import os +import threading +from collections import defaultdict +from http.server import HTTPServer, BaseHTTPRequestHandler +from typing import List +from unittest.mock import patch, MagicMock + +import pytest + +from src.client.agent import AgentClient +from src.client.agents.ag2_agent import AG2Agent, AG2GroupChatAgent + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_history(*messages) -> List[dict]: + """Build a history list from (role, content) pairs.""" + return [{"role": r, "content": c} for r, c in messages] + + +def _setup_ag2_mocks(mock_proxy, mock_asst, chat_messages): + """Configure AG2 mocks so inference() can extract responses. + + AG2 0.11.4 flow: + - user_proxy.run(assistant, message=...) -> RunResponse + - RunResponse.process() -> None (side effect: populates agent.chat_messages) + - Result extracted from assistant.chat_messages + """ + mock_asst_inst = MagicMock() + # chat_messages is defaultdict(list) keyed by other agent + mock_asst_inst.chat_messages = defaultdict(list, {mock_proxy: chat_messages}) + mock_asst.return_value = mock_asst_inst + + mock_proxy_inst = MagicMock() + run_response = MagicMock() + run_response.process.return_value = None + mock_proxy_inst.run.return_value = run_response + mock_proxy.return_value = mock_proxy_inst + + return mock_asst_inst, mock_proxy_inst + + +# Decorator shorthand — mocks AG2 classes so __init__ doesn't hit real AG2 +_patch_ag2 = lambda fn: ( + patch("src.client.agents.ag2_agent.LLMConfig")( + patch("src.client.agents.ag2_agent.AssistantAgent")( + patch("src.client.agents.ag2_agent.UserProxyAgent")(fn))) +) + + +# ============================================================================ +# 1. 
Unit tests +# ============================================================================ + +class TestAG2AgentUnit: + """Unit tests for AG2Agent init and prompt construction.""" + + @_patch_ag2 + def test_init_default_params(self, mock_proxy, mock_asst, mock_llm): + with patch.dict("os.environ", {"OPENAI_API_KEY": "test-key"}): + agent = AG2Agent() + assert agent.model == "gpt-4o-mini" + assert agent.api_type == "openai" + assert agent.max_tokens == 512 + assert agent.temperature == 0 + assert agent.max_turns == 1 + + @_patch_ag2 + def test_init_custom_params(self, mock_proxy, mock_asst, mock_llm): + agent = AG2Agent( + model="gpt-4", + api_key="custom-key", + api_type="openai", + max_tokens=1024, + temperature=0.5, + max_turns=3, + ) + assert agent.model == "gpt-4" + assert agent.api_key == "custom-key" + assert agent.max_tokens == 1024 + assert agent.temperature == 0.5 + assert agent.max_turns == 3 + + @_patch_ag2 + def test_inherits_agent_client(self, mock_proxy, mock_asst, mock_llm): + agent = AG2Agent(api_key="k") + assert isinstance(agent, AgentClient) + + @_patch_ag2 + def test_inference_returns_assistant_content(self, mock_proxy, mock_asst, mock_llm): + _setup_ag2_mocks(mock_proxy, mock_asst, [ + {"role": "user", "content": "hi", "name": "user"}, + {"role": "assistant", "content": "hello back", "name": "assistant"}, + ]) + agent = AG2Agent(api_key="k") + result = agent.inference(_make_history(("user", "hi"))) + assert result == "hello back" + + @_patch_ag2 + def test_inference_skips_empty_assistant_messages(self, mock_proxy, mock_asst, mock_llm): + """When last assistant message is empty, find previous non-empty one.""" + _setup_ag2_mocks(mock_proxy, mock_asst, [ + {"role": "user", "content": "hi", "name": "user"}, + {"role": "assistant", "content": "real answer", "name": "assistant"}, + {"role": "user", "content": "", "name": "user"}, + {"role": "assistant", "content": "", "name": "assistant"}, + ]) + agent = AG2Agent(api_key="k") + result = 
agent.inference(_make_history(("user", "hi"))) + assert result == "real answer" + + @_patch_ag2 + def test_inference_empty_history_returns_empty(self, mock_proxy, mock_asst, mock_llm): + _setup_ag2_mocks(mock_proxy, mock_asst, []) + agent = AG2Agent(api_key="k") + result = agent.inference([]) + assert result == "" + + @_patch_ag2 + def test_inference_handles_exception(self, mock_proxy, mock_asst, mock_llm): + mock_proxy_inst = MagicMock() + mock_proxy_inst.run.side_effect = RuntimeError("LLM timeout") + mock_proxy.return_value = mock_proxy_inst + + agent = AG2Agent(api_key="k") + result = agent.inference(_make_history(("user", "hello"))) + assert "Error" in result + assert "LLM timeout" in result + + @_patch_ag2 + def test_inference_resets_agents_each_call(self, mock_proxy, mock_asst, mock_llm): + mock_asst_inst, mock_proxy_inst = _setup_ag2_mocks(mock_proxy, mock_asst, [ + {"role": "assistant", "content": "ok", "name": "assistant"}, + ]) + + agent = AG2Agent(api_key="k") + agent.inference(_make_history(("user", "first"))) + agent.inference(_make_history(("user", "second"))) + + assert mock_asst_inst.reset.call_count == 2 + assert mock_proxy_inst.reset.call_count == 2 + + def test_prompt_construction_system_first(self): + """System messages should be prepended to the prompt.""" + history = _make_history( + ("user", "Turn on the light."), + ("system", "You are in a kitchen."), + ("user", "Look around."), + ) + prompt_parts = [] + for msg in history: + role = msg.get("role", "user") + content = msg.get("content", "") + if role == "system": + prompt_parts.insert(0, f"[System]: {content}") + else: + prompt_parts.append(content) + prompt = "\n\n".join(prompt_parts) + + assert prompt.startswith("[System]: You are in a kitchen.") + assert "Turn on the light." in prompt + assert "Look around." 
in prompt + + def test_prompt_construction_multi_turn(self): + """Multi-turn history concatenated properly.""" + history = _make_history( + ("user", "What is 2+2?"), + ("agent", "4"), + ("user", "And 3+3?"), + ) + prompt_parts = [] + for msg in history: + role = msg.get("role", "user") + content = msg.get("content", "") + if role == "system": + prompt_parts.insert(0, f"[System]: {content}") + else: + prompt_parts.append(content) + prompt = "\n\n".join(prompt_parts) + + assert "What is 2+2?" in prompt + assert "4" in prompt + assert "And 3+3?" in prompt + + +class TestAG2GroupChatAgentUnit: + """Unit tests for AG2GroupChatAgent.""" + + @patch("autogen.GroupChatManager") + @patch("autogen.GroupChat") + @_patch_ag2 + def test_init_defaults(self, mock_proxy, mock_asst, mock_llm, mock_gc, mock_gcm): + with patch.dict("os.environ", {"OPENAI_API_KEY": "test-key"}): + agent = AG2GroupChatAgent() + assert agent._max_rounds == 4 + assert isinstance(agent, AgentClient) + + @patch("autogen.GroupChatManager") + @patch("autogen.GroupChat") + @_patch_ag2 + def test_init_custom_rounds(self, mock_proxy, mock_asst, mock_llm, mock_gc, mock_gcm): + agent = AG2GroupChatAgent(api_key="k", max_rounds=8) + assert agent._max_rounds == 8 + + @patch("autogen.GroupChatManager") + @patch("autogen.GroupChat") + @_patch_ag2 + def test_inference_returns_last_non_user( + self, mock_proxy, mock_asst, mock_llm, mock_gc, mock_gcm + ): + mock_proxy_inst = MagicMock() + + def mock_run(recipient, message): + # Simulate process() populating group_chat.messages + agent._group_chat.messages = [ + {"role": "user", "content": "task"}, + {"role": "assistant", "content": "plan step 1"}, + {"role": "assistant", "content": "final answer"}, + ] + run_response = MagicMock() + run_response.process.return_value = None + return run_response + + mock_proxy_inst.run.side_effect = mock_run + mock_proxy.return_value = mock_proxy_inst + + agent = AG2GroupChatAgent(api_key="k") + result = 
agent.inference(_make_history(("user", "solve this"))) + assert result == "final answer" + + +# ============================================================================ +# 2. Integration tests — config loading & agent registry +# ============================================================================ + +class TestAG2Integration: + """Test that AG2 agents load correctly through AgentBench's config system.""" + + @_patch_ag2 + def test_instance_factory_creates_ag2_agent(self, mock_proxy, mock_asst, mock_llm): + from src.typings import InstanceFactory + + factory = InstanceFactory( + module="src.client.agents.AG2Agent", + parameters={ + "model": "gpt-4o-mini", + "api_key": "test-key-123", + "api_type": "openai", + "max_tokens": 256, + }, + ) + agent = factory.create() + assert isinstance(agent, AG2Agent) + assert isinstance(agent, AgentClient) + assert agent.model == "gpt-4o-mini" + assert agent.api_key == "test-key-123" + assert agent.max_tokens == 256 + + @patch("autogen.GroupChatManager") + @patch("autogen.GroupChat") + @_patch_ag2 + def test_instance_factory_creates_groupchat_agent( + self, mock_proxy, mock_asst, mock_llm, mock_gc, mock_gcm + ): + from src.typings import InstanceFactory + + factory = InstanceFactory( + module="src.client.agents.AG2GroupChatAgent", + parameters={ + "model": "gpt-4", + "api_key": "test-key-456", + "max_rounds": 6, + }, + ) + agent = factory.create() + assert isinstance(agent, AG2GroupChatAgent) + assert isinstance(agent, AgentClient) + assert agent._max_rounds == 6 + + def test_config_loader_loads_ag2_single_yaml(self): + from src.configs import ConfigLoader + + loader = ConfigLoader() + config = loader.load_from("configs/agents/ag2-single.yaml") + assert config["module"] == "src.client.agents.AG2Agent" + assert "parameters" in config + assert config["parameters"]["model"] == "gpt-4o-mini" + assert config["parameters"]["api_type"] == "openai" + + def test_config_loader_loads_ag2_groupchat_yaml(self): + from src.configs 
import ConfigLoader + + loader = ConfigLoader() + config = loader.load_from("configs/agents/ag2-groupchat.yaml") + assert config["module"] == "src.client.agents.AG2GroupChatAgent" + assert config["parameters"]["max_rounds"] == 4 + + def test_config_loader_loads_ag2_gpt4_yaml(self): + from src.configs import ConfigLoader + + loader = ConfigLoader() + config = loader.load_from("configs/agents/ag2-gpt4.yaml") + assert config["parameters"]["model"] == "gpt-4" + assert config["parameters"]["max_tokens"] == 1024 + + def test_ag2_importable_from_agents_package(self): + """Verify the agents __init__.py exports work.""" + from src.client.agents import AG2Agent as ImportedAgent + from src.client.agents import AG2GroupChatAgent as ImportedGroupChat + + assert ImportedAgent is AG2Agent + assert ImportedGroupChat is AG2GroupChatAgent + + @_patch_ag2 + def test_full_yaml_to_agent_pipeline(self, mock_proxy, mock_asst, mock_llm): + """Load YAML -> InstanceFactory -> create() -> working agent.""" + from src.configs import ConfigLoader + from src.typings import InstanceFactory + + config = ConfigLoader().load_from("configs/agents/ag2-single.yaml") + config["parameters"]["api_key"] = "test-pipeline-key" + factory = InstanceFactory(**config) + agent = factory.create() + + assert isinstance(agent, AG2Agent) + assert agent.api_key == "test-pipeline-key" + assert agent.system_message.startswith("You are a helpful") + + +# ============================================================================ +# 3. End-to-end test — full TaskClient.run_sample loop +# ============================================================================ + +class _FakeTaskHandler(BaseHTTPRequestHandler): + """Minimal HTTP handler simulating AgentBench task controller. + + Implements the /api/start_sample and /api/interact endpoints + so that TaskClient.run_sample() can execute a complete loop: + 1. start_sample -> RUNNING with initial history + 2. 
interact (agent response) -> COMPLETED with result + """ + + # Shared across requests via class variable + interaction_count = 0 + agent_responses = [] + + def do_POST(self): + length = int(self.headers.get("Content-Length", 0)) + body = json.loads(self.rfile.read(length)) if length else {} + + if self.path == "/api/start_sample": + _FakeTaskHandler.interaction_count = 0 + _FakeTaskHandler.agent_responses = [] + response = { + "session_id": 1001, + "output": { + "status": "running", + "result": None, + "history": [ + {"role": "user", "content": "You are in a dark room. What do you do?"} + ], + }, + } + self._respond(200, response) + + elif self.path == "/api/interact": + _FakeTaskHandler.interaction_count += 1 + agent_content = body.get("agent_response", {}).get("content", "") + _FakeTaskHandler.agent_responses.append(agent_content) + + if _FakeTaskHandler.interaction_count >= 2: + response = { + "output": { + "status": "completed", + "result": {"score": 1.0, "success": True}, + "history": [ + {"role": "user", "content": "You are in a dark room. What do you do?"}, + {"role": "agent", "content": _FakeTaskHandler.agent_responses[0]}, + {"role": "user", "content": "You turned on the light. You see a key."}, + {"role": "agent", "content": agent_content}, + ], + } + } + else: + response = { + "output": { + "status": "running", + "result": None, + "history": [ + {"role": "user", "content": "You are in a dark room. What do you do?"}, + {"role": "agent", "content": agent_content}, + {"role": "user", "content": "You turned on the light. 
You see a key."}, + ], + } + } + + self._respond(200, response) + + elif self.path == "/api/cancel": + self._respond(200, {"status": "cancelled"}) + + else: + self._respond(404, {"error": "not found"}) + + def do_GET(self): + if self.path.startswith("/api/list_workers"): + self._respond(200, {}) + elif self.path.startswith("/api/get_indices"): + self._respond(200, [0, 1, 2]) + else: + self._respond(404, {}) + + def _respond(self, code, body): + self.send_response(code) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(body).encode()) + + def log_message(self, format, *args): + pass # suppress request logging during tests + + +@pytest.fixture(scope="class") +def fake_task_server(): + """Start a local HTTP server simulating the task controller.""" + server = HTTPServer(("127.0.0.1", 0), _FakeTaskHandler) + port = server.server_address[1] + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + yield f"http://127.0.0.1:{port}/api" + server.shutdown() + + +class TestAG2EndToEnd: + """Full end-to-end test: config -> agent -> TaskClient -> fake server. 
+ + Exercises the real code paths: + - ConfigLoader loads YAML + - InstanceFactory creates AG2Agent + - TaskClient.run_sample() calls agent.inference() in a loop + - Agent resets state between turns + - Responses flow back through the task protocol + """ + + @_patch_ag2 + def test_full_task_loop(self, mock_proxy, mock_asst, mock_llm, fake_task_server): + """Run a complete 2-turn task with AG2Agent through TaskClient.""" + from src.client.task import TaskClient + + call_count = [0] + responses = [ + "I will look for a light switch.", + "I pick up the key.", + ] + + def mock_run(recipient, message): + """Simulate run() — populate chat_messages as side effect.""" + resp = responses[min(call_count[0], len(responses) - 1)] + call_count[0] += 1 + # Populate assistant's chat_messages (what real AG2 does) + mock_asst_inst.chat_messages = defaultdict(list, { + mock_proxy_inst: [ + {"role": "user", "content": message, "name": "user"}, + {"role": "assistant", "content": resp, "name": "assistant"}, + ] + }) + run_response = MagicMock() + run_response.process.return_value = None + return run_response + + mock_asst_inst = MagicMock() + mock_asst_inst.chat_messages = defaultdict(list) + mock_asst.return_value = mock_asst_inst + + mock_proxy_inst = MagicMock() + mock_proxy_inst.run.side_effect = mock_run + mock_proxy.return_value = mock_proxy_inst + + from src.configs import ConfigLoader + from src.typings import InstanceFactory + + config = ConfigLoader().load_from("configs/agents/ag2-single.yaml") + config["parameters"]["api_key"] = "e2e-test-key" + agent = InstanceFactory(**config).create() + + task_client = TaskClient(name="test-task", controller_address=fake_task_server) + output = task_client.run_sample(index=0, agent=agent) + + assert output.error is None, f"Task failed: {output.error} — {output.info}" + assert output.output is not None + + task_output = output.output + assert task_output.status.value == "completed" + assert task_output.result["success"] is True + assert 
task_output.result["score"] == 1.0 + assert call_count[0] == 2 + + agent_msgs = [m.content for m in task_output.history if m.role == "agent"] + assert "I will look for a light switch." in agent_msgs + assert "I pick up the key." in agent_msgs + assert _FakeTaskHandler.interaction_count == 2 + + @_patch_ag2 + def test_task_loop_agent_error_recovery(self, mock_proxy, mock_asst, mock_llm, fake_task_server): + """Verify TaskClient handles agent errors within run_sample.""" + from src.client.task import TaskClient + + mock_proxy_inst = MagicMock() + mock_proxy_inst.run.side_effect = RuntimeError("Model overloaded") + mock_proxy.return_value = mock_proxy_inst + + agent = AG2Agent(api_key="e2e-test-key") + task_client = TaskClient(name="test-task", controller_address=fake_task_server) + output = task_client.run_sample(index=0, agent=agent) + + # Agent returns "Error: ..." string which is valid — + # TaskClient treats any string as a valid response, pipeline doesn't crash + assert output is not None + + @patch("autogen.GroupChatManager") + @patch("autogen.GroupChat") + @_patch_ag2 + def test_groupchat_in_task_loop( + self, mock_proxy, mock_asst, mock_llm, mock_gc, mock_gcm, fake_task_server + ): + """Run GroupChatAgent through a full task loop.""" + from src.client.task import TaskClient + + call_count = [0] + + def mock_run(recipient, message): + call_count[0] += 1 + # Populate group_chat.messages as side effect + agent._group_chat.messages = [ + {"role": "user", "content": message}, + {"role": "assistant", "content": "Planner: analyze task"}, + {"role": "assistant", "content": f"Executor: action {call_count[0]}"}, + ] + run_response = MagicMock() + run_response.process.return_value = None + return run_response + + mock_proxy_inst = MagicMock() + mock_proxy_inst.run.side_effect = mock_run + mock_proxy.return_value = mock_proxy_inst + + agent = AG2GroupChatAgent(api_key="e2e-test-key") + task_client = TaskClient(name="test-task", controller_address=fake_task_server) + 
output = task_client.run_sample(index=0, agent=agent) + + assert output.error is None, f"Task failed: {output.error}" + assert output.output.status.value == "completed" + assert call_count[0] == 2