From ec482135fb9896d2abf41314feca695e0516e28c Mon Sep 17 00:00:00 2001 From: "joseph.marinier" Date: Thu, 7 May 2026 15:26:55 -0400 Subject: [PATCH 1/2] Yield multiple validation errors at once from `_check_companion_services()`, not just the first one. --- src/eva/models/config.py | 29 +++++++++++++++---------- tests/unit/models/test_config_models.py | 5 +++++ 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/eva/models/config.py b/src/eva/models/config.py index 338d4097..63a41d1e 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -14,6 +14,7 @@ import copy import logging +from collections.abc import Iterator from datetime import UTC, datetime from enum import StrEnum from pathlib import Path @@ -27,11 +28,13 @@ Discriminator, Field, Tag, + ValidationError, computed_field, field_serializer, field_validator, model_validator, ) +from pydantic_core import InitErrorDetails, PydanticCustomError from pydantic_settings import BaseSettings, CliSuppress, SettingsConfigDict from eva.models.provenance import RunProvenance @@ -702,16 +705,17 @@ def _warn_deprecated_aliases(cls, data: Any) -> Any: @model_validator(mode="after") def _check_companion_services(self) -> "RunConfig": """Ensure required companion services are set for each pipeline mode.""" - required_keys = ["api_key", "model"] + errors: list[InitErrorDetails] = [] if isinstance(self.model, PipelineConfig): - self._validate_service_params("STT", self.model.stt, required_keys, self.model.stt_params) - self._validate_service_params("TTS", self.model.tts, required_keys, self.model.tts_params) + errors.extend(self._validate_service_params("STT", self.model.stt, self.model.stt_params)) + errors.extend(self._validate_service_params("TTS", self.model.tts, self.model.tts_params)) elif isinstance(self.model, AudioLLMConfig): - self._validate_service_params("TTS", self.model.tts, required_keys, self.model.tts_params) - self._validate_service_params("audio_llm", self.model.audio_llm, 
required_keys, self.model.audio_llm_params) + errors.extend(self._validate_service_params("TTS", self.model.tts, self.model.tts_params)) + errors.extend(self._validate_service_params("AUDIO_LLM", self.model.audio_llm, self.model.audio_llm_params)) elif isinstance(self.model, SpeechToSpeechConfig): - # api_key is required, some s2s services don't require model - self._validate_service_params("S2S", self.model.s2s, required_keys, self.model.s2s_params) + errors.extend(self._validate_service_params("S2S", self.model.s2s, self.model.s2s_params)) + if errors: + raise ValidationError.from_exception_data(title=type(self).__name__, line_errors=errors) return self @model_validator(mode="after") @@ -723,17 +727,20 @@ def _set_default_run_id(self) -> "RunConfig": @classmethod def _validate_service_params( - cls, service: str, provider: str, required_keys: list[str], params: dict[str, Any] - ) -> None: + cls, service: str, provider: str, params: dict[str, Any] + ) -> Iterator[InitErrorDetails]: """Validate that STT/TTS params contain required keys.""" - missing = [key for key in required_keys if key not in params] + required_keys = ["api_key", "model"] + missing = [key for key in required_keys if key not in params] if params else required_keys if missing: missing_str = " and ".join(f'"{k}"' for k in missing) env_var = f"EVA_MODEL__{service}_PARAMS" - raise ValueError( + message = ( f"{missing_str} required in {env_var} for {provider} {service}. 
" f'Example: {env_var}=\'{{"api_key": "your_key", "model": "your_model"}}\'' ) + loc = ("model", f"{service.lower()}_params") + yield InitErrorDetails(type=PydanticCustomError("missing_service_params", message), loc=loc, input=params) @model_validator(mode="before") @classmethod diff --git a/tests/unit/models/test_config_models.py b/tests/unit/models/test_config_models.py index 5d794c07..c076c996 100644 --- a/tests/unit/models/test_config_models.py +++ b/tests/unit/models/test_config_models.py @@ -408,6 +408,10 @@ def test_invalid_model_list(self, environ, expected_exception, expected_message) {"EVA_MODEL__LLM": "gpt-5.2", "EVA_MODEL__STT": "deepgram"}, r"model\.pipeline\.tts\s+Field required", ), + ( + {"EVA_MODEL__LLM": "gpt-5.2"}, + r"model\.pipeline\.stt\s+Field required(?s:.+)model\.pipeline\.tts\s+Field required", + ), ( {"EVA_MODEL__AUDIO_LLM": "ultravox"}, r"model\.audio_llm\.tts\s+Field required", @@ -422,6 +426,7 @@ def test_invalid_model_list(self, environ, expected_exception, expected_message) "Mixed all three", "LLM without STT", "LLM without TTS", + "LLM without STT and TTS", "Audio LLM without TTS", ), ) From e94cfec26839b5feecf9195fd6b4335a0d9fe90a Mon Sep 17 00:00:00 2001 From: "joseph.marinier" Date: Thu, 7 May 2026 15:36:12 -0400 Subject: [PATCH 2/2] Refactor model config - Get rid of `_strip_other_mode_fields()` by merging the 3 model classes into a single `ModelConfig` that always accepts extra `EVA_MODEL__*_PARAMS` environment variables, even if not used. - Skip validating `EVA_MODEL__*` environment variables when `EVA_MAX_RERUN_ATTEMPTS=0` (and not when `EVA_FORCE_RERUN_METRICS=True`, since it could be set while there are still conversations to rerun). That way, if you specify `EVA_MAX_RERUN_ATTEMPTS=0`, you no longer need to remove these `EVA_MODEL__*` environment variables from your `.env` file. 
--- docs/assistant_server_contract.md | 27 +- src/eva/assistant/base_server.py | 4 +- src/eva/assistant/elevenlabs_server.py | 12 +- src/eva/assistant/gemini_live_server.py | 11 +- src/eva/assistant/openai_realtime_server.py | 8 +- src/eva/assistant/pipecat_server.py | 22 +- src/eva/metrics/runner.py | 2 +- src/eva/models/__init__.py | 10 +- src/eva/models/config.py | 412 ++++++-------------- src/eva/orchestrator/runner.py | 4 +- src/eva/run_benchmark.py | 4 +- tests/integration/test_evaluation_mode.py | 4 +- tests/unit/assistant/test_pipecat_server.py | 5 +- tests/unit/models/test_config_models.py | 192 ++++++++- tests/unit/orchestrator/test_runner.py | 84 +++- 15 files changed, 418 insertions(+), 383 deletions(-) diff --git a/docs/assistant_server_contract.md b/docs/assistant_server_contract.md index 2048312f..62124ac5 100644 --- a/docs/assistant_server_contract.md +++ b/docs/assistant_server_contract.md @@ -27,8 +27,7 @@ produce `ConversationResult`. The server name must be registered in ## Quick checklist - [ ] Subclass `AbstractAssistantServer` -- [ ] Assert `isinstance(self.pipeline_config, SpeechToSpeechConfig)` (or the correct - config type) in `__init__` +- [ ] Read `self.pipeline_config.s2s_params` (a `ModelConfig` field) in `__init__` - [ ] Expose `ws://localhost:{self.port}/ws` accepting Twilio-framed audio - [ ] Override `_audio_sample_rate` to match the recording sample rate - [ ] Populate `self.user_audio_buffer` and `self.assistant_audio_buffer` during @@ -58,16 +57,12 @@ Call `super().__init__(**kwargs)` first. 
The base class sets up: | `self._audio_buffer` | `bytearray` | Mixed audio (leave empty — base class mixes automatically) | | `self._audio_sample_rate` | `int` | Recording sample rate, default 24000 | -After calling `super().__init__()`, narrow the config type and initialize state: +After calling `super().__init__()`, read params and initialize state: ```python def __init__(self, **kwargs): super().__init__(**kwargs) - if isinstance(self.pipeline_config, SpeechToSpeechConfig): - s2s_params = self.pipeline_config.s2s_params - else: - logger.error("Pipeline config is not SpeechToSpeechConfig") - return + s2s_params = self.pipeline_config.s2s_params or {} self._model = s2s_params["model"] # model is required in the s2s params config self._audio_sample_rate = SAMPLE_RATE # match recording rate self._fw_log: FrameworkLogWriter | None = None @@ -75,9 +70,6 @@ def __init__(self, **kwargs): # ... other setup ``` -The assertion fails fast with a clear message rather than an obscure `AttributeError` -later. - --- ## 2. `start()` — server startup @@ -352,16 +344,13 @@ async def save_outputs(self) -> None: ## 9. Config type -`pipeline_config` arrives as a union type from the orchestrator. For s2s models it -will be `SpeechToSpeechConfig`, which exposes: +`pipeline_config` is a `ModelConfig` instance from the orchestrator. For s2s models, it exposes: ```python self.pipeline_config.s2s # model identifier string self.pipeline_config.s2s_params # dict of additional params (api_key, voice, model, etc.) ``` -Return if the config is not `SpeechToSpeechConfig`. - Server should be documented in the relevant `configs/` YAML with `framework: my_framework` and `model: {s2s: my-model-id, s2s_params: {...}}`. 
@@ -402,18 +391,14 @@ from eva.assistant.audio_bridge import ( sync_buffer_to_position, ) from eva.assistant.base_server import INITIAL_MESSAGE, AbstractAssistantServer -from eva.models.config import SpeechToSpeechConfig +from eva.models.config import ModelConfig class MyFrameworkAssistantServer(AbstractAssistantServer): def __init__(self, **kwargs): super().__init__(**kwargs) - if isinstance(self.pipeline_config, SpeechToSpeechConfig): - s2s_params = self.pipeline_config.s2s_params - else: - logger.error("Pipeline config is not SpeechToSpeechConfig") - return + s2s_params = self.pipeline_config.s2s_params or {} self._model = s2s_params["model"] diff --git a/src/eva/assistant/base_server.py b/src/eva/assistant/base_server.py index 70c27164..68fd92c5 100644 --- a/src/eva/assistant/base_server.py +++ b/src/eva/assistant/base_server.py @@ -19,7 +19,7 @@ from eva.assistant.audio_bridge import FrameworkLogWriter, MetricsLogWriter from eva.assistant.tools.tool_executor import ToolExecutor from eva.models.agents import AgentConfig -from eva.models.config import AudioLLMConfig, PipelineConfig, SpeechToSpeechConfig +from eva.models.config import ModelConfig from eva.utils.audio_utils import save_pcm_as_wav from eva.utils.logging import get_logger @@ -43,7 +43,7 @@ class AbstractAssistantServer(ABC): def __init__( self, current_date_time: str, - pipeline_config: PipelineConfig | SpeechToSpeechConfig | AudioLLMConfig, + pipeline_config: ModelConfig, agent: AgentConfig, agent_config_path: str, scenario_db_path: str, diff --git a/src/eva/assistant/elevenlabs_server.py b/src/eva/assistant/elevenlabs_server.py index 2b978b3f..74f1b1fd 100644 --- a/src/eva/assistant/elevenlabs_server.py +++ b/src/eva/assistant/elevenlabs_server.py @@ -17,7 +17,6 @@ import json import time from pathlib import Path -from typing import Any try: import audioop @@ -45,7 +44,7 @@ from eva.assistant.base_server import INITIAL_MESSAGE, AbstractAssistantServer from eva.assistant.elevenlabs_audio_interface 
import TwilioAudioBridge from eva.models.agents import AgentConfig -from eva.models.config import SpeechToSpeechConfig +from eva.models.config import ModelConfig from eva.utils.logging import get_logger from eva.utils.prompt_manager import PromptManager @@ -113,7 +112,7 @@ class ElevenLabsAssistantServer(AbstractAssistantServer): def __init__( self, current_date_time: str, - pipeline_config: SpeechToSpeechConfig, + pipeline_config: ModelConfig, agent: AgentConfig, agent_config_path: str, scenario_db_path: str, @@ -135,12 +134,7 @@ def __init__( # Recording sample rate (ElevenLabs operates at 16 kHz) self._audio_sample_rate = _RECORDING_SAMPLE_RATE - s2s_params: dict[str, Any] = {} - if isinstance(self.pipeline_config, SpeechToSpeechConfig): - s2s_params = self.pipeline_config.s2s_params - else: - logger.error("Pipeline config is not SpeechToSpeechConfig") - return + s2s_params = self.pipeline_config.s2s_params or {} self.s2s_params = s2s_params self._model = s2s_params.get("model", "elevenlabs") diff --git a/src/eva/assistant/gemini_live_server.py b/src/eva/assistant/gemini_live_server.py index ec3910b3..37b9bc08 100644 --- a/src/eva/assistant/gemini_live_server.py +++ b/src/eva/assistant/gemini_live_server.py @@ -38,7 +38,7 @@ ) from eva.assistant.base_server import INITIAL_MESSAGE, AbstractAssistantServer from eva.models.agents import AgentConfig -from eva.models.config import SpeechToSpeechConfig +from eva.models.config import ModelConfig from eva.utils.logging import get_logger from eva.utils.prompt_manager import PromptManager @@ -154,7 +154,7 @@ class GeminiLiveAssistantServer(AbstractAssistantServer): def __init__( self, current_date_time: str, - pipeline_config: SpeechToSpeechConfig, + pipeline_config: ModelConfig, agent: AgentConfig, agent_config_path: str, scenario_db_path: str, @@ -177,12 +177,7 @@ def __init__( self._audio_sample_rate = _RECORDING_SAMPLE_RATE # Gemini model name from s2s_params or default - s2s_params: dict[str, Any] = {} - if 
isinstance(self.pipeline_config, SpeechToSpeechConfig): - s2s_params = self.pipeline_config.s2s_params - else: - logger.error("Pipeline config is not SpeechToSpeechConfig") - return + s2s_params = self.pipeline_config.s2s_params or {} self._model = s2s_params["model"] self._voice = s2s_params.get("voice", "Kore") self._language_code = s2s_params.get("language_code", "en-US") diff --git a/src/eva/assistant/openai_realtime_server.py b/src/eva/assistant/openai_realtime_server.py index ae732410..0a80f2b7 100644 --- a/src/eva/assistant/openai_realtime_server.py +++ b/src/eva/assistant/openai_realtime_server.py @@ -27,7 +27,6 @@ sync_buffer_to_position, ) from eva.assistant.base_server import INITIAL_MESSAGE, AbstractAssistantServer -from eva.models.config import SpeechToSpeechConfig from eva.utils.logging import get_logger from eva.utils.prompt_manager import PromptManager @@ -104,12 +103,7 @@ def __init__(self, **kwargs: Any) -> None: # User speech start timestamp from audio_interface (source of truth) self._audio_interface_speech_start_ts: str | None = None - if isinstance(self.pipeline_config, SpeechToSpeechConfig): - s2s_params = self.pipeline_config.s2s_params - else: - logger.error("Pipeline config is not SpeechToSpeechConfig") - return - + s2s_params = self.pipeline_config.s2s_params or {} self._model: str = s2s_params["model"] async def start(self) -> None: diff --git a/src/eva/assistant/pipecat_server.py b/src/eva/assistant/pipecat_server.py index 769fdbe1..6f532bd9 100644 --- a/src/eva/assistant/pipecat_server.py +++ b/src/eva/assistant/pipecat_server.py @@ -61,7 +61,7 @@ ) from eva.assistant.services.llm import LiteLLMClient from eva.models.agents import AgentConfig -from eva.models.config import AudioLLMConfig, PipelineConfig, SpeechToSpeechConfig +from eva.models.config import ModelConfig, PipelineType from eva.utils.logging import get_logger logger = get_logger(__name__) @@ -88,7 +88,7 @@ class PipecatAssistantServer(AbstractAssistantServer): def __init__( 
self, current_date_time: str, - pipeline_config: PipelineConfig | SpeechToSpeechConfig | AudioLLMConfig, + pipeline_config: ModelConfig, agent: AgentConfig, agent_config_path: str, scenario_db_path: str, @@ -225,7 +225,7 @@ async def _handle_session(self, websocket) -> None: audio_llm_processor = None audio_llm_audio_collector = None alm_client = None - if isinstance(self.pipeline_config, SpeechToSpeechConfig): + if self.pipeline_config.pipeline_type == PipelineType.S2S: realtime_llm = create_realtime_llm_service( self.pipeline_config.s2s, self.pipeline_config.s2s_params, @@ -256,7 +256,7 @@ async def _realtime_tool_handler(params) -> None: realtime_llm.register_function(function_name=None, handler=_realtime_tool_handler) stt = None tts = None - elif isinstance(self.pipeline_config, AudioLLMConfig): + elif self.pipeline_config.pipeline_type == PipelineType.AUDIO_LLM: # Audio-LLM mode: model handles STT+LLM, TTS still needed stt = None tts = create_tts_service( @@ -338,8 +338,8 @@ async def _realtime_tool_handler(params) -> None: # Create Audio-LLM components now that context/user_aggregator are available audio_llm_audio_collector = None audio_llm_processor = None - if isinstance(self.pipeline_config, AudioLLMConfig): - assert alm_client is not None # Set in AudioLLMConfig branch above + if self.pipeline_config.pipeline_type == PipelineType.AUDIO_LLM: + assert alm_client is not None # Set in AUDIO_LLM branch above audio_llm_audio_collector = AudioLLMUserAudioCollector( context, user_aggregator, pre_speech_secs=VAD_PRE_SPEECH_BUFFER_SECS ) @@ -392,7 +392,7 @@ async def on_user_transcription(text: str, timestamp: str, turn_id: int | None) ) # Create agent processor (pipeline mode only — realtime handles LLM internally) agent_processor = None - if isinstance(self.pipeline_config, PipelineConfig): + if self.pipeline_config.pipeline_type == PipelineType.CASCADE: agent_processor = BenchmarkAgentProcessor( current_date_time=self.current_date_time, agent=self.agent, @@ 
-589,7 +589,7 @@ async def on_client_connected(transport, client): await audiobuffer.start_recording() # Send initial greeting - if isinstance(self.pipeline_config, SpeechToSpeechConfig): + if self.pipeline_config.pipeline_type == PipelineType.S2S: await task.queue_frames([LLMRunFrame()]) else: await task.queue_frames([TTSSpeakFrame(INITIAL_MESSAGE)]) @@ -643,13 +643,13 @@ async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMes logger.info(f"User turn stopped - complete transcript: '{message.content}'") logger.debug(f"Turn timestamp: {message.timestamp}, user_id: {message.user_id}") - if isinstance(self.pipeline_config, PipelineConfig): + if self.pipeline_config.pipeline_type == PipelineType.CASCADE: # STT provides real transcript text — save it now await self._save_transcript_message_from_turn( role="user", content=message.content, timestamp=message.timestamp ) await agent_processor.process_complete_user_turn(message.content) - elif isinstance(self.pipeline_config, AudioLLMConfig) and audio_llm_processor: + elif self.pipeline_config.pipeline_type == PipelineType.AUDIO_LLM and audio_llm_processor: # No STT → message.content is empty. # Processing is triggered by LLMContextFrame flow through ParallelPipeline # (AudioLLMUserAudioCollector pushes LLMContextFrame on UserStoppedSpeakingFrame) @@ -721,7 +721,7 @@ def _save_transcript(self) -> None: For pipeline mode, only write if not already written incrementally. 
""" transcript_path = self.output_dir / "transcript.jsonl" - if isinstance(self.pipeline_config, SpeechToSpeechConfig) or not transcript_path.exists(): + if self.pipeline_config.pipeline_type == PipelineType.S2S or not transcript_path.exists(): self.audit_log.save_transcript_jsonl(transcript_path) async def save_outputs(self) -> None: diff --git a/src/eva/metrics/runner.py b/src/eva/metrics/runner.py index 0cd5f742..78c25514 100644 --- a/src/eva/metrics/runner.py +++ b/src/eva/metrics/runner.py @@ -149,7 +149,7 @@ def _load_agent_config(self) -> dict[str, Any]: config_data = json.loads(config_path.read_text()) - # Determine pipeline type from config (fallback to False for legacy runs) + # Determine pipeline type from config model_data = config_data.get("model", {}) self._pipeline_type = get_pipeline_type(model_data) if model_data else PipelineType.CASCADE diff --git a/src/eva/models/__init__.py b/src/eva/models/__init__.py index cac22376..d5f1a53d 100644 --- a/src/eva/models/__init__.py +++ b/src/eva/models/__init__.py @@ -7,10 +7,9 @@ AgentToolParameter, ) from eva.models.config import ( - AudioLLMConfig, - PipelineConfig, + ModelConfig, + PipelineType, RunConfig, - SpeechToSpeechConfig, ) from eva.models.record import ( AgentOverride, @@ -37,9 +36,8 @@ "AgentOverride", # Config models "RunConfig", - "PipelineConfig", - "SpeechToSpeechConfig", - "AudioLLMConfig", + "ModelConfig", + "PipelineType", # Result models "ConversationResult", "MetricScore", diff --git a/src/eva/models/config.py b/src/eva/models/config.py index 63a41d1e..5bc82e49 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -18,16 +18,14 @@ from datetime import UTC, datetime from enum import StrEnum from pathlib import Path -from typing import Annotated, Any, ClassVar, Literal +from typing import Any, ClassVar, Literal import yaml from litellm.types.router import DeploymentTypedDict from pydantic import ( BaseModel, ConfigDict, - Discriminator, Field, - Tag, ValidationError, 
computed_field, field_serializer, @@ -56,8 +54,13 @@ def _param_alias(params: dict[str, Any]) -> str: return params.get("alias") or params["model"] -class PipelineConfig(BaseModel): - """Configuration for a STT + LLM + TTS pipeline.""" +class ModelConfig(BaseModel): + """Flat model configuration covering all pipeline modes. + + Exactly one mode selector (``llm``, ``s2s``, or ``audio_llm``) should be set. + Mode exclusivity is enforced by ``RunConfig``, not here, so that + ``max_rerun_attempts == 0`` can freely construct a config with mixed env vars. + """ model_config = ConfigDict(extra="forbid") @@ -69,15 +72,28 @@ class PipelineConfig(BaseModel): } _LEGACY_DROP: ClassVar[set[str]] = {"realtime_model", "realtime_model_params"} - llm: str = Field( + # ── Mode selectors (exactly one group must be set for a real run) ── + llm: str | None = Field( + None, description="LLM model name matching a model_name in --model-list/EVA_MODEL_LIST", examples=["gpt-5.2", "gemini-3-pro"], ) - stt: str = Field(description="STT model", examples=["deepgram", "openai_whisper"]) - tts: str = Field(description="TTS model", examples=["cartesia", "elevenlabs"]) + stt: str | None = Field(None, description="STT model", examples=["deepgram", "openai_whisper"]) + tts: str | None = Field(None, description="TTS model", examples=["cartesia", "elevenlabs"]) + + s2s: str | None = Field( + None, description="Speech-to-speech model name", examples=["gpt-realtime-mini", "gemini_live"] + ) - stt_params: dict[str, Any] = Field({}, description="Additional STT model parameters (JSON)") - tts_params: dict[str, Any] = Field({}, description="Additional TTS model parameters (JSON)") + audio_llm: str | None = Field(None, description="Audio-LLM model identifier", examples=["vllm"]) + + # ── Params dicts ── + stt_params: dict[str, Any] | None = Field(None, description="Additional STT model parameters (JSON)") + tts_params: dict[str, Any] | None = Field(None, description="Additional TTS model parameters (JSON)") + 
s2s_params: dict[str, Any] | None = Field(None, description="Additional speech-to-speech model parameters (JSON)") + audio_llm_params: dict[str, Any] | None = Field( + None, description="Audio-LLM parameters (JSON): base_url (required), api_key, model, temperature, max_tokens" + ) # Configurable turn start/stop strategies turn_start_strategy: str = Field( @@ -120,14 +136,41 @@ class PipelineConfig(BaseModel): ), ) + @property + def pipeline_type(self) -> "PipelineType": + """Detected pipeline mode based on which selector is set.""" + if self.audio_llm: + return PipelineType.AUDIO_LLM + if self.s2s: + return PipelineType.S2S + if self.llm: + return PipelineType.CASCADE + @property def pipeline_parts(self) -> dict[str, str]: - """Component names for this pipeline.""" - return { - "stt": _param_alias(self.stt_params), - "llm": self.llm, - "tts": _param_alias(self.tts_params), - } + """Component names for this pipeline (used in run_id generation).""" + match self.pipeline_type: + case PipelineType.AUDIO_LLM: + return { + "audio_llm": _param_alias(self.audio_llm_params), + "tts": _param_alias(self.tts_params), + } + case PipelineType.S2S: + if self.s2s == "elevenlabs": + # hardcoded for now. 
Models are set on the agent UI + return { + "s2s": _param_alias(self.s2s_params) or self.s2s, + "stt": "scribe_v2.2_realtime", + "llm": "gemini-3-flash-preview", + "tts": "v3-conversational", + } + return {"s2s": _param_alias(self.s2s_params)} + case PipelineType.CASCADE: + return { + "stt": _param_alias(self.stt_params), + "llm": self.llm, + "tts": _param_alias(self.tts_params), + } @model_validator(mode="before") @classmethod @@ -145,181 +188,6 @@ def _migrate_legacy_fields(cls, data: Any) -> Any: return data -class SpeechToSpeechConfig(BaseModel): - """Configuration for a speech-to-speech model.""" - - model_config = ConfigDict(extra="forbid") - - s2s: str = Field(description="Speech-to-speech model name", examples=["gpt-realtime-mini", "gemini_live"]) - s2s_params: dict[str, Any] = Field({}, description="Additional speech-to-speech model parameters (JSON)") - - # Configurable turn start/stop strategies (same as PipelineConfig) - turn_start_strategy: str = Field( - "vad", - description=( - "User turn start strategy: 'vad', 'transcription', or 'external'. " - "Defaults to 'vad' (VADUserTurnStartStrategy). " - "Set via EVA_MODEL__TURN_START_STRATEGY." - ), - ) - turn_start_strategy_params: dict[str, Any] = Field( - {}, - description="Parameters for turn start strategy (JSON). Set via EVA_MODEL__TURN_START_STRATEGY_PARAMS.", - ) - - turn_stop_strategy: str = Field( - "turn_analyzer", - description=( - "User turn stop strategy: 'speech_timeout', 'turn_analyzer', or 'external'. " - "Defaults to 'turn_analyzer' (TurnAnalyzerUserTurnStopStrategy with LocalSmartTurnAnalyzerV3). " - "Set via EVA_MODEL__TURN_STOP_STRATEGY." - ), - ) - turn_stop_strategy_params: dict[str, Any] = Field( - {}, - description="Parameters for turn stop strategy (JSON). Set via EVA_MODEL__TURN_STOP_STRATEGY_PARAMS.", - ) - - # VAD configuration - vad: str = Field( - "silero", - description=( - "VAD analyzer type: 'silero' or 'none'. Defaults to 'silero' (SileroVADAnalyzer). 
Use 'none' with external turn strategies (e.g. deepgram-flux) to skip local VAD. Set via EVA_MODEL__VAD." - ), - ) - vad_params: dict[str, Any] = Field( - {}, - description=( - "VAD parameters (JSON): confidence, start_secs, stop_secs, min_volume. Set via EVA_MODEL__VAD_PARAMS." - ), - ) - - @property - def pipeline_parts(self) -> dict[str, str]: - """Component names for this pipeline.""" - if self.s2s == "elevenlabs": - # hardcoded for now. Models are set on the agent UI - return { - "s2s": _param_alias(self.s2s_params), - "stt": "scribe_v2.2_realtime", - "llm": "gemini-3-flash-preview", - "tts": "v3-conversational", - } - return {"s2s": _param_alias(self.s2s_params)} - - -class AudioLLMConfig(BaseModel): - """Configuration for an Audio-LLM pipeline (audio in, text out, separate TTS). - - Used for models like self-hosted Ultravox that accept audio input + text context - and return text output, requiring a separate TTS stage for speech synthesis. - """ - - model_config = ConfigDict(extra="forbid") - - audio_llm: str = Field( - description="Audio-LLM model identifier", - examples=["vllm"], - ) - audio_llm_params: dict[str, Any] = Field( - {}, - description=( - "Audio-LLM parameters (JSON): base_url (required), api_key, model, temperature, max_tokens, " - "vad_stop_secs (default: 0.4), smart_turn_stop_secs (default: 0.8)" - ), - ) - tts: str = Field(description="TTS model", examples=["cartesia", "elevenlabs"]) - tts_params: dict[str, Any] = Field({}, description="Additional TTS model parameters (JSON)") - - # Configurable turn start/stop strategies (same as PipelineConfig) - turn_start_strategy: str = Field( - "vad", - description=( - "User turn start strategy: 'vad', 'transcription', or 'external'. " - "Defaults to 'vad' (VADUserTurnStartStrategy). " - "Set via EVA_MODEL__TURN_START_STRATEGY." - ), - ) - turn_start_strategy_params: dict[str, Any] = Field( - {}, - description="Parameters for turn start strategy (JSON). 
Set via EVA_MODEL__TURN_START_STRATEGY_PARAMS.", - ) - - turn_stop_strategy: str = Field( - "turn_analyzer", - description=( - "User turn stop strategy: 'speech_timeout', 'turn_analyzer', or 'external'. " - "Defaults to 'turn_analyzer' (TurnAnalyzerUserTurnStopStrategy with LocalSmartTurnAnalyzerV3). " - "Set via EVA_MODEL__TURN_STOP_STRATEGY." - ), - ) - turn_stop_strategy_params: dict[str, Any] = Field( - {}, - description="Parameters for turn stop strategy (JSON). Set via EVA_MODEL__TURN_STOP_STRATEGY_PARAMS.", - ) - - # VAD configuration - vad: str = Field( - "silero", - description=( - "VAD analyzer type: 'silero' or 'none'. Defaults to 'silero' (SileroVADAnalyzer). Use 'none' with external turn strategies (e.g. deepgram-flux) to skip local VAD. Set via EVA_MODEL__VAD." - ), - ) - vad_params: dict[str, Any] = Field( - {}, - description=( - "VAD parameters (JSON): confidence, start_secs, stop_secs, min_volume. Set via EVA_MODEL__VAD_PARAMS." - ), - ) - - @property - def pipeline_parts(self) -> dict[str, str]: - """Component names for this pipeline.""" - return { - "audio_llm": _param_alias(self.audio_llm_params), - "tts": _param_alias(self.tts_params), - } - - -_PIPELINE_FIELDS = { - "llm", - "stt", - "tts", - "stt_params", - "tts_params", - "turn_start_strategy", - "turn_start_strategy_params", - "turn_stop_strategy", - "turn_stop_strategy_params", - "vad", - "vad_params", - *PipelineConfig._LEGACY_RENAMES, - *PipelineConfig._LEGACY_DROP, -} -_S2S_FIELDS = { - "s2s", - "s2s_params", - "turn_start_strategy", - "turn_start_strategy_params", - "turn_stop_strategy", - "turn_stop_strategy_params", - "vad", - "vad_params", -} -_AUDIO_LLM_FIELDS = { - "audio_llm", - "audio_llm_params", - "tts", - "tts_params", - "turn_start_strategy", - "turn_start_strategy_params", - "turn_stop_strategy", - "turn_stop_strategy_params", - "vad", - "vad_params", -} - - class PipelineType(StrEnum): """Type of voice pipeline.""" @@ -328,31 +196,14 @@ class PipelineType(StrEnum): S2S = 
"s2s" -def _model_config_discriminator(data: Any) -> str: - """Discriminate which pipeline config type to use based on unique fields.""" - if isinstance(data, dict): - if "audio_llm" in data: - return "audio_llm" - if "s2s" in data: - return "s2s" - return "pipeline" - if isinstance(data, AudioLLMConfig): - return "audio_llm" - if isinstance(data, SpeechToSpeechConfig): - return "s2s" - return "pipeline" - - -def get_pipeline_type(model_data: dict | Any) -> PipelineType: +def get_pipeline_type(model_data: dict) -> PipelineType: """Return the pipeline type for the given model config. - Works with both raw dicts (e.g. from config.json) and parsed model config objects. + Works with raw dicts, e.g., from config.json. Also handles legacy configs where ``realtime_model`` was stored alongside - ``llm_model`` in a flat dict (before the discriminated-union refactor). + ``llm_model`` in a flat dict. """ - mode = _model_config_discriminator(model_data) - if mode == "s2s": - s2s_value = model_data.get("s2s") + if s2s_value := model_data.get("s2s"): # ElevenLabs uses s2s_params for configuration but is a cascade pipeline internally if s2s_value == "elevenlabs": return PipelineType.CASCADE @@ -360,55 +211,14 @@ def get_pipeline_type(model_data: dict | Any) -> PipelineType: if s2s_value == "ultravox": return PipelineType.AUDIO_LLM return PipelineType.S2S - if mode == "audio_llm": + if model_data.get("audio_llm"): return PipelineType.AUDIO_LLM # Legacy: realtime_model was a sibling of llm_model before the union split - if isinstance(model_data, dict) and model_data.get("realtime_model"): + if model_data.get("realtime_model"): return PipelineType.S2S return PipelineType.CASCADE -def _strip_other_mode_fields(data: dict, strict: bool = True) -> dict: - """Validate pipeline mode exclusivity, then strip irrelevant shared fields. - - Raises ``ValueError`` if multiple pipeline modes are specified (when strict=True). - Then strips shared fields (e.g. 
``tts`` from S2S mode) so that - ``extra="forbid"`` on each config class doesn't reject them. - - Args: - data: Raw config dictionary from the YAML/env input. - strict: If False, skip the conflict error (used for metrics-only re-runs - where the model config is not needed). - """ - # --- Mutual exclusivity: only one pipeline mode allowed --- - has_llm = bool(data.get("llm") or data.get("llm_model")) - has_s2s = bool(data.get("s2s")) - has_audio_llm = bool(data.get("audio_llm")) - active = [ - name - for flag, name in [ - (has_llm, "EVA_MODEL__LLM"), - (has_s2s, "EVA_MODEL__S2S"), - (has_audio_llm, "EVA_MODEL__AUDIO_LLM"), - ] - if flag - ] - if len(active) > 1 and strict: - raise ValueError( - f"Multiple pipeline modes set: {', '.join(active)}. " - f"Set exactly one of: EVA_MODEL__LLM (ASR-LLM-TTS), " - f"EVA_MODEL__S2S (S2S), or EVA_MODEL__AUDIO_LLM (SpeechLM-TTS)." - ) - - mode = _model_config_discriminator(data) - if mode == "audio_llm": - return {k: v for k, v in data.items() if k in _AUDIO_LLM_FIELDS} - if mode == "s2s": - return {k: v for k, v in data.items() if k in _S2S_FIELDS} - # pipeline: keep pipeline fields + any legacy fields the model_validator handles - return {k: v for k, v in data.items() if k in _PIPELINE_FIELDS} - - class BackgroundNoiseType(StrEnum): """Ambient noise type mixed into user audio (speech and silence).""" @@ -478,15 +288,6 @@ def _validate_exclusivity(self) -> "PerturbationConfig": return self -# Discriminated union so Pydantic picks the right config type from env vars / CLI -ModelConfigUnion = Annotated[ - Annotated[PipelineConfig, Tag("pipeline")] - | Annotated[SpeechToSpeechConfig, Tag("s2s")] - | Annotated[AudioLLMConfig, Tag("audio_llm")], - Discriminator(_model_config_discriminator), -] - - class RunConfig(BaseSettings): """A New End-to-end Framework for Evaluating Voice Agents\033[94m @@ -529,7 +330,8 @@ class ModelDeployment(DeploymentTypedDict): model_list: list[ModelDeployment] = Field(min_length=1) # Model to test - 
model: ModelConfigUnion = Field( + model: ModelConfig = Field( + default_factory=ModelConfig, description="Pipeline (STT + LLM + TTS), speech-to-speech, or audio-LLM model configuration", ) @@ -687,49 +489,65 @@ def tool_mocks_path(self) -> Path: def agent_config_path(self) -> Path: return Path(f"configs/agents/{self.domain}_agent.yaml") - @model_validator(mode="before") - @classmethod - def _warn_deprecated_aliases(cls, data: Any) -> Any: - """Error out if deprecated environment variables are detected.""" - if not isinstance(data, dict): - return data - - # Strip env-var fields from other pipeline modes so extra="forbid" doesn't reject them. - # For metrics-only re-runs, skip the strict conflict check — the model isn't used. - if isinstance(data.get("model"), dict): - force_rerun = bool(data.get("force_rerun_metrics")) - data["model"] = _strip_other_mode_fields(data["model"], strict=not force_rerun) - - return data - @model_validator(mode="after") def _check_companion_services(self) -> "RunConfig": - """Ensure required companion services are set for each pipeline mode.""" + """Validate pipeline mode mutual exclusivity and required companion services. + + Skipped entirely when ``max_rerun_attempts == 0`` where the model + config is unused and conflicting env vars are harmless. + """ + if self.max_rerun_attempts == 0: + return self + + # ── Validate pipeline mode mutual exclusivity ── + active = [ + name + for flag, name in [ + (self.model.llm, "EVA_MODEL__LLM"), + (self.model.s2s, "EVA_MODEL__S2S"), + (self.model.audio_llm, "EVA_MODEL__AUDIO_LLM"), + ] + if flag + ] + if len(active) != 1: + raise ValueError( + (f"Multiple pipeline modes set: {', '.join(active)}. " if active else "Model pipeline required. ") + + "Set exactly one of: EVA_MODEL__LLM (STT+LLM+TTS), EVA_MODEL__S2S (S2S), or EVA_MODEL__AUDIO_LLM (Audio LLM+TTS)."
+ ) + + # ── Validate companion services ── errors: list[InitErrorDetails] = [] - if isinstance(self.model, PipelineConfig): - errors.extend(self._validate_service_params("STT", self.model.stt, self.model.stt_params)) - errors.extend(self._validate_service_params("TTS", self.model.tts, self.model.tts_params)) - elif isinstance(self.model, AudioLLMConfig): - errors.extend(self._validate_service_params("TTS", self.model.tts, self.model.tts_params)) - errors.extend(self._validate_service_params("AUDIO_LLM", self.model.audio_llm, self.model.audio_llm_params)) - elif isinstance(self.model, SpeechToSpeechConfig): - errors.extend(self._validate_service_params("S2S", self.model.s2s, self.model.s2s_params)) + match self.model.pipeline_type: + case PipelineType.CASCADE: + errors.extend(self._validate_service_params("STT", self.model.stt, self.model.stt_params)) + errors.extend(self._validate_service_params("TTS", self.model.tts, self.model.tts_params)) + case PipelineType.AUDIO_LLM: + errors.extend(self._validate_service_params("TTS", self.model.tts, self.model.tts_params)) + errors.extend( + self._validate_service_params("AUDIO_LLM", self.model.audio_llm, self.model.audio_llm_params) + ) + case PipelineType.S2S: + errors.extend(self._validate_service_params("S2S", self.model.s2s, self.model.s2s_params)) if errors: raise ValidationError.from_exception_data(title=type(self).__name__, line_errors=errors) - return self - @model_validator(mode="after") - def _set_default_run_id(self) -> "RunConfig": + # ── Set default run_id ── + # self.model.pipeline_parts is only available if self.model is valid, which the above asserts. 
if "run_id" not in self.model_fields_set: suffix = "_".join(v for v in self.model.pipeline_parts.values() if v) self.run_id = f"{datetime.now(UTC):%Y-%m-%d_%H-%M-%S.%f}_{suffix}" + return self - @classmethod def _validate_service_params( - cls, service: str, provider: str, params: dict[str, Any] + self, service: str, provider: str | None, params: dict[str, Any] | None ) -> Iterator[InitErrorDetails]: - """Validate that STT/TTS params contain required keys.""" + """Validate that the service's name is set and its params contain the required keys.""" + if not provider: + message = f"EVA_MODEL__{service} required in {self.model.pipeline_type} mode." + loc = ("model", service.lower()) + yield InitErrorDetails(type=PydanticCustomError("missing_service", message), loc=loc, input=provider) + required_keys = ["api_key", "model"] missing = [key for key in required_keys if key not in params] if params else required_keys if missing: @@ -788,9 +606,9 @@ def _redact_model_list(cls, deployments: list[ModelDeployment]) -> list[dict]: @field_serializer("model") @classmethod - def _redact_model_params(cls, model: ModelConfigUnion) -> dict: + def _redact_model_params(cls, model: ModelConfig) -> dict: """Redact secret values in STT/TTS/S2S/AudioLLM params when serializing.""" - data = model.model_dump(mode="json") + data = model.model_dump(mode="json", exclude_none=True) for field_name, value in data.items(): if field_name.endswith("_params") and isinstance(value, dict): data[field_name] = cls._redact_dict(value) diff --git a/src/eva/orchestrator/runner.py b/src/eva/orchestrator/runner.py index d9424bc9..7341b2dd 100644 --- a/src/eva/orchestrator/runner.py +++ b/src/eva/orchestrator/runner.py @@ -10,7 +10,7 @@ from eva.metrics.legacy_aliases import rename_metric_keys, rename_metric_list from eva.metrics.runner import MetricsRunner, MetricsRunResult from eva.models.agents import AgentConfig -from eva.models.config import PipelineConfig, RunConfig +from eva.models.config import 
PipelineType, RunConfig from eva.models.record import EvaluationRecord from eva.models.results import ConversationResult, RunResult from eva.orchestrator.port_pool import PortPool @@ -125,7 +125,7 @@ async def run(self, records: list[EvaluationRecord]) -> RunResult: (self.output_dir / "records").mkdir(exist_ok=True) # Resolve exact models used (captures defaults from services.py + any alias labels) - if isinstance(self.config.model, PipelineConfig): + if self.config.model.pipeline_type == PipelineType.CASCADE: stt_params = self.config.model.stt_params tts_params = self.config.model.tts_params self.config.resolved_models = { diff --git a/src/eva/run_benchmark.py b/src/eva/run_benchmark.py index 8e581815..0fc3a7e7 100644 --- a/src/eva/run_benchmark.py +++ b/src/eva/run_benchmark.py @@ -6,7 +6,7 @@ from dotenv import load_dotenv from eva.metrics.runner import MetricsRunner -from eva.models.config import PipelineConfig, RunConfig +from eva.models.config import PipelineType, RunConfig from eva.models.record import EvaluationRecord from eva.orchestrator.runner import BenchmarkRunner from eva.utils import router @@ -106,7 +106,7 @@ async def run_benchmark(config: RunConfig) -> int: if config.dry_run: logger.info("Dry run - configuration validated successfully") logger.info(f" Dataset: {len(records)} records") - if isinstance(config.model, PipelineConfig): + if config.model.pipeline_type == PipelineType.CASCADE: logger.info(f" STT model: {config.model.stt}") logger.info(f" LLM model: {config.model.llm}") logger.info(f" TTS model: {config.model.tts}") diff --git a/tests/integration/test_evaluation_mode.py b/tests/integration/test_evaluation_mode.py index f012593d..a70c9980 100644 --- a/tests/integration/test_evaluation_mode.py +++ b/tests/integration/test_evaluation_mode.py @@ -15,7 +15,7 @@ import pytest -from eva.models.config import PipelineConfig, RunConfig +from eva.models.config import ModelConfig, RunConfig from eva.models.record import EvaluationRecord, GroundTruth 
from eva.models.results import ConversationResult from eva.orchestrator.runner import BenchmarkRunner @@ -72,7 +72,7 @@ def eval_config(tmp_path): return RunConfig( run_id="test_eval_run", model_list=_TEST_MODEL_LIST, - model=PipelineConfig( + model=ModelConfig( llm="gpt-4", stt="deepgram", tts="cartesia", diff --git a/tests/unit/assistant/test_pipecat_server.py b/tests/unit/assistant/test_pipecat_server.py index 14c8f621..f0efd8e0 100644 --- a/tests/unit/assistant/test_pipecat_server.py +++ b/tests/unit/assistant/test_pipecat_server.py @@ -10,6 +10,7 @@ from eva.assistant.agentic.audit_log import AuditLog from eva.assistant.pipecat_server import SAMPLE_RATE, PipecatAssistantServer +from eva.models.config import PipelineType from eva.utils.audio_utils import save_pcm_as_wav @@ -146,7 +147,7 @@ async def test_saves_audit_log_and_both_scenario_db_snapshots(self, tmp_path): srv.user_audio_buffer = bytearray(b"\x00" * 100) srv.assistant_audio_buffer = bytearray(b"\x00" * 100) # PipelineConfig (not SpeechToSpeechConfig) — transcript.jsonl written via audit log - srv.pipeline_config = MagicMock(spec=[]) + srv.pipeline_config = MagicMock(pipeline_type=PipelineType.CASCADE) # Add an entry so audit_log is non-trivial srv.audit_log.append_user_input("Hello") @@ -170,7 +171,7 @@ async def test_saves_agent_perf_stats_when_agentic_system_present(self, tmp_path srv._audio_buffer = bytearray(b"\x00" * 100) srv.user_audio_buffer = bytearray(b"\x00" * 100) srv.assistant_audio_buffer = bytearray(b"\x00" * 100) - srv.pipeline_config = MagicMock(spec=[]) + srv.pipeline_config = MagicMock(pipeline_type=PipelineType.CASCADE) mock_system = MagicMock() srv.agentic_system = mock_system diff --git a/tests/unit/models/test_config_models.py b/tests/unit/models/test_config_models.py index c076c996..ae07ce9a 100644 --- a/tests/unit/models/test_config_models.py +++ b/tests/unit/models/test_config_models.py @@ -9,7 +9,7 @@ from pydantic import ValidationError from pydantic_settings import 
SettingsError -from eva.models.config import RunConfig, SpeechToSpeechConfig +from eva.models.config import PipelineType, RunConfig MODEL_LIST = [ { @@ -377,12 +377,11 @@ def test_invalid_model_list(self, environ, expected_exception, expected_message) ( ( {}, - r"model\s+Field required", + "Model pipeline required", ), ( {"EVA_MODEL": "{}"}, - # Discriminator defaults to PipelineConfig when no unique field present - r"model\.pipeline\.llm\s+Field required", + "Model pipeline required", ), ( {"EVA_MODEL__LLM": "a", "EVA_MODEL__S2S": "b"}, @@ -402,19 +401,19 @@ def test_invalid_model_list(self, environ, expected_exception, expected_message) ), ( {"EVA_MODEL__LLM": "gpt-5.2", "EVA_MODEL__TTS": "cartesia"}, - r"model\.pipeline\.stt\s+Field required", + "EVA_MODEL__STT required in cascade mode", ), ( {"EVA_MODEL__LLM": "gpt-5.2", "EVA_MODEL__STT": "deepgram"}, - r"model\.pipeline\.tts\s+Field required", + "EVA_MODEL__TTS required in cascade mode", ), ( {"EVA_MODEL__LLM": "gpt-5.2"}, - r"model\.pipeline\.stt\s+Field required(?s:.+)model\.pipeline\.tts\s+Field required", + "EVA_MODEL__STT required in cascade mode(?s:.+)EVA_MODEL__TTS required in cascade mode", ), ( {"EVA_MODEL__AUDIO_LLM": "ultravox"}, - r"model\.audio_llm\.tts\s+Field required", + "EVA_MODEL__TTS required in audio_llm mode", ), ), ids=( @@ -462,6 +461,41 @@ def test_missing_stt_tts_params(self): ) +class TestMaxRerunAttemptsZeroSkipsConflictCheck: + """When max_rerun_attempts=0, multiple pipeline modes don't raise.""" + + def test_multiple_modes_allowed_with_zero_rerun_attempts(self): + """max_rerun_attempts=0 suppresses the 'Multiple pipeline modes' error.""" + # Without max_rerun_attempts=0, this raises a ValidationError (wrapping the validator's ValueError) + with pytest.raises(ValidationError, match="Multiple pipeline modes set"): + _config(env_vars=_EVA_MODEL_LIST_ENV | {"EVA_MODEL__LLM": "a", "EVA_MODEL__S2S": "b"}) + + # With max_rerun_attempts=0, it should not raise the conflict error.
+ # It may still fail validation for other reasons (missing stt/tts params etc.), + # but the "Multiple pipeline modes" error must be suppressed. + try: + _config( + env_vars=_EVA_MODEL_LIST_ENV | {"EVA_MODEL__LLM": "a", "EVA_MODEL__S2S": "b"}, + max_rerun_attempts=0, + ) + except Exception as exc: + assert "Multiple pipeline modes set" not in str(exc) + + def test_single_mode_still_works_with_zero_rerun_attempts(self): + """max_rerun_attempts=0 doesn't break normal single-mode configs.""" + config = _config(env_vars=_BASE_ENV, max_rerun_attempts=0) + assert config.max_rerun_attempts == 0 + assert config.model.llm == "gpt-5.2" + + def test_no_model_config_with_zero_rerun_attempts(self): + """max_rerun_attempts=0 allows omitting model config entirely.""" + config = _config(env_vars=_EVA_MODEL_LIST_ENV, max_rerun_attempts=0) + assert config.max_rerun_attempts == 0 + assert config.model.llm is None + assert config.model.s2s is None + assert config.model.audio_llm is None + + class TestDefaults: """Verify default values match expectations.""" @@ -842,6 +876,142 @@ def test_model_used_when_no_alias(self): assert "nova-2" in config.run_id +class TestApplyEnvOverridesAcrossModes: + """Verify apply_env_overrides works for non-cascade pipeline modes and cross-mode scenarios.""" + + _S2S_ENV_WITH_SECRET = _EVA_MODEL_LIST_ENV | { + "EVA_MODEL__S2S": "gpt-realtime-mini", + "EVA_MODEL__S2S_PARAMS": json.dumps({"api_key": "secret_s2s_key", "model": "rt-mini"}), + } + _AUDIO_LLM_ENV = _EVA_MODEL_LIST_ENV | { + "EVA_MODEL__AUDIO_LLM": "ultravox", + "EVA_MODEL__AUDIO_LLM_PARAMS": json.dumps({"api_key": "secret_allm_key", "model": "ultravox-v1"}), + "EVA_MODEL__TTS": "cartesia", + "EVA_MODEL__TTS_PARAMS": json.dumps({"api_key": "secret_tts_key", "model": "sonic"}), + } + + def test_s2s_round_trip_restores_secrets(self): + """S2S config: save → redact → reload → apply_env_overrides restores api_key.""" + config = _config(env_vars=self._S2S_ENV_WITH_SECRET) + assert
config.model.s2s_params["api_key"] == "secret_s2s_key" + + dumped = config.model_dump_json() + loaded = _load_json_into_runconfig(dumped) + + # Secrets are redacted after round-trip + assert loaded.model.s2s_params["api_key"] == "***" + + loaded.apply_env_overrides(config) + assert loaded.model.s2s_params["api_key"] == "secret_s2s_key" + assert loaded.model.s2s_params["model"] == "rt-mini" + + def test_audio_llm_round_trip_restores_secrets(self): + """AudioLLM config: save → redact → reload → apply_env_overrides restores api_keys.""" + config = _config(env_vars=self._AUDIO_LLM_ENV) + dumped = config.model_dump_json() + loaded = _load_json_into_runconfig(dumped) + + # Both audio_llm_params and tts_params should be redacted + assert loaded.model.audio_llm_params["api_key"] == "***" + assert loaded.model.tts_params["api_key"] == "***" + + loaded.apply_env_overrides(config) + assert loaded.model.audio_llm_params["api_key"] == "secret_allm_key" + assert loaded.model.tts_params["api_key"] == "secret_tts_key" + + def test_s2s_saved_with_cascade_env_active(self): + """Saved S2S config loaded while cascade env is active still restores S2S secrets. + + This is the scenario from commit dd47960: from_existing_run loads a saved S2S config + but the current environment has cascade (LLM+STT+TTS) vars set. apply_env_overrides + should restore the S2S secrets from a live S2S config, not the cascade one. 
+ """ + s2s_config = _config(env_vars=self._S2S_ENV_WITH_SECRET) + dumped = s2s_config.model_dump_json() + + # Reload with isolated env (simulating from_existing_run's _StoredRunConfig) + loaded = _load_json_into_runconfig(dumped) + assert loaded.model.s2s_params["api_key"] == "***" + + # apply_env_overrides with the original S2S config + loaded.apply_env_overrides(s2s_config) + assert loaded.model.s2s_params["api_key"] == "secret_s2s_key" + + def test_zero_rerun_attempts_preserves_first_mode_params(self): + """Cascade config: save → redact → reload → apply_env_overrides restores STT/TTS api_keys.""" + # Cascade config saved normally + cascade_config = _config(env_vars=_BASE_ENV) + dumped = cascade_config.model_dump_json() + loaded = _load_json_into_runconfig(dumped) + assert loaded.model.stt_params["api_key"] == "***" + + # Restore secrets from the live cascade config (this test does not itself set max_rerun_attempts=0) + loaded.apply_env_overrides(cascade_config) + assert loaded.model.stt_params["api_key"] == "test_key" + assert loaded.model.tts_params["api_key"] == "test_key" + + +class TestStripOtherModeFields: + """Leftover fields from another pipeline mode must not prevent the config from loading.""" + + def test_s2s_with_leftover_pipeline_fields(self): + """S2S config ignores leftover stt/tts/llm fields from a previous pipeline setup.""" + config = _config( + env_vars=_EVA_MODEL_LIST_ENV + | { + "EVA_MODEL__S2S": "gpt-realtime-mini", + "EVA_MODEL__S2S_PARAMS": json.dumps({"api_key": "", "model": "gpt-realtime-mini"}), + # Leftover pipeline fields that should be stripped + "EVA_MODEL__STT": "deepgram", + "EVA_MODEL__TTS": "cartesia", + "EVA_MODEL__STT_PARAMS": json.dumps({"api_key": "k", "model": "nova-2"}), + "EVA_MODEL__TTS_PARAMS": json.dumps({"api_key": "k", "model": "sonic"}), + } + ) + assert config.model.s2s == "gpt-realtime-mini" + + def test_pipeline_with_leftover_s2s_fields(self): + """Pipeline config ignores leftover s2s_params from a previous S2S setup.""" +
config = _config( + env_vars=_BASE_ENV + | { + # Leftover S2S field (s2s_params without s2s won't trigger exclusivity) + "EVA_MODEL__S2S_PARAMS": json.dumps({"api_key": "", "model": "old-model"}), + } + ) + assert config.model.llm == "gpt-5.2" + + def test_audio_llm_with_leftover_pipeline_fields(self): + """Audio LLM config ignores leftover stt/llm fields from a previous pipeline setup.""" + config = _config( + env_vars=_EVA_MODEL_LIST_ENV + | { + "EVA_MODEL__AUDIO_LLM": "vllm", + "EVA_MODEL__AUDIO_LLM_PARAMS": json.dumps( + {"api_key": "k", "model": "ultravox", "base_url": "http://localhost:8000"} + ), + "EVA_MODEL__TTS": "cartesia", + "EVA_MODEL__TTS_PARAMS": json.dumps({"api_key": "k", "model": "sonic"}), + # Leftover pipeline fields that should be stripped + "EVA_MODEL__STT": "deepgram", + "EVA_MODEL__STT_PARAMS": json.dumps({"api_key": "k", "model": "nova-2"}), + } + ) + assert config.model.audio_llm == "vllm" + + def test_pipeline_with_leftover_audio_llm_fields(self): + """Pipeline config ignores leftover audio_llm_params from a previous audio LLM setup.""" + config = _config( + env_vars=_BASE_ENV + | { + "EVA_MODEL__AUDIO_LLM_PARAMS": json.dumps( + {"api_key": "k", "model": "ultravox", "base_url": "http://localhost:8000"} + ), + } + ) + assert config.model.llm == "gpt-5.2" + + class TestSpeechToSpeechConfig: """Tests for SpeechToSpeechConfig discriminated union.""" @@ -854,7 +1024,7 @@ def test_s2s_config_from_env(self): "EVA_MODEL__S2S_PARAMS": json.dumps({"api_key": "", "model": "gpt-realtime-mini"}), } ) - assert isinstance(config.model, SpeechToSpeechConfig) + assert config.model.pipeline_type == PipelineType.S2S assert config.model.s2s == "gpt-realtime-mini" def test_s2s_config_from_cli(self): @@ -868,7 +1038,7 @@ def test_s2s_config_from_cli(self): '{"api_key": "test-key", "model": "gemini_live"}', ], ) - assert isinstance(config.model, SpeechToSpeechConfig) + assert config.model.pipeline_type == PipelineType.S2S assert config.model.s2s == 
"gemini_live" assert config.model.s2s_params == {"api_key": "test-key", "model": "gemini_live"} @@ -881,5 +1051,5 @@ def test_s2s_config_with_params(self): "s2s_params": {"voice": "alloy", "api_key": "key_1", "model": "gpt-realtime-mini"}, }, ) - assert isinstance(config.model, SpeechToSpeechConfig) + assert config.model.pipeline_type == PipelineType.S2S assert config.model.s2s_params == {"voice": "alloy", "api_key": "key_1", "model": "gpt-realtime-mini"} diff --git a/tests/unit/orchestrator/test_runner.py b/tests/unit/orchestrator/test_runner.py index eacfe4d5..89751b5c 100644 --- a/tests/unit/orchestrator/test_runner.py +++ b/tests/unit/orchestrator/test_runner.py @@ -8,7 +8,7 @@ import pytest -from eva.models.config import PipelineConfig, RunConfig +from eva.models.config import ModelConfig, RunConfig from eva.models.results import ConversationResult from eva.orchestrator.runner import BenchmarkRunner from tests.unit.conftest import make_evaluation_record @@ -25,7 +25,7 @@ def _make_record(record_id: str): def _make_config(tmp_path: Path, max_concurrent: int = 3) -> RunConfig: """Create a minimal RunConfig for testing.""" return RunConfig( - model=PipelineConfig( + model=ModelConfig( llm="test-model", stt="deepgram", tts="cartesia", @@ -194,6 +194,86 @@ def test_sets_output_dir_to_run_dir(self, tmp_path): assert runner.output_dir == run_dir + def test_ignores_env_vars_when_loading_saved_config(self, tmp_path): + """from_existing_run loads the saved config without env var contamination. + + If the current environment has a different pipeline mode set (e.g. EVA_MODEL__LLM) + but the saved run used S2S, the saved config should load without conflicts. 
+ """ + # Create a saved S2S config on disk (in a clean env to avoid conflicts) + with patch.dict(os.environ, {}, clear=True): + s2s_config = RunConfig( + model={"s2s": "gpt-realtime-mini", "s2s_params": {"api_key": "k", "model": "rt"}}, + model_list=_MODEL_LIST, + run_id="s2s-run", + output_dir=tmp_path / "output", + ) + run_dir = tmp_path / "run" + run_dir.mkdir() + (run_dir / "config.json").write_text(s2s_config.model_dump_json(indent=2)) + + # Set env vars for a *different* pipeline mode — these must be ignored + conflicting_env = _BASE_ENV | { + "EVA_MODEL__LLM": "gpt-5.2", + "EVA_MODEL__STT": "deepgram", + "EVA_MODEL__TTS": "cartesia", + "EVA_MODEL__STT_PARAMS": json.dumps({"api_key": "k", "model": "nova-2"}), + "EVA_MODEL__TTS_PARAMS": json.dumps({"api_key": "k", "model": "sonic"}), + } + with patch.dict(os.environ, conflicting_env, clear=True): + with patch.object(BenchmarkRunner, "_load_agent_config", return_value=MagicMock()): + runner = BenchmarkRunner.from_existing_run(run_dir) + + assert runner.config.model.s2s == "gpt-realtime-mini" + assert runner.output_dir == run_dir + + def test_from_existing_run_then_apply_env_overrides_restores_secrets(self, tmp_path): + """Full flow: save S2S config → from_existing_run → apply_env_overrides with a live config built from an env where cascade vars also leak in. + + The live config is the one that carries fresh secrets from the current + environment. If that environment has both S2S *and* cascade vars set + (e.g. because .env contains both), max_rerun_attempts=0 must allow + constructing the live config and apply_env_overrides must still restore + the S2S secrets. 
+ """ + # Create a saved S2S config with a real secret + with patch.dict(os.environ, {}, clear=True): + saved = RunConfig( + model={"s2s": "gpt-realtime-mini", "s2s_params": {"api_key": "real_secret", "model": "rt"}}, + model_list=_MODEL_LIST, + run_id="s2s-run", + output_dir=tmp_path / "output", + ) + run_dir = tmp_path / "run" + run_dir.mkdir() + (run_dir / "config.json").write_text(saved.model_dump_json(indent=2)) + + # Load via from_existing_run (env doesn't matter — _StoredRunConfig ignores it) + with patch.dict(os.environ, _BASE_ENV, clear=True): + with patch.object(BenchmarkRunner, "_load_agent_config", return_value=MagicMock()): + runner = BenchmarkRunner.from_existing_run(run_dir) + + assert runner.config.model.s2s_params["api_key"] == "***" + + # Build the live config from an env that has both S2S and cascade vars. + # max_rerun_attempts=0 suppresses the conflict error. + conflicting_env = _BASE_ENV | { + "EVA_MODEL__S2S": "gpt-realtime-mini", + "EVA_MODEL__S2S_PARAMS": json.dumps({"api_key": "fresh_secret", "model": "rt"}), + "EVA_MODEL__LLM": "gpt-5.2", + "EVA_MODEL__STT": "deepgram", + "EVA_MODEL__TTS": "cartesia", + "EVA_MODEL__STT_PARAMS": json.dumps({"api_key": "k", "model": "nova-2"}), + "EVA_MODEL__TTS_PARAMS": json.dumps({"api_key": "k", "model": "sonic"}), + "EVA_MODEL__AUDIO_LLM": "whatever", + } + with patch.dict(os.environ, conflicting_env, clear=True): + live = RunConfig(max_rerun_attempts=0, _cli_parse_args=[]) + + # apply_env_overrides restores secrets from the live config + runner.config.apply_env_overrides(live, strict_llm=False) + assert runner.config.model.s2s_params["api_key"] == "fresh_secret" + def test_missing_config_json_raises_file_not_found(self, tmp_path): run_dir = tmp_path / "no_config" run_dir.mkdir()