From 77e858cce98e696ebfcad59c25564794551c1ba0 Mon Sep 17 00:00:00 2001 From: "wei.zhong1" Date: Tue, 9 Jun 2026 19:34:16 -0700 Subject: [PATCH 1/3] Add Deepgram Voice Agent framework support Adds a `deepgram` assistant-server framework backed by Deepgram's Voice Agent API (unified STT->LLM->TTS over a single WebSocket), so it can be benchmarked like the existing S2S frameworks. - New DeepgramAssistantServer (src/eva/assistant/deepgram_server.py), modeled on the Gemini Live server and the assistant_server_contract. - Register `deepgram` in worker._get_server_class and the framework Literal. - Parse raw WebSocket JSON by event `type` rather than the SDK's typed iterator, which in deepgram-sdk 6.1.x mis-deserializes every agent event as the same model (dropping transcripts and tool-call requests). - KeepAlive task to prevent Deepgram's ~10s input-audio timeout from closing the session while the (half-duplex) agent is speaking. - Compute model_response latency from the server-side receipt time of user_speech_stop (the simulator emits it on a monotonic clock). - Unit tests for settings/tool conversion + framework dispatch test. - Docs section in assistant_server_contract.md; .env.example framework enum. - Bump simulation_version 2.0.0 -> 2.1.0 (affects benchmark outputs). --- .env.example | 2 +- docs/assistant_server_contract.md | 28 + src/eva/__init__.py | 2 +- src/eva/assistant/deepgram_server.py | 532 ++++++++++++++++++ src/eva/models/config.py | 3 +- src/eva/orchestrator/worker.py | 6 +- tests/unit/assistant/test_deepgram_server.py | 103 ++++ .../orchestrator/test_framework_dispatch.py | 6 + 8 files changed, 678 insertions(+), 4 deletions(-) create mode 100644 src/eva/assistant/deepgram_server.py create mode 100644 tests/unit/assistant/test_deepgram_server.py diff --git a/.env.example b/.env.example index dd78e6ef..d84bc222 100644 --- a/.env.example +++ b/.env.example @@ -110,7 +110,7 @@ EVA_MODEL__TTS_PARAMS='{"api_key": "your_cartesia_api_key", "model": "sonic"}' # --- Framework (S2S / AudioLLM) --- #i Base framework for S2S or AudioLLM pipelines. #d enum -#e pipecat,openai_realtime,gemini_live,elevenlabs,grok_voice +#e pipecat,openai_realtime,gemini_live,elevenlabs,grok_voice,deepgram #v EVA_FRAMEWORK=openai_realtime # ============================================== diff --git a/docs/assistant_server_contract.md b/docs/assistant_server_contract.md index 62124ac5..b2363572 100644 --- a/docs/assistant_server_contract.md +++ b/docs/assistant_server_contract.md @@ -535,3 +535,31 @@ the run to fail or produce `None` latency fields in the result. | `audio_assistant.wav` | Yes | TTS quality metrics | | `framework_logs.jsonl` | Yes | Turn boundary metrics | | `pipecat_metrics.jsonl` | Yes | `model_response_latency` in `ConversationResult` | + +--- + +## 13. Reference implementation: Deepgram Voice Agent + +`src/eva/assistant/deepgram_server.py` (`framework: deepgram`) bridges to Deepgram's +**Voice Agent API** (unified STT→LLM→TTS over one WebSocket) via the `deepgram-sdk` +`client.agent.v1.connect()` interface. It is the closest analogue to the Gemini Live +server and a good template for a new S2S framework. + +Notable points specific to Deepgram: + +- **Config.** `framework: deepgram`, `model: {s2s: deepgram, s2s_params: {...}}`. Recognised + `s2s_params`: `api_key` (required), `think_provider` (default `open_ai`), + `think_model` / `model` (LLM + metrics label, default `gpt-4o-mini`), + `listen_model` (STT, default `nova-3`), `speak_model` (TTS, default `aura-2-thalia-en`), + `language` (default `en`). +- **Settings.** Sent once on connect via `send_settings(AgentV1Settings)`. Built from a plain + dict and validated with `AgentV1Settings.model_validate(...)`, which resolves the + discriminated provider unions. Audio is `linear16` @ 24 kHz both directions with output + `container: "none"` (raw PCM); `agent.greeting` carries `INITIAL_MESSAGE`. +- **Tools.** Configured under `agent.think.functions` (no `endpoint` ⇒ *client-side*), so the + agent emits `FunctionCallRequest` events; reply with `send_function_call_response`. +- **Events.** `async for message in connection` yields raw `bytes` (TTS audio) or typed events + (`ConversationText`, `UserStartedSpeaking`, `AgentStartedSpeaking`, `AgentAudioDone`, + `FunctionCallRequest`, `Error`, `Warning`). +- **Limitation.** The Voice Agent event stream exposes no token-usage event, so token usage is + not reported for this framework. Latency is still emitted on the first audio chunk per turn. diff --git a/src/eva/__init__.py b/src/eva/__init__.py index ecc5f2a8..94accf13 100644 --- a/src/eva/__init__.py +++ b/src/eva/__init__.py @@ -7,7 +7,7 @@ # Bump simulation_version when changes affect benchmark outputs (agent code, # user simulator, orchestrator, simulation prompts, agent configs, tool mocks). -simulation_version = "2.0.0" +simulation_version = "2.1.0" # Bump metrics_version when changes affect metric computation (metrics code, # judge prompts, pricing tables, postprocessor). diff --git a/src/eva/assistant/deepgram_server.py b/src/eva/assistant/deepgram_server.py new file mode 100644 index 00000000..bd1343d2 --- /dev/null +++ b/src/eva/assistant/deepgram_server.py @@ -0,0 +1,532 @@ +"""Deepgram Voice Agent AssistantServer for EVA-Bench. + +Bridges between the Twilio-framed WebSocket (user simulator) and Deepgram's +**Voice Agent API** (a unified STT -> LLM -> TTS agent over a single WebSocket) +via the ``deepgram-sdk`` ``client.agent.v1.connect()`` interface. Audio flows: + + User simulator (8 kHz mulaw) + -> 24 kHz PCM16 -> Deepgram agent input + Deepgram agent output (24 kHz PCM16) + -> 8 kHz mulaw -> User simulator + +All tool calls are executed locally via ``ToolExecutor`` (the agent is configured +with *client-side* functions, so Deepgram emits ``FunctionCallRequest`` events and +we reply with ``send_function_call_response``). ``ConversationText`` events populate +the audit log. + +Note: the Voice Agent event stream does not expose token usage, so token usage is +not reported for this framework (latency is still emitted on the first audio chunk +of each turn). +""" + +from __future__ import annotations + +import asyncio +import contextlib +import json +import time +from typing import Any + +import uvicorn +from deepgram import AsyncDeepgramClient +from deepgram.agent.v1.types.agent_v1send_function_call_response import AgentV1SendFunctionCallResponse +from deepgram.agent.v1.types.agent_v1settings import AgentV1Settings +from fastapi import FastAPI, WebSocket, WebSocketDisconnect + +from eva.assistant.audio_bridge import ( + FrameworkLogWriter, + MetricsLogWriter, + create_twilio_media_message, + mulaw_8k_to_pcm16_24k, + parse_twilio_media_message, + pcm16_24k_to_mulaw_8k, + sync_buffer_to_position, +) +from eva.assistant.base_server import INITIAL_MESSAGE, AbstractAssistantServer +from eva.models.agents import AgentConfig +from eva.utils.logging import get_logger +from eva.utils.prompt_manager import PromptManager + +logger = get_logger(__name__) + +# Deepgram agent runs at 24 kHz PCM16 in both directions (matches the recording rate). +_RECORDING_SAMPLE_RATE = 24000 + +# Audio output pacing: send 160-byte mulaw chunks (20ms at 8kHz) at real-time rate +# so the user simulator's silence detection works correctly. +MULAW_CHUNK_SIZE = 160 # bytes per chunk (20ms at 8kHz, 1 byte per sample) +MULAW_CHUNK_DURATION_S = 0.02 # 20ms per chunk + +# Send a KeepAlive at least this often so Deepgram's ~10s input-audio timeout never +# fires during user silence (e.g. while the agent is speaking). +KEEPALIVE_INTERVAL_S = 5.0 + +# Defaults for the Voice Agent listen/think/speak providers (overridable via s2s_params). +_DEFAULT_LISTEN_MODEL = "nova-3" +_DEFAULT_THINK_PROVIDER = "open_ai" +_DEFAULT_THINK_MODEL = "gpt-4o-mini" +_DEFAULT_SPEAK_MODEL = "aura-2-thalia-en" +_DEFAULT_LANGUAGE = "en" + + +def _agent_tools_to_deepgram(agent: AgentConfig) -> list[dict[str, Any]] | None: + """Convert EVA AgentConfig tools to Deepgram ``think.functions`` (client-side). + + Omitting ``endpoint`` marks each function as client-side, so the agent emits a + ``FunctionCallRequest`` event instead of calling an HTTP endpoint itself. + """ + if not agent.tools: + return None + + functions: list[dict[str, Any]] = [] + for tool in agent.tools: + functions.append( + { + "name": tool.function_name, + "description": f"{tool.name}: {tool.description}", + "parameters": { + "type": "object", + "properties": tool.get_parameter_properties(), + "required": tool.get_required_param_names(), + }, + } + ) + return functions or None + + +class DeepgramAssistantServer(AbstractAssistantServer): + """Bridges Twilio WebSocket <-> Deepgram Voice Agent API for EVA-Bench evaluation.""" + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + + # Recording sample rate (Deepgram agent runs at 24 kHz) + self._audio_sample_rate = _RECORDING_SAMPLE_RATE + + s2s_params = self.pipeline_config.s2s_params or {} + self._api_key: str = s2s_params.get("api_key", "") + # ``think_model`` is the LLM driving the agent; used as the metrics label. + # Accept ``model`` as an alias for the contract's "model required" convention. + self._think_model: str = s2s_params.get("think_model") or s2s_params.get("model") or _DEFAULT_THINK_MODEL + self._model = self._think_model + self._think_provider: str = s2s_params.get("think_provider", _DEFAULT_THINK_PROVIDER) + self._listen_model: str = s2s_params.get("listen_model", _DEFAULT_LISTEN_MODEL) + self._speak_model: str = s2s_params.get("speak_model", _DEFAULT_SPEAK_MODEL) + self._language: str = s2s_params.get("language", _DEFAULT_LANGUAGE) + + # Build system prompt (same pattern as the other realtime/S2S servers) + prompt_manager = PromptManager() + self._system_prompt = prompt_manager.get_prompt( + "realtime_agent.system_prompt", + agent_personality=self.agent.description, + agent_instructions=self.agent.instructions, + datetime=self.current_date_time, + ) + + self._functions = _agent_tools_to_deepgram(self.agent) + + # ------------------------------------------------------------------ + # Server lifecycle + # ------------------------------------------------------------------ + + async def start(self) -> None: + """Start the FastAPI WebSocket server (non-blocking).""" + if self._running: + logger.warning("Server already running") + return + + if not self._api_key: + raise ValueError("API key required for Deepgram Voice Agent (set s2s_params.api_key)") + + self.output_dir.mkdir(parents=True, exist_ok=True) + self._fw_log = FrameworkLogWriter(self.output_dir) + self._metrics_log = MetricsLogWriter(self.output_dir) + + self._app = FastAPI() + + @self._app.websocket("/ws") + async def websocket_endpoint(websocket: WebSocket) -> None: + await websocket.accept() + await self._handle_session(websocket) + + @self._app.websocket("/") + async def websocket_root(websocket: WebSocket) -> None: + await websocket.accept() + await self._handle_session(websocket) + + config = uvicorn.Config( + self._app, + host="0.0.0.0", + port=self.port, + log_level="warning", + lifespan="off", + ) + self._server = uvicorn.Server(config) + self._running = True + self._server_task = asyncio.create_task(self._server.serve()) + + while not self._server.started: + await asyncio.sleep(0.01) + + logger.info(f"Deepgram agent server started on ws://localhost:{self.port}") + + async def _shutdown(self) -> None: + """Stop the Deepgram agent server.""" + if not self._running: + return + self._running = False + + if self._server: + self._server.should_exit = True + if self._server_task: + try: + await asyncio.wait_for(self._server_task, timeout=5.0) + except TimeoutError: + self._server_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._server_task + except (asyncio.CancelledError, KeyboardInterrupt): + pass + self._server = None + self._server_task = None + + logger.info(f"Deepgram agent server stopped on port {self.port}") + + # ------------------------------------------------------------------ + # Settings + # ------------------------------------------------------------------ + + def _build_settings(self) -> AgentV1Settings: + """Build the Voice Agent ``Settings`` message. + + Constructed from a plain dict and validated into the typed model; pydantic + resolves the discriminated provider unions and produces the correct wire JSON. + """ + think: dict[str, Any] = { + "provider": {"type": self._think_provider, "model": self._think_model}, + "prompt": self._system_prompt, + } + if self._functions: + think["functions"] = self._functions + + settings_dict: dict[str, Any] = { + "type": "Settings", + "audio": { + "input": {"encoding": "linear16", "sample_rate": self._audio_sample_rate}, + "output": {"encoding": "linear16", "sample_rate": self._audio_sample_rate, "container": "none"}, + }, + "agent": { + "language": self._language, + "greeting": INITIAL_MESSAGE, + "listen": {"provider": {"type": "deepgram", "model": self._listen_model}}, + "think": think, + "speak": {"provider": {"type": "deepgram", "model": self._speak_model}}, + }, + } + return AgentV1Settings.model_validate(settings_dict) + + # ------------------------------------------------------------------ + # Session handler + # ------------------------------------------------------------------ + + async def _handle_session(self, websocket: WebSocket) -> None: + """Bridge a single Twilio WebSocket session with the Deepgram Voice Agent.""" + logger.info("Client connected to Deepgram agent server") + # start() always instantiates these before a session can connect; bind to + # locals so the narrowed (non-None) type is visible inside the nested tasks. + assert self._fw_log is not None and self._metrics_log is not None + fw_log = self._fw_log + metrics_log = self._metrics_log + + stream_sid: str = self.conversation_id + twilio_connected = True + + # Per-turn assistant text accumulated from ConversationText(role=assistant) + _assistant_turn_text: list[str] = [] + + _in_model_turn = False + _user_speaking = False + _user_speech_start_ts: str | None = None # From the simulator's VAD + _user_speech_stop_ts: str | None = None # From the simulator's VAD + _assistant_turn_start_ts: str | None = None # Wall-clock ms of first audio chunk + + # Outbound mulaw chunks; drained by the pacer at real-time rate. + audio_output_queue: asyncio.Queue[bytes] = asyncio.Queue() + + client = AsyncDeepgramClient(api_key=self._api_key) + settings = self._build_settings() + + try: + async with client.agent.v1.connect() as connection: + logger.info(f"Deepgram agent session connected (think_model={self._think_model})") + await connection.send_settings(settings) + fw_log.turn_start() + + # ----- Concurrent tasks ----- + async def _forward_user_audio() -> None: + """Read Twilio WS messages, convert audio, send to Deepgram.""" + nonlocal stream_sid, twilio_connected, _user_speech_start_ts, _user_speech_stop_ts + try: + while twilio_connected and self._running: + try: + raw = await asyncio.wait_for(websocket.receive_text(), timeout=1.0) + except TimeoutError: + continue + + try: + msg = json.loads(raw) + except json.JSONDecodeError: + continue + + event = msg.get("event") + if event == "start": + stream_sid = msg.get("start", {}).get("streamSid", stream_sid) + logger.info(f"Twilio stream started: {stream_sid}") + elif event == "stop": + logger.info("Twilio stream stopped") + twilio_connected = False + break + elif event == "user_speech_start": + _user_speech_start_ts = msg.get("timestamp_ms") + elif event == "user_speech_stop": + # Record our own wall-clock receipt time rather than the event's + # timestamp_ms: the simulator sends user_speech_stop on a monotonic + # clock (unlike the wall-clock user_speech_start), so its value can't + # be diffed against the wall-clock first-audio time. The event arrives + # in ~real time over the local socket, so receipt time is accurate. + _user_speech_stop_ts = str(int(time.time() * 1000)) + elif event == "media": + mulaw_bytes = parse_twilio_media_message(raw) + if mulaw_bytes is None: + continue + pcm_24k = mulaw_8k_to_pcm16_24k(mulaw_bytes) + if not _in_model_turn: + sync_buffer_to_position(self.assistant_audio_buffer, len(self.user_audio_buffer)) + self.user_audio_buffer.extend(pcm_24k) + await connection.send_media(pcm_24k) + except WebSocketDisconnect: + logger.info("Twilio WebSocket disconnected") + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error in user audio forwarder: {e}", exc_info=True) + finally: + twilio_connected = False + + async def _pace_audio_output() -> None: + """Drain audio_output_queue and forward chunks at real-time rate.""" + nonlocal twilio_connected + next_send_time = time.monotonic() + try: + while self._running: + try: + chunk = await asyncio.wait_for(audio_output_queue.get(), timeout=1.0) + except TimeoutError: + continue + + twilio_msg = create_twilio_media_message(stream_sid, chunk) + try: + await websocket.send_text(twilio_msg) + except Exception: + twilio_connected = False + return + + now = time.monotonic() + if next_send_time <= now: + next_send_time = now + next_send_time += MULAW_CHUNK_DURATION_S + sleep_duration = next_send_time - time.monotonic() + if sleep_duration > 0: + await asyncio.sleep(sleep_duration) + except asyncio.CancelledError: + pass + + def _flush_assistant_turn(interrupted: bool) -> None: + nonlocal _assistant_turn_text, _in_model_turn, _assistant_turn_start_ts + full_text = " ".join(_assistant_turn_text).strip() + if full_text: + text = f"{full_text} [interrupted]" if interrupted else full_text + self.audit_log.append_assistant_output(text, timestamp_ms=_assistant_turn_start_ts) + if interrupted: + fw_log.s2s_transcript(full_text) + else: + fw_log.llm_response(full_text) + fw_log.turn_end(was_interrupted=interrupted) + _in_model_turn = False + _assistant_turn_text = [] + _assistant_turn_start_ts = None + + def _drain_audio_queue() -> None: + while not audio_output_queue.empty(): + with contextlib.suppress(asyncio.QueueEmpty): + audio_output_queue.get_nowait() + + async def _process_deepgram_events() -> None: + """Consume events from the Deepgram agent session. + + We iterate the underlying websocket directly and dispatch on the + raw ``type`` field instead of the SDK's typed iterator. In + deepgram-sdk 6.1.x the agent response-union deserialization is not + discriminated by ``type``: it mis-constructs every JSON event as the + same model, so isinstance-based dispatch silently drops transcripts + and tool-call requests. Parsing the JSON ourselves is deterministic. + Binary frames (TTS audio) are delivered as ``bytes`` unchanged. + """ + nonlocal _assistant_turn_text, _in_model_turn, _user_speaking + nonlocal _user_speech_start_ts, _user_speech_stop_ts, _assistant_turn_start_ts + try: + async for raw in connection._websocket: + if not self._running: + break + + # --- Raw TTS audio output (24 kHz PCM16) --- + if isinstance(raw, bytes): + if not raw: + continue + if not _in_model_turn: + _in_model_turn = True + _user_speaking = False + _assistant_turn_start_ts = str(int(round(time.time() * 1000))) + fw_log.turn_start() + + # model_response latency: user speech end -> first audio. + # Absent on the initial greeting (model-initiated) turn. + if _user_speech_stop_ts: + latency_ms = int(_assistant_turn_start_ts) - int(_user_speech_stop_ts) + if 0 < latency_ms < 30_000: + metrics_log.write_latency("model_response", latency_ms / 1000, self._model) + _user_speech_stop_ts = None + + if not _user_speaking: + sync_buffer_to_position(self.user_audio_buffer, len(self.assistant_audio_buffer)) + self.assistant_audio_buffer.extend(raw) + + if twilio_connected: + try: + mulaw = pcm16_24k_to_mulaw_8k(raw) + except Exception as conv_err: + logger.warning(f"Audio conversion error ({len(raw)} bytes): {conv_err}") + continue + offset = 0 + while offset < len(mulaw): + await audio_output_queue.put(mulaw[offset : offset + MULAW_CHUNK_SIZE]) + offset += MULAW_CHUNK_SIZE + continue + + # --- JSON control / transcript events --- + try: + event = json.loads(raw) + except (json.JSONDecodeError, TypeError): + continue + event_type = event.get("type") + + # Conversation transcripts (final per turn) + if event_type == "ConversationText": + text = (event.get("content") or "").strip() + if not text: + continue + if event.get("role") == "user": + _user_speaking = False + logger.info(f"User transcription: {text}") + self.audit_log.append_user_input(text, timestamp_ms=_user_speech_start_ts) + _user_speech_start_ts = None + else: + _assistant_turn_text.append(text) + + # Agent finished speaking -> end of assistant turn + elif event_type == "AgentAudioDone": + logger.debug("Deepgram agent audio done") + _flush_assistant_turn(interrupted=False) + + # User barge-in + elif event_type == "UserStartedSpeaking": + if _in_model_turn: + logger.debug("User barge-in during agent turn") + _user_speaking = True + _drain_audio_queue() + _flush_assistant_turn(interrupted=True) + + # Client-side tool calls + elif event_type == "FunctionCallRequest": + for fn in event.get("functions", []): + raw_args = fn.get("arguments") + try: + arguments = json.loads(raw_args) if raw_args else {} + except json.JSONDecodeError: + arguments = {} + fn_name = fn.get("name", "") + logger.info(f"Tool call: {fn_name}({json.dumps(arguments)})") + result = await self.execute_tool(fn_name, arguments) + await connection.send_function_call_response( + AgentV1SendFunctionCallResponse( + type="FunctionCallResponse", + id=fn.get("id"), + name=fn_name, + content=json.dumps(result), + ) + ) + + elif event_type in ("Error", "FatalError"): + logger.error(f"Deepgram agent error: {event.get('description')}") + elif event_type == "Warning": + logger.warning(f"Deepgram agent warning: {event.get('description')}") + + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error in Deepgram event processor: {e}", exc_info=True) + + async def _send_keepalives() -> None: + """Keep the Deepgram input stream alive during user silence. + + The user simulator is half-duplex and stops sending mic audio while + the agent is speaking. Without input, Deepgram closes the session with + a "did not receive audio within our timeout" error (~10s). Periodic + KeepAlive messages reset that timer; they are no-ops when audio flows. + """ + try: + while self._running and twilio_connected: + await asyncio.sleep(KEEPALIVE_INTERVAL_S) + try: + await connection.send_keep_alive() + except Exception: + break + except asyncio.CancelledError: + pass + + user_task = asyncio.create_task(_forward_user_audio()) + events_task = asyncio.create_task(_process_deepgram_events()) + pacer_task = asyncio.create_task(_pace_audio_output()) + keepalive_task = asyncio.create_task(_send_keepalives()) + + done, pending = await asyncio.wait( + [user_task, events_task, pacer_task, keepalive_task], + return_when=asyncio.FIRST_COMPLETED, + ) + + def _task_name(t: asyncio.Task[None]) -> str: + if t is user_task: + return "user_audio" + if t is events_task: + return "deepgram_events" + if t is keepalive_task: + return "keepalive" + return "audio_pacer" + + for task in done: + exc = task.exception() + if exc: + logger.error(f"Task '{_task_name(task)}' failed: {exc}", exc_info=exc) + else: + logger.info(f"Task '{_task_name(task)}' completed normally") + + for task in pending: + logger.info(f"Cancelling pending task '{_task_name(task)}'") + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + + except Exception as e: + logger.error(f"Deepgram agent session error: {e}", exc_info=True) + finally: + logger.info("Client disconnected from Deepgram agent server") diff --git a/src/eva/models/config.py b/src/eva/models/config.py index 6752d54b..0f6451bd 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -379,7 +379,7 @@ class ModelDeployment(DeploymentTypedDict): ) # Framework selection - framework: Literal["pipecat", "openai_realtime", "gemini_live", "elevenlabs", "grok_voice"] = Field( + framework: Literal["pipecat", "openai_realtime", "gemini_live", "elevenlabs", "grok_voice", "deepgram"] = Field( "pipecat", description=( "Agent framework to use for the assistant server." @@ -388,6 +388,7 @@ class ModelDeployment(DeploymentTypedDict): "'gemini_live': Gemini Live API via google-genai." "'elevenlabs': ElevenLabs Conversational AI API." "'grok_voice': xAI Grok voice realtime API." + "'deepgram': Deepgram Voice Agent API." ), ) diff --git a/src/eva/orchestrator/worker.py b/src/eva/orchestrator/worker.py index 61b9db53..7c26c6d9 100644 --- a/src/eva/orchestrator/worker.py +++ b/src/eva/orchestrator/worker.py @@ -46,10 +46,14 @@ def _get_server_class(framework: str) -> type[AbstractAssistantServer]: from eva.assistant.grok_voice_server import GrokVoiceAssistantServer return GrokVoiceAssistantServer + elif framework == "deepgram": + from eva.assistant.deepgram_server import DeepgramAssistantServer + + return DeepgramAssistantServer else: raise ValueError( f"Unknown framework: {framework!r}. " - "Supported: pipecat, openai_realtime, gemini_live, elevenlabs, grok_voice" + "Supported: pipecat, openai_realtime, gemini_live, elevenlabs, grok_voice, deepgram" ) diff --git a/tests/unit/assistant/test_deepgram_server.py b/tests/unit/assistant/test_deepgram_server.py new file mode 100644 index 00000000..b5993f92 --- /dev/null +++ b/tests/unit/assistant/test_deepgram_server.py @@ -0,0 +1,103 @@ +"""Tests for DeepgramAssistantServer settings + tool conversion helpers.""" + +from unittest.mock import MagicMock + +from deepgram.agent.v1.types.agent_v1settings import AgentV1Settings + +from eva.assistant.deepgram_server import ( + INITIAL_MESSAGE, + DeepgramAssistantServer, + _agent_tools_to_deepgram, +) +from eva.models.agents import AgentConfig, AgentTool, AgentToolParameter + + +def _agent_with_tools() -> AgentConfig: + return AgentConfig( + id="a1", + name="Test Agent", + description="desc", + role="role", + instructions="be helpful", + tool_module_path="eva.assistant.tools.airline_tools", + tools=[ + AgentTool( + id="t1", + name="Lookup Booking", + description="Look up a booking", + required_parameters=[AgentToolParameter(name="booking_id", type="str", description="The booking id")], + optional_parameters=[AgentToolParameter(name="verbose", type="bool")], + ) + ], + ) + + +def _bare_server() -> DeepgramAssistantServer: + """Build a server without running __init__ (which needs file-backed tool config).""" + srv = object.__new__(DeepgramAssistantServer) + srv._audio_sample_rate = 24000 + srv._language = "en" + srv._listen_model = "nova-3" + srv._think_provider = "open_ai" + srv._think_model = "gpt-4o-mini" + srv._model = "gpt-4o-mini" + srv._speak_model = "aura-2-thalia-en" + srv._system_prompt = "you are a helpful assistant" + srv._functions = None + return srv + + +class TestToolConversion: + def test_no_tools_returns_none(self): + agent = MagicMock() + agent.tools = [] + assert _agent_tools_to_deepgram(agent) is None + + def test_tool_converted_to_client_side_function(self): + functions = _agent_tools_to_deepgram(_agent_with_tools()) + assert functions is not None + assert len(functions) == 1 + fn = functions[0] + # Client-side functions have no "endpoint" key. + assert "endpoint" not in fn + assert fn["name"] # function_name derived from the tool + assert "Lookup Booking" in fn["description"] + params = fn["parameters"] + assert params["type"] == "object" + assert "booking_id" in params["properties"] + assert params["required"] == ["booking_id"] + + +class TestBuildSettings: + def test_audio_encoding_and_sample_rate(self): + settings = _bare_server()._build_settings() + assert isinstance(settings, AgentV1Settings) + wire = settings.dict() + assert wire["audio"]["input"] == {"encoding": "linear16", "sample_rate": 24000} + assert wire["audio"]["output"]["encoding"] == "linear16" + assert wire["audio"]["output"]["sample_rate"] == 24000 + # Raw PCM output (no WAV header) so the pacer can stream it directly. + assert wire["audio"]["output"]["container"] == "none" + + def test_providers_and_greeting(self): + wire = _bare_server()._build_settings().dict() + agent = wire["agent"] + assert agent["greeting"] == INITIAL_MESSAGE + assert agent["language"] == "en" + assert agent["listen"]["provider"]["model"] == "nova-3" + assert agent["think"]["provider"]["type"] == "open_ai" + assert agent["think"]["provider"]["model"] == "gpt-4o-mini" + assert agent["think"]["prompt"] == "you are a helpful assistant" + assert agent["speak"]["provider"]["model"] == "aura-2-thalia-en" + + def test_functions_omitted_when_no_tools(self): + wire = _bare_server()._build_settings().dict() + assert "functions" not in wire["agent"]["think"] + + def test_functions_included_when_present(self): + srv = _bare_server() + srv._functions = _agent_tools_to_deepgram(_agent_with_tools()) + wire = srv._build_settings().dict() + functions = wire["agent"]["think"]["functions"] + assert len(functions) == 1 + assert functions[0]["parameters"]["required"] == ["booking_id"] diff --git a/tests/unit/orchestrator/test_framework_dispatch.py b/tests/unit/orchestrator/test_framework_dispatch.py index 82884115..fbb010ce 100644 --- a/tests/unit/orchestrator/test_framework_dispatch.py +++ b/tests/unit/orchestrator/test_framework_dispatch.py @@ -2,6 +2,7 @@ import pytest +from eva.assistant.deepgram_server import DeepgramAssistantServer from eva.assistant.grok_voice_server import GrokVoiceAssistantServer from eva.assistant.openai_realtime_server import OpenAIRealtimeAssistantServer from eva.orchestrator.worker import _get_server_class @@ -12,6 +13,11 @@ def test_grok_voice_dispatch_returns_grok_class(): assert cls is GrokVoiceAssistantServer +def test_deepgram_dispatch_returns_deepgram_class(): + cls = _get_server_class("deepgram") + assert cls is DeepgramAssistantServer + + def test_grok_voice_is_subclass_of_openai_realtime(): assert issubclass(GrokVoiceAssistantServer, OpenAIRealtimeAssistantServer) From bba4c1201e4c4d7eef2d06a9ba0e38c829310e31 Mon Sep 17 00:00:00 2001 From: weiz9 Date: Wed, 10 Jun 2026 11:21:59 -0700 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: Katrina Stankiewicz --- src/eva/assistant/deepgram_server.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/eva/assistant/deepgram_server.py b/src/eva/assistant/deepgram_server.py index 86061a67..b644a7a7 100644 --- a/src/eva/assistant/deepgram_server.py +++ b/src/eva/assistant/deepgram_server.py @@ -64,9 +64,7 @@ # Defaults for the Voice Agent listen/think/speak providers (overridable via s2s_params). _DEFAULT_LISTEN_MODEL = "nova-3" _DEFAULT_THINK_PROVIDER = "open_ai" -_DEFAULT_THINK_MODEL = "gpt-4o-mini" _DEFAULT_SPEAK_MODEL = "aura-2-thalia-en" -_DEFAULT_LANGUAGE = "en" def _agent_tools_to_deepgram(agent: AgentConfig) -> list[dict[str, Any]] | None: @@ -107,12 +105,11 @@ def __init__(self, **kwargs: Any) -> None: self._api_key: str = s2s_params.get("api_key", "") # ``think_model`` is the LLM driving the agent; used as the metrics label. # Accept ``model`` as an alias for the contract's "model required" convention. - self._think_model: str = s2s_params.get("think_model") or s2s_params.get("model") or _DEFAULT_THINK_MODEL + self._think_model: str = s2s_params["model"] self._model = self._think_model self._think_provider: str = s2s_params.get("think_provider", _DEFAULT_THINK_PROVIDER) self._listen_model: str = s2s_params.get("listen_model", _DEFAULT_LISTEN_MODEL) self._speak_model: str = s2s_params.get("speak_model", _DEFAULT_SPEAK_MODEL) - self._language: str = s2s_params.get("language", _DEFAULT_LANGUAGE) # Build system prompt (same pattern as the other realtime/S2S servers) prompt_manager = PromptManager() @@ -216,7 +213,7 @@ def _build_settings(self) -> AgentV1Settings: "output": {"encoding": "linear16", "sample_rate": self._audio_sample_rate, "container": "none"}, }, "agent": { - "language": self._language, + "language": self.language, "greeting": self.initial_message, "listen": {"provider": {"type": "deepgram", "model": self._listen_model}}, "think": think, From 234d46a1de4b303fd1091c15d00010d288520701 Mon Sep 17 00:00:00 2001 From: "wei.zhong1" Date: Wed, 10 Jun 2026 11:48:42 -0700 Subject: [PATCH 3/3] Deepgram: cascade evaluation + think_label, fix review-suggestion test Builds on the code-review suggestions (model now required in s2s_params; use base self.language): - Evaluate Deepgram as a cascade pipeline (get_pipeline_type -> CASCADE) since it runs STT->LLM->TTS internally, so stt_wer / transcription_accuracy / speakability run; expose {stt, llm, tts} via pipeline_parts so the run_id folder shows the three component models. - Add optional `think_label` to decouple the short metrics/run_id label from the (long) Deepgram model id (Deepgram still receives `model`). - Fix test broken by the review suggestion: _bare_server set `_language` but the server now reads base `self.language`. - Docs updated to match. --- docs/assistant_server_contract.md | 13 +++++++++---- src/eva/assistant/deepgram_server.py | 7 ++++--- src/eva/models/config.py | 16 ++++++++++++++-- tests/unit/assistant/test_deepgram_server.py | 2 +- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/docs/assistant_server_contract.md b/docs/assistant_server_contract.md index b2363572..9394b6bc 100644 --- a/docs/assistant_server_contract.md +++ b/docs/assistant_server_contract.md @@ -548,10 +548,15 @@ server and a good template for a new S2S framework. Notable points specific to Deepgram: - **Config.** `framework: deepgram`, `model: {s2s: deepgram, s2s_params: {...}}`. Recognised - `s2s_params`: `api_key` (required), `think_provider` (default `open_ai`), - `think_model` / `model` (LLM + metrics label, default `gpt-4o-mini`), - `listen_model` (STT, default `nova-3`), `speak_model` (TTS, default `aura-2-thalia-en`), - `language` (default `en`). + `s2s_params`: `api_key` and `model` (both **required**; `model` is the exact Deepgram LLM id, + e.g. `gpt-4o-mini` or `claude-haiku-4-5`), `think_provider` (default `open_ai`; use `anthropic` + for Claude models), `think_label` (optional short metrics/run_id label — Deepgram still receives + `model`), `listen_model` (STT, default `nova-3`), `speak_model` (TTS, default `aura-2-thalia-en`). + The conversation language comes from the run's `language` (base server), not `s2s_params`. +- **Evaluation.** Although configured via `s2s`, Deepgram is scored as a **cascade** pipeline + (`get_pipeline_type` → `CASCADE`), since it runs STT→LLM→TTS internally — so STT/TTS metrics + (`stt_wer`, `transcription_accuracy_key_entities`, `speakability`) run. `pipeline_parts` exposes + `{stt, llm, tts}` so the run_id/folder shows the three component models. - **Settings.** Sent once on connect via `send_settings(AgentV1Settings)`. Built from a plain dict and validated with `AgentV1Settings.model_validate(...)`, which resolves the discriminated provider unions. Audio is `linear16` @ 24 kHz both directions with output diff --git a/src/eva/assistant/deepgram_server.py b/src/eva/assistant/deepgram_server.py index b644a7a7..35878361 100644 --- a/src/eva/assistant/deepgram_server.py +++ b/src/eva/assistant/deepgram_server.py @@ -103,10 +103,11 @@ def __init__(self, **kwargs: Any) -> None: s2s_params = self.pipeline_config.s2s_params or {} self._api_key: str = s2s_params.get("api_key", "") - # ``think_model`` is the LLM driving the agent; used as the metrics label. - # Accept ``model`` as an alias for the contract's "model required" convention. + # ``model`` is the exact LLM id sent to Deepgram (required). self._think_model: str = s2s_params["model"] - self._model = self._think_model + # Metrics/run_id label, decoupled from the (often long) Deepgram model id: + # an explicit ``think_label`` if provided, else the model id itself. + self._model = s2s_params.get("think_label") or self._think_model self._think_provider: str = s2s_params.get("think_provider", _DEFAULT_THINK_PROVIDER) self._listen_model: str = s2s_params.get("listen_model", _DEFAULT_LISTEN_MODEL) self._speak_model: str = s2s_params.get("speak_model", _DEFAULT_SPEAK_MODEL) diff --git a/src/eva/models/config.py b/src/eva/models/config.py index a6d29ec1..eb3bbdb8 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -209,6 +209,17 @@ def pipeline_parts(self) -> dict[str, str]: "s2s": _param_alias(self.s2s_params) or self.s2s, **_fetch_elevenlabs_agent_models(self.s2s_params), } + if self.s2s == "deepgram": + # Deepgram Voice Agent is a cascade internally (STT -> LLM -> TTS); + # expose its component models. The `llm` part uses the short + # `think_label` if provided (else the Deepgram model id), so the + # run_id/folder stays readable; defaults mirror deepgram_server.py. + p = self.s2s_params or {} + return { + "stt": p.get("listen_model", "nova-3"), + "llm": p.get("think_label") or p.get("model") or p.get("think_model", ""), + "tts": p.get("speak_model", "aura-2-thalia-en"), + } return {"s2s": _param_alias(self.s2s_params)} case PipelineType.CASCADE: return { @@ -249,8 +260,9 @@ def get_pipeline_type(model_data: dict) -> PipelineType: ``llm_model`` in a flat dict. """ if s2s_value := model_data.get("s2s"): - # ElevenLabs uses s2s_params for configuration but is a cascade pipeline internally - if s2s_value == "elevenlabs": + # ElevenLabs and Deepgram use s2s_params for configuration but are cascade + # pipelines internally (STT -> LLM -> TTS), so they're scored as cascade. + if s2s_value in ("elevenlabs", "deepgram"): return PipelineType.CASCADE # Ultravox uses s2s_params for plumbing but is an audio-LLM (audio in, text out, separate TTS) if s2s_value == "ultravox": diff --git a/tests/unit/assistant/test_deepgram_server.py b/tests/unit/assistant/test_deepgram_server.py index f4321a15..299147ed 100644 --- a/tests/unit/assistant/test_deepgram_server.py +++ b/tests/unit/assistant/test_deepgram_server.py @@ -37,7 +37,7 @@ def _bare_server() -> DeepgramAssistantServer: """Build a server without running __init__ (which needs file-backed tool config).""" srv = object.__new__(DeepgramAssistantServer) srv._audio_sample_rate = 24000 - srv._language = "en" + srv.language = "en" srv._listen_model = "nova-3" srv._think_provider = "open_ai" srv._think_model = "gpt-4o-mini"