
Talker Service


Location: services/api-gateway/app/services/talker_service.py
Status: Production Ready
Last Updated: 2025-12-01

Overview

The TalkerService handles text-to-speech synthesis for the Thinker-Talker voice pipeline. It streams LLM tokens through a sentence chunker and synthesizes speech via ElevenLabs for gapless audio playback.

Architecture

```
┌─────────────────────────────────────────────────────────────────┐
│                       TalkerService                              │
│                                                                  │
│   LLM Tokens ──►┌──────────────────┐                            │
│                 │ Markdown Buffer  │  (accumulates for pattern  │
│                 │                  │   detection before strip)   │
│                 └────────┬─────────┘                            │
│                          │                                       │
│                          ▼                                       │
│                 ┌──────────────────┐                            │
│                 │ SentenceChunker  │  (splits at natural        │
│                 │(40-120-200 chars)│   boundaries)              │
│                 └────────┬─────────┘                            │
│                          │                                       │
│                          ▼                                       │
│                 ┌──────────────────┐                            │
│                 │ strip_markdown   │  (removes **bold**,        │
│                 │ _for_tts()       │   [links](url), LaTeX)     │
│                 └────────┬─────────┘                            │
│                          │                                       │
│                          ▼                                       │
│                 ┌──────────────────┐                            │
│                 │ ElevenLabs TTS   │  (streaming synthesis      │
│                 │ (sequential)     │   with previous_text)      │
│                 └────────┬─────────┘                            │
│                          │                                       │
│                          ▼                                       │
│                   Audio Chunks ──► on_audio_chunk callback       │
└─────────────────────────────────────────────────────────────────┘
```

Classes

TalkerService

Main service class (singleton pattern).

```python
from app.services.talker_service import talker_service

# Check if TTS is available
if talker_service.is_enabled():
    # Start a speaking session (uses DEFAULT_VOICE_ID from voice_constants.py)
    session = await talker_service.start_session(
        on_audio_chunk=handle_audio,
        voice_config=VoiceConfig(
            # voice_id defaults to DEFAULT_VOICE_ID (Brian)
            stability=0.65,
        ),
    )

    # Feed tokens from the LLM
    async for token in llm_stream:
        await session.add_token(token)

    # Finish and get metrics
    metrics = await session.finish()
```

Methods

| Method | Description | Parameters | Returns |
|--------|-------------|------------|---------|
| `is_enabled()` | Check if TTS is available | None | `bool` |
| `get_provider()` | Get active TTS provider | None | `TTSProvider` |
| `start_session()` | Start a TTS session | `on_audio_chunk`, `voice_config` | `TalkerSession` |
| `synthesize_text()` | Simple text synthesis | `text`, `voice_config` | `AsyncIterator[bytes]` |
| `get_available_voices()` | List available voices | None | `List[Dict]` |

TalkerSession

Session class for streaming TTS.

```python
class TalkerSession:
    """
    A single TTS speaking session with streaming support.

    Manages the flow:
    1. Receive LLM tokens
    2. Chunk into sentences
    3. Synthesize each sentence
    4. Stream audio chunks to callback
    """
```

Methods

| Method | Description | Parameters | Returns |
|--------|-------------|------------|---------|
| `add_token()` | Add token from LLM | `token: str` | None |
| `finish()` | Complete synthesis | None | `TalkerMetrics` |
| `cancel()` | Cancel for barge-in | None | None |
| `get_metrics()` | Get session metrics | None | `TalkerMetrics` |

Properties

| Property | Type | Description |
|----------|------|-------------|
| `state` | `TalkerState` | Current state |
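
The `state` property can be checked before feeding more tokens or deciding whether a new utterance should interrupt the current one. A small sketch (the `session` variable and surrounding flow are illustrative, not part of the documented API):

```python
# Illustrative guard: interrupt if still speaking, feed only when idle.
if session.state == TalkerState.SPEAKING:
    await session.cancel()  # barge-in: stop current speech
elif session.state == TalkerState.IDLE:
    await session.add_token("Hello!")
```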

AudioQueue

Queue management for gapless playback.

```python
class AudioQueue:
    """
    Manages audio chunks for gapless playback with cancellation support.

    Features:
    - Async queue for audio chunks
    - Cancellation clears pending audio
    - Tracks queue state
    """

    async def put(self, chunk: AudioChunk) -> bool: ...
    async def get(self) -> Optional[AudioChunk]: ...
    async def cancel(self) -> None: ...
    def finish(self) -> None: ...
    def reset(self) -> None: ...
```
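
The signatures above omit the implementation. For intuition, a minimal stand-in built on `asyncio.Queue` might look like this (the internal details here are assumptions, not the shipped code):

```python
import asyncio

class SketchAudioQueue:
    """Illustrative stand-in for AudioQueue, not the actual implementation."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._cancelled = False
        self._finished = False

    async def put(self, chunk) -> bool:
        if self._cancelled:
            return False  # drop audio produced after a barge-in
        await self._queue.put(chunk)
        return True

    async def get(self):
        if self._cancelled or (self._finished and self._queue.empty()):
            return None  # nothing more to play
        return await self._queue.get()

    async def cancel(self) -> None:
        self._cancelled = True
        while not self._queue.empty():
            self._queue.get_nowait()  # clear pending audio immediately

    def finish(self) -> None:
        self._finished = True  # no more chunks will be queued

    def reset(self) -> None:
        self._queue = asyncio.Queue()
        self._cancelled = False
        self._finished = False
```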

Data Classes

TalkerState

```python
class TalkerState(str, Enum):
    IDLE = "idle"            # Ready for input
    SPEAKING = "speaking"    # Synthesizing/playing
    CANCELLED = "cancelled"  # Interrupted by barge-in
```

TTSProvider

```python
class TTSProvider(str, Enum):
    ELEVENLABS = "elevenlabs"
    OPENAI = "openai"  # Fallback
```
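
The enum only names the providers; how the fallback is chosen is not shown here. One plausible selection pattern, purely as an assumption (the environment variable name is illustrative):

```python
import os

def pick_provider() -> TTSProvider:
    # Hypothetical selection logic: prefer ElevenLabs when configured,
    # otherwise fall back to OpenAI TTS.
    if os.getenv("ELEVENLABS_API_KEY"):
        return TTSProvider.ELEVENLABS
    return TTSProvider.OPENAI
```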

VoiceConfig

Note: Default voice is configured in app/core/voice_constants.py. See Voice Configuration for details.

```python
from app.core.voice_constants import DEFAULT_VOICE_ID, DEFAULT_TTS_MODEL

@dataclass
class VoiceConfig:
    provider: TTSProvider = TTSProvider.ELEVENLABS
    voice_id: str = DEFAULT_VOICE_ID   # Brian (from voice_constants.py)
    model_id: str = DEFAULT_TTS_MODEL  # eleven_flash_v2_5
    stability: float = 0.65            # 0.0-1.0, higher = consistent
    similarity_boost: float = 0.80     # 0.0-1.0, higher = clearer
    style: float = 0.15                # 0.0-1.0, lower = natural
    use_speaker_boost: bool = True
    output_format: str = "pcm_24000"
```

AudioChunk

```python
@dataclass
class AudioChunk:
    data: bytes          # Raw audio bytes
    format: str          # "pcm16" or "mp3"
    is_final: bool       # True for last chunk
    sentence_index: int  # Which sentence this is from
    latency_ms: int      # Time since synthesis started
```

TalkerMetrics

```python
@dataclass
class TalkerMetrics:
    sentences_processed: int = 0
    total_chars_synthesized: int = 0
    total_audio_bytes: int = 0
    total_latency_ms: int = 0
    first_audio_latency_ms: int = 0
    cancelled: bool = False
```

Sentence Chunking

The TalkerSession uses SentenceChunker with these settings:

```python
self._chunker = SentenceChunker(
    ChunkerConfig(
        min_chunk_chars=40,       # Avoid tiny fragments
        optimal_chunk_chars=120,  # Full sentences
        max_chunk_chars=200,      # Allow complete thoughts
    )
)
```

Why These Settings?

| Parameter | Value | Rationale |
|-----------|-------|-----------|
| `min_chunk_chars` | 40 | Prevents choppy TTS from short phrases |
| `optimal_chunk_chars` | 120 | Full sentences sound more natural |
| `max_chunk_chars` | 200 | Prevents excessive buffering |

Trade-off: Larger chunks = better prosody but higher latency to first audio.
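
A rough back-of-the-envelope illustration of that trade-off (the 50 chars/s LLM streaming rate and 300 ms TTS first-byte latency are assumed numbers, not measurements):

```python
LLM_CHARS_PER_SEC = 50   # assumed LLM streaming rate
TTS_FIRST_BYTE_MS = 300  # assumed provider first-byte latency

for first_chunk_chars in (15, 40, 120):
    wait_ms = first_chunk_chars / LLM_CHARS_PER_SEC * 1000 + TTS_FIRST_BYTE_MS
    print(f"{first_chunk_chars:>3}-char first chunk -> ~{wait_ms:.0f} ms to first audio")
# 15 -> ~600 ms, 40 -> ~1100 ms, 120 -> ~2700 ms
```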

Markdown Stripping

LLM responses often contain markdown that sounds unnatural when spoken:

```python
def strip_markdown_for_tts(text: str) -> str:
    """
    Converts:
    - [Link Text](URL) → "Link Text"
    - **bold**         → "bold"
    - *italic*         → "italic"
    - `code`           → "code"
    - ```blocks```     → (removed)
    - # Headers        → "Headers"
    - LaTeX formulas   → (removed)
    """
```
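
The docstring summarizes the conversions; a minimal regex-based approximation is sketched below (the real function lives in `talker_service.py` and may handle more edge cases):

```python
import re

def strip_markdown_sketch(text: str) -> str:
    """Rough approximation of strip_markdown_for_tts(), for illustration."""
    text = re.sub(r"```.*?```", "", text, flags=re.DOTALL)    # fenced code blocks
    text = re.sub(r"\\\[.*?\\\]", "", text, flags=re.DOTALL)  # LaTeX display math
    text = re.sub(r"\\\(.*?\\\)", "", text)                   # LaTeX inline math
    text = re.sub(r"\[([^\]]+)\]\([^)]*\)", r"\1", text)      # [text](url) -> text
    text = re.sub(r"\*\*([^*]+)\*\*", r"\1", text)            # **bold** -> bold
    text = re.sub(r"\*([^*]+)\*", r"\1", text)                # *italic* -> italic
    text = re.sub(r"`([^`]+)`", r"\1", text)                  # `code` -> code
    text = re.sub(r"^#+\s*", "", text, flags=re.MULTILINE)    # strip # headers
    return text
```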

Markdown-Aware Token Buffering

The TalkerSession buffers tokens to detect incomplete patterns:

```python
def _process_markdown_token(self, token: str) -> str:
    """
    Accumulates tokens to detect patterns that should be stripped:
    - Markdown links: [text](url)  - wait for closing )
    - LaTeX display:  \\[ ... \\]
    - LaTeX inline:   \\( ... \\)
    - Bold/italic:    **text**     - wait for closing **
    """
```

This prevents sending "[Link Te" to TTS before we know it's a markdown link.
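
A stripped-down sketch of that buffering idea, handling only the link case (the real method covers the other patterns too):

```python
class LinkAwareBuffer:
    """Holds back text while a possible [text](url) link is still open."""

    def __init__(self) -> None:
        self._pending = ""

    def feed(self, token: str) -> str:
        self._pending += token
        bracket = self._pending.rfind("[")
        # If a "[" is open with no ")" after it yet, keep buffering.
        if bracket != -1 and ")" not in self._pending[bracket:]:
            return ""
        out, self._pending = self._pending, ""
        return out  # complete so far: safe to pass to the chunker

    def flush(self) -> str:
        # Called at end of stream to release anything still buffered.
        out, self._pending = self._pending, ""
        return out
```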

Voice Continuity

For consistent voice across sentences:

```python
async for audio_data in self._elevenlabs.synthesize_stream(
    text=tts_text,
    previous_text=self._previous_text,  # Context for voice continuity
    ...
):
    ...

# Update for next synthesis
self._previous_text = tts_text
```

The previous_text parameter helps ElevenLabs maintain consistent prosody.

Sequential Synthesis

To prevent voice variations between chunks:

```python
# Semaphore ensures one synthesis at a time
self._synthesis_semaphore = asyncio.Semaphore(1)

async with self._synthesis_semaphore:
    async for audio_data in self._elevenlabs.synthesize_stream(...):
        ...
```

Parallel synthesis can cause noticeable voice quality differences between sentences.

Usage Examples

Basic Token Streaming

```python
async def handle_llm_response(llm_stream):
    async def on_audio_chunk(chunk: AudioChunk):
        # Send to client via WebSocket
        await websocket.send_json({
            "type": "audio.output",
            "audio": base64.b64encode(chunk.data).decode(),
            "is_final": chunk.is_final,
        })

    session = await talker_service.start_session(on_audio_chunk=on_audio_chunk)

    async for token in llm_stream:
        await session.add_token(token)

    metrics = await session.finish()
    print(f"Synthesized {metrics.sentences_processed} sentences")
    print(f"First audio in {metrics.first_audio_latency_ms}ms")
```

Custom Voice Configuration

```python
config = VoiceConfig(
    voice_id="21m00Tcm4TlvDq8ikWAM",  # Rachel (female)
    model_id="eleven_flash_v2_5",     # Lower latency
    stability=0.65,                   # More variation
    similarity_boost=0.90,            # Very clear
    style=0.15,                       # Slightly expressive
)

session = await talker_service.start_session(
    on_audio_chunk=handle_audio,
    voice_config=config,
)
```

Handling Barge-in

```python
active_session = None

async def start_speaking(llm_stream):
    global active_session
    active_session = await talker_service.start_session(on_audio_chunk=send_audio)

    async for token in llm_stream:
        if active_session.is_cancelled():
            break
        await active_session.add_token(token)

    await active_session.finish()

async def handle_barge_in():
    global active_session
    if active_session:
        # Cancels pending synthesis and clears the audio queue
        await active_session.cancel()
```

Simple Text Synthesis

```python
# For non-streaming use cases
async for audio_chunk in talker_service.synthesize_text(
    text="Hello, how can I help you today?",
    voice_config=VoiceConfig(voice_id="TxGEqnHWrfWFTfGW9XjX"),
):
    await send_audio(audio_chunk)
```

Available Voices

```python
voices = talker_service.get_available_voices()

# Returns:
# [
#     {"id": "TxGEqnHWrfWFTfGW9XjX", "name": "Josh",   "gender": "male",   "premium": True},
#     {"id": "pNInz6obpgDQGcFmaJgB", "name": "Adam",   "gender": "male",   "premium": True},
#     {"id": "EXAVITQu4vr4xnSDxMaL", "name": "Bella",  "gender": "female", "premium": True},
#     {"id": "21m00Tcm4TlvDq8ikWAM", "name": "Rachel", "gender": "female", "premium": True},
#     # ... more voices
# ]
```

Performance Tuning

Latency Optimization

| Setting | Lower Latency | Higher Quality |
|---------|---------------|----------------|
| `model_id` | `eleven_flash_v2_5` | `eleven_turbo_v2_5` |
| `min_chunk_chars` | 15 | 40 |
| `optimal_chunk_chars` | 50 | 120 |
| `output_format` | `pcm_24000` | `mp3_44100_192` |
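
Put together, the low-latency column maps onto the config objects like this (whether a custom ChunkerConfig can be supplied at session start is an assumption, not documented API):

```python
# Low-latency profile assembled from the table above.
fast_voice = VoiceConfig(
    model_id="eleven_flash_v2_5",
    output_format="pcm_24000",
)
fast_chunker = ChunkerConfig(
    min_chunk_chars=15,
    optimal_chunk_chars=50,
    max_chunk_chars=200,  # unchanged from the default
)
```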

Quality Optimization

| Setting | More Natural | More Consistent |
|---------|--------------|-----------------|
| `stability` | 0.50 | 0.85 |
| `similarity_boost` | 0.70 | 0.90 |
| `style` | 0.20 | 0.05 |

Error Handling

Synthesis errors don't fail the entire session:

```python
async def _synthesize_sentence(self, sentence: str) -> None:
    try:
        async for audio_data in self._elevenlabs.synthesize_stream(...):
            if self._state == TalkerState.CANCELLED:
                return
            # (construction of `chunk` from audio_data elided)
            await self._on_audio_chunk(chunk)
    except Exception as e:
        logger.error(f"TTS synthesis error: {e}")
        # Session continues, just skips this sentence
```