Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 6 additions & 21 deletions docs/assistant_server_contract.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ produce `ConversationResult`. The server name must be registered in
## Quick checklist

- [ ] Subclass `AbstractAssistantServer`
- [ ] Assert `isinstance(self.pipeline_config, SpeechToSpeechConfig)` (or the correct
config type) in `__init__`
- [ ] Read `self.pipeline_config.s2s_params` (a `ModelConfig` field) in `__init__`
- [ ] Expose `ws://localhost:{self.port}/ws` accepting Twilio-framed audio
- [ ] Override `_audio_sample_rate` to match the recording sample rate
- [ ] Populate `self.user_audio_buffer` and `self.assistant_audio_buffer` during
Expand Down Expand Up @@ -58,26 +57,19 @@ Call `super().__init__(**kwargs)` first. The base class sets up:
| `self._audio_buffer` | `bytearray` | Mixed audio (leave empty — base class mixes automatically) |
| `self._audio_sample_rate` | `int` | Recording sample rate, default 24000 |

After calling `super().__init__()`, narrow the config type and initialize state:
After calling `super().__init__()`, read params and initialize state:

```python
def __init__(self, **kwargs):
super().__init__(**kwargs)
if isinstance(self.pipeline_config, SpeechToSpeechConfig):
s2s_params = self.pipeline_config.s2s_params
else:
logger.error("Pipeline config is not SpeechToSpeechConfig")
return
s2s_params = self.pipeline_config.s2s_params or {}
self._model = s2s_params["model"] # model is required in the s2s params config
self._audio_sample_rate = SAMPLE_RATE # match recording rate
self._fw_log: FrameworkLogWriter | None = None
self._metrics_log: MetricsLogWriter | None = None
# ... other setup
```

The assertion fails fast with a clear message rather than an obscure `AttributeError`
later.

---

## 2. `start()` — server startup
Expand Down Expand Up @@ -352,16 +344,13 @@ async def save_outputs(self) -> None:

## 9. Config type

`pipeline_config` arrives as a union type from the orchestrator. For s2s models it
will be `SpeechToSpeechConfig`, which exposes:
`pipeline_config` is a `ModelConfig` instance from the orchestrator. For s2s models, it exposes:

```python
self.pipeline_config.s2s # model identifier string
self.pipeline_config.s2s_params # dict of additional params (api_key, voice, model, etc.)
```

Return if the config is not `SpeechToSpeechConfig`.

Server should be documented in the relevant `configs/` YAML with `framework:
my_framework` and `model: {s2s: my-model-id, s2s_params: {...}}`.

Expand Down Expand Up @@ -402,18 +391,14 @@ from eva.assistant.audio_bridge import (
sync_buffer_to_position,
)
from eva.assistant.base_server import INITIAL_MESSAGE, AbstractAssistantServer
from eva.models.config import SpeechToSpeechConfig
from eva.models.config import ModelConfig


class MyFrameworkAssistantServer(AbstractAssistantServer):

def __init__(self, **kwargs):
super().__init__(**kwargs)
if isinstance(self.pipeline_config, SpeechToSpeechConfig):
s2s_params = self.pipeline_config.s2s_params
else:
logger.error("Pipeline config is not SpeechToSpeechConfig")
return
s2s_params = self.pipeline_config.s2s_params or {}
self._model = s2s_params["model"]


Expand Down
4 changes: 2 additions & 2 deletions src/eva/assistant/base_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from eva.assistant.audio_bridge import FrameworkLogWriter, MetricsLogWriter
from eva.assistant.tools.tool_executor import ToolExecutor
from eva.models.agents import AgentConfig
from eva.models.config import AudioLLMConfig, PipelineConfig, SpeechToSpeechConfig
from eva.models.config import ModelConfig
from eva.utils.audio_utils import save_pcm_as_wav
from eva.utils.logging import get_logger

Expand All @@ -43,7 +43,7 @@ class AbstractAssistantServer(ABC):
def __init__(
self,
current_date_time: str,
pipeline_config: PipelineConfig | SpeechToSpeechConfig | AudioLLMConfig,
pipeline_config: ModelConfig,
agent: AgentConfig,
agent_config_path: str,
scenario_db_path: str,
Expand Down
12 changes: 3 additions & 9 deletions src/eva/assistant/elevenlabs_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import json
import time
from pathlib import Path
from typing import Any

try:
import audioop
Expand Down Expand Up @@ -45,7 +44,7 @@
from eva.assistant.base_server import INITIAL_MESSAGE, AbstractAssistantServer
from eva.assistant.elevenlabs_audio_interface import TwilioAudioBridge
from eva.models.agents import AgentConfig
from eva.models.config import SpeechToSpeechConfig
from eva.models.config import ModelConfig
from eva.utils.logging import get_logger
from eva.utils.prompt_manager import PromptManager

Expand Down Expand Up @@ -113,7 +112,7 @@ class ElevenLabsAssistantServer(AbstractAssistantServer):
def __init__(
self,
current_date_time: str,
pipeline_config: SpeechToSpeechConfig,
pipeline_config: ModelConfig,
agent: AgentConfig,
agent_config_path: str,
scenario_db_path: str,
Expand All @@ -135,12 +134,7 @@ def __init__(
# Recording sample rate (ElevenLabs operates at 16 kHz)
self._audio_sample_rate = _RECORDING_SAMPLE_RATE

s2s_params: dict[str, Any] = {}
if isinstance(self.pipeline_config, SpeechToSpeechConfig):
s2s_params = self.pipeline_config.s2s_params
else:
logger.error("Pipeline config is not SpeechToSpeechConfig")
return
s2s_params = self.pipeline_config.s2s_params or {}
self.s2s_params = s2s_params
self._model = s2s_params.get("model", "elevenlabs")

Expand Down
11 changes: 3 additions & 8 deletions src/eva/assistant/gemini_live_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
)
from eva.assistant.base_server import INITIAL_MESSAGE, AbstractAssistantServer
from eva.models.agents import AgentConfig
from eva.models.config import SpeechToSpeechConfig
from eva.models.config import ModelConfig
from eva.utils.logging import get_logger
from eva.utils.prompt_manager import PromptManager

Expand Down Expand Up @@ -154,7 +154,7 @@ class GeminiLiveAssistantServer(AbstractAssistantServer):
def __init__(
self,
current_date_time: str,
pipeline_config: SpeechToSpeechConfig,
pipeline_config: ModelConfig,
agent: AgentConfig,
agent_config_path: str,
scenario_db_path: str,
Expand All @@ -177,12 +177,7 @@ def __init__(
self._audio_sample_rate = _RECORDING_SAMPLE_RATE

# Gemini model name from s2s_params or default
s2s_params: dict[str, Any] = {}
if isinstance(self.pipeline_config, SpeechToSpeechConfig):
s2s_params = self.pipeline_config.s2s_params
else:
logger.error("Pipeline config is not SpeechToSpeechConfig")
return
s2s_params = self.pipeline_config.s2s_params or {}
self._model = s2s_params["model"]
self._voice = s2s_params.get("voice", "Kore")
self._language_code = s2s_params.get("language_code", "en-US")
Expand Down
8 changes: 1 addition & 7 deletions src/eva/assistant/openai_realtime_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
sync_buffer_to_position,
)
from eva.assistant.base_server import INITIAL_MESSAGE, AbstractAssistantServer
from eva.models.config import SpeechToSpeechConfig
from eva.utils.logging import get_logger
from eva.utils.prompt_manager import PromptManager

Expand Down Expand Up @@ -104,12 +103,7 @@ def __init__(self, **kwargs: Any) -> None:
# User speech start timestamp from audio_interface (source of truth)
self._audio_interface_speech_start_ts: str | None = None

if isinstance(self.pipeline_config, SpeechToSpeechConfig):
s2s_params = self.pipeline_config.s2s_params
else:
logger.error("Pipeline config is not SpeechToSpeechConfig")
return

s2s_params = self.pipeline_config.s2s_params or {}
self._model: str = s2s_params["model"]

async def start(self) -> None:
Expand Down
22 changes: 11 additions & 11 deletions src/eva/assistant/pipecat_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
)
from eva.assistant.services.llm import LiteLLMClient
from eva.models.agents import AgentConfig
from eva.models.config import AudioLLMConfig, PipelineConfig, SpeechToSpeechConfig
from eva.models.config import ModelConfig, PipelineType
from eva.utils.logging import get_logger

logger = get_logger(__name__)
Expand All @@ -88,7 +88,7 @@ class PipecatAssistantServer(AbstractAssistantServer):
def __init__(
self,
current_date_time: str,
pipeline_config: PipelineConfig | SpeechToSpeechConfig | AudioLLMConfig,
pipeline_config: ModelConfig,
agent: AgentConfig,
agent_config_path: str,
scenario_db_path: str,
Expand Down Expand Up @@ -225,7 +225,7 @@ async def _handle_session(self, websocket) -> None:
audio_llm_processor = None
audio_llm_audio_collector = None
alm_client = None
if isinstance(self.pipeline_config, SpeechToSpeechConfig):
if self.pipeline_config.pipeline_type == PipelineType.S2S:
realtime_llm = create_realtime_llm_service(
self.pipeline_config.s2s,
self.pipeline_config.s2s_params,
Expand Down Expand Up @@ -256,7 +256,7 @@ async def _realtime_tool_handler(params) -> None:
realtime_llm.register_function(function_name=None, handler=_realtime_tool_handler)
stt = None
tts = None
elif isinstance(self.pipeline_config, AudioLLMConfig):
elif self.pipeline_config.pipeline_type == PipelineType.AUDIO_LLM:
# Audio-LLM mode: model handles STT+LLM, TTS still needed
stt = None
tts = create_tts_service(
Expand Down Expand Up @@ -338,8 +338,8 @@ async def _realtime_tool_handler(params) -> None:
# Create Audio-LLM components now that context/user_aggregator are available
audio_llm_audio_collector = None
audio_llm_processor = None
if isinstance(self.pipeline_config, AudioLLMConfig):
assert alm_client is not None # Set in AudioLLMConfig branch above
if self.pipeline_config.pipeline_type == PipelineType.AUDIO_LLM:
assert alm_client is not None # Set in AUDIO_LLM branch above
audio_llm_audio_collector = AudioLLMUserAudioCollector(
context, user_aggregator, pre_speech_secs=VAD_PRE_SPEECH_BUFFER_SECS
)
Expand Down Expand Up @@ -392,7 +392,7 @@ async def on_user_transcription(text: str, timestamp: str, turn_id: int | None)
)
# Create agent processor (pipeline mode only — realtime handles LLM internally)
agent_processor = None
if isinstance(self.pipeline_config, PipelineConfig):
if self.pipeline_config.pipeline_type == PipelineType.CASCADE:
agent_processor = BenchmarkAgentProcessor(
current_date_time=self.current_date_time,
agent=self.agent,
Expand Down Expand Up @@ -589,7 +589,7 @@ async def on_client_connected(transport, client):
await audiobuffer.start_recording()

# Send initial greeting
if isinstance(self.pipeline_config, SpeechToSpeechConfig):
if self.pipeline_config.pipeline_type == PipelineType.S2S:
await task.queue_frames([LLMRunFrame()])
else:
await task.queue_frames([TTSSpeakFrame(INITIAL_MESSAGE)])
Expand Down Expand Up @@ -643,13 +643,13 @@ async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMes
logger.info(f"User turn stopped - complete transcript: '{message.content}'")
logger.debug(f"Turn timestamp: {message.timestamp}, user_id: {message.user_id}")

if isinstance(self.pipeline_config, PipelineConfig):
if self.pipeline_config.pipeline_type == PipelineType.CASCADE:
# STT provides real transcript text — save it now
await self._save_transcript_message_from_turn(
role="user", content=message.content, timestamp=message.timestamp
)
await agent_processor.process_complete_user_turn(message.content)
elif isinstance(self.pipeline_config, AudioLLMConfig) and audio_llm_processor:
elif self.pipeline_config.pipeline_type == PipelineType.AUDIO_LLM and audio_llm_processor:
# No STT → message.content is empty.
# Processing is triggered by LLMContextFrame flow through ParallelPipeline
# (AudioLLMUserAudioCollector pushes LLMContextFrame on UserStoppedSpeakingFrame)
Expand Down Expand Up @@ -721,7 +721,7 @@ def _save_transcript(self) -> None:
For pipeline mode, only write if not already written incrementally.
"""
transcript_path = self.output_dir / "transcript.jsonl"
if isinstance(self.pipeline_config, SpeechToSpeechConfig) or not transcript_path.exists():
if self.pipeline_config.pipeline_type == PipelineType.S2S or not transcript_path.exists():
self.audit_log.save_transcript_jsonl(transcript_path)

async def save_outputs(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion src/eva/metrics/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _load_agent_config(self) -> dict[str, Any]:

config_data = json.loads(config_path.read_text())

# Determine pipeline type from config (fallback to False for legacy runs)
# Determine pipeline type from config
model_data = config_data.get("model", {})
self._pipeline_type = get_pipeline_type(model_data) if model_data else PipelineType.CASCADE

Expand Down
10 changes: 4 additions & 6 deletions src/eva/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
AgentToolParameter,
)
from eva.models.config import (
AudioLLMConfig,
PipelineConfig,
ModelConfig,
PipelineType,
RunConfig,
SpeechToSpeechConfig,
)
from eva.models.record import (
AgentOverride,
Expand All @@ -37,9 +36,8 @@
"AgentOverride",
# Config models
"RunConfig",
"PipelineConfig",
"SpeechToSpeechConfig",
"AudioLLMConfig",
"ModelConfig",
"PipelineType",
# Result models
"ConversationResult",
"MetricScore",
Expand Down
Loading
Loading