Speaker Diarization Service

Phase 3 - Voice Mode v4.1

Multi-speaker detection and attribution for conversations using pyannote.audio.

Overview

The Speaker Diarization Service identifies and tracks multiple speakers in audio streams, enabling speaker-attributed transcripts and multi-party conversation support.

Audio Stream
    │
    ▼
┌─────────────────────────────────────────────────┐
│           Speaker Diarization Pipeline          │
│                                                 │
│  ┌─────────────┐   ┌──────────────┐   ┌───────┐ │
│  │ VAD/Segment │ → │  Embedding   │ → │Cluster│ │
│  │  Detection  │   │  Extraction  │   │       │ │
│  └─────────────┘   └──────────────┘   └───────┘ │
│                                                 │
└─────────────────────────────────────────────────┘
    │
    ▼
Speaker Segments: [Speaker A: 0-5s] [Speaker B: 5-12s] [Speaker A: 12-15s]

Features

  • Real-time Detection: Streaming speaker change detection
  • Speaker Embeddings: 512-dimensional voice prints
  • Speaker Database: Persist and re-identify recurring speakers
  • Multi-speaker Support: Up to 4 concurrent speakers
  • Offline & Streaming: Both batch and real-time modes

Requirements

pip install pyannote.audio torch torchaudio scipy

Environment Variables:

HUGGINGFACE_TOKEN=hf_xxxxxxxxxxxx # Required for model access
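If initialization fails with an authorization error, you can verify the token outside the service by loading the gated model directly through pyannote's public API. This is a standalone sanity check, not part of the service itself (the service loads the model internally during initialize()):

import os
from pyannote.audio import Pipeline

# Fails fast with a KeyError if the token is not set.
token = os.environ["HUGGINGFACE_TOKEN"]

# Loading the gated pipeline confirms the token has model access.
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=token,
)
print("Pipeline loaded successfully")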

Feature Flag

Enable via feature flag:

# flag_definitions.yaml
backend.voice_v4_speaker_diarization:
  default: false
  description: "Enable speaker diarization"
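How the flag is read depends on your flag framework; the sketch below is illustrative only (the is_flag_enabled helper is an assumption, not part of this service):

# Hypothetical helper; substitute your flag framework's lookup.
from app.feature_flags import is_flag_enabled

from app.services.speaker_diarization_service import get_speaker_diarization_service

async def get_diarization_if_enabled():
    if not is_flag_enabled("backend.voice_v4_speaker_diarization"):
        return None  # Flag off: callers fall back to single-speaker transcripts
    service = get_speaker_diarization_service()
    await service.initialize()
    return service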

Basic Usage

Offline Processing

from app.services.speaker_diarization_service import (
    get_speaker_diarization_service,
    DiarizationResult,
)

# Get service instance
service = get_speaker_diarization_service()
await service.initialize()

# Process audio file
with open("conversation.wav", "rb") as f:
    audio_data = f.read()

result: DiarizationResult = await service.process_audio(
    audio_data=audio_data,
    sample_rate=16000,
    num_speakers=2,  # Optional: specify if known
)

# Access results
print(f"Detected {result.num_speakers} speakers")
for segment in result.segments:
    print(f"  {segment.speaker_id}: {segment.start_ms}ms - {segment.end_ms}ms")

Streaming Mode

from app.services.speaker_diarization_service import (
    create_streaming_session,
    StreamingDiarizationSession,
)

# Create session
session = await create_streaming_session(
    session_id="voice-session-123",
    sample_rate=16000,
)

# Register speaker change callback
def on_speaker_change(segment):
    print(f"Speaker changed to: {segment.speaker_id}")

session.on_speaker_change(on_speaker_change)

# Process audio chunks
async for audio_chunk in audio_stream:
    segment = await session.process_chunk(audio_chunk)
    if segment:
        print(f"Current speaker: {segment.speaker_id}")

# Get final results
result = await session.stop()

Data Structures

SpeakerSegment

@dataclass
class SpeakerSegment:
    speaker_id: str                # e.g., "SPEAKER_00"
    start_ms: int                  # Segment start time
    end_ms: int                    # Segment end time
    confidence: float              # Detection confidence (0-1)
    embedding: List[float] | None  # Optional voice embedding

    @property
    def duration_ms(self) -> int:
        return self.end_ms - self.start_ms

DiarizationResult

@dataclass
class DiarizationResult:
    segments: List[SpeakerSegment]
    num_speakers: int
    total_duration_ms: int
    processing_time_ms: float
    model_version: str = "pyannote-3.1"

    def get_speaker_summary(self) -> Dict[str, int]:
        """Total speaking time per speaker."""
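get_speaker_summary is shown with only its signature; a minimal sketch of the aggregation its docstring describes, using the fields above (the actual implementation may differ):

def get_speaker_summary(self) -> Dict[str, int]:
    """Total speaking time per speaker, in milliseconds."""
    summary: Dict[str, int] = {}
    for segment in self.segments:
        summary[segment.speaker_id] = (
            summary.get(segment.speaker_id, 0) + segment.duration_ms
        )
    return summary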

SpeakerProfile

@dataclass
class SpeakerProfile:
    speaker_id: str
    name: str | None
    embedding: List[float] | None
    voice_samples: int
    first_seen: datetime
    last_seen: datetime
    metadata: Dict[str, Any]

Speaker Database

The SpeakerDatabase class provides speaker re-identification:

from app.services.speaker_diarization_service import (
    get_speaker_database,
    get_embedding_extractor,
)

# Get instances
database = get_speaker_database()
extractor = get_embedding_extractor()
await extractor.initialize()

# Extract embedding from audio
embedding = await extractor.extract_embedding(audio_data)

# Find or create speaker profile
profile = await database.identify_or_create(
    embedding=embedding,
    threshold=0.75,  # Similarity threshold
    name="John",     # Optional name
)

print(f"Speaker: {profile.speaker_id}, Samples: {profile.voice_samples}")
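The matching inside identify_or_create is internal to the service, but comparing embeddings by cosine similarity against the threshold is the standard approach. A minimal sketch of that idea (find_match and its arguments are illustrative, not service API):

import numpy as np

def cosine_similarity(a, b) -> float:
    """Cosine similarity between two voice embeddings."""
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def find_match(embedding, profiles, threshold=0.75):
    """Return the best-matching profile, or None if nothing clears the threshold."""
    scored = [
        (cosine_similarity(embedding, p.embedding), p)
        for p in profiles
        if p.embedding is not None
    ]
    if not scored:
        return None
    score, profile = max(scored, key=lambda pair: pair[0])
    return profile if score >= threshold else None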

Persistence

import json

# Export database
data = database.to_dict()
with open("speakers.json", "w") as f:
    json.dump(data, f)

# Import database
with open("speakers.json", "r") as f:
    data = json.load(f)
database.load_from_dict(data)

Configuration

DiarizationConfig

@dataclass
class DiarizationConfig:
    # Model settings
    model_name: str = "pyannote/speaker-diarization-3.1"
    use_gpu: bool = True
    device: str = "cuda"  # or "cpu"

    # Detection settings
    min_speakers: int = 1
    max_speakers: int = 4
    min_segment_duration_ms: int = 200
    min_cluster_size: int = 3

    # Embedding settings
    embedding_model: str = "pyannote/embedding"
    embedding_dim: int = 512
    similarity_threshold: float = 0.75

    # Streaming settings
    chunk_duration_ms: int = 1000
    overlap_duration_ms: int = 200

    # Performance
    max_processing_time_ms: int = 500

Custom Configuration

from app.services.speaker_diarization_service import (
    SpeakerDiarizationService,
    DiarizationConfig,
)

config = DiarizationConfig(
    max_speakers=2,            # Two-person conversation
    use_gpu=False,             # CPU-only
    similarity_threshold=0.8,  # Stricter matching
)

service = SpeakerDiarizationService(config)
await service.initialize()

Streaming Session Management

Session Lifecycle

session = await create_streaming_session("session-123")

# Session properties
session.session_id       # Unique identifier
session.is_active        # Whether session is running
session.current_speaker  # Current detected speaker

# Process audio
await session.process_chunk(audio_bytes)

# End session
result = await session.stop()

Speaker Change Callbacks

def on_speaker_change(segment: SpeakerSegment):
    # Update UI
    update_speaker_indicator(segment.speaker_id)
    # Log change
    logger.info(f"Speaker changed: {segment.speaker_id}")
    # Notify Thinker
    inject_speaker_context(segment)

session.on_speaker_change(on_speaker_change)

Integration with Voice Pipeline

Thinker Context Injection

async def process_with_diarization(audio_data: bytes):
    # Get diarization
    result = await diarization_service.process_audio(audio_data)

    # Build speaker context
    speaker_context = []
    for segment in result.segments:
        transcript = get_transcript_for_segment(segment)
        speaker_context.append(f"[{segment.speaker_id}]: {transcript}")

    # Inject into Thinker
    context = "\n".join(speaker_context)
    response = await thinker.generate(
        messages=[...],
        system_context=f"Conversation transcript:\n{context}",
    )

Frontend Integration

See the SpeakerAttributedTranscript component below.

Performance Considerations

GPU vs CPU

Mode   Latency (1s audio)   Memory      Recommended Use
GPU    ~100ms               ~2GB VRAM   Real-time streaming
CPU    ~500ms               ~1GB RAM    Batch processing
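A common pattern is to request the GPU only when one is actually available, falling back to the CPU path otherwise. For example, using the DiarizationConfig fields above:

import torch

use_gpu = torch.cuda.is_available()
config = DiarizationConfig(
    use_gpu=use_gpu,
    device="cuda" if use_gpu else "cpu",
)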

Streaming Optimization

# Optimize for real-time
config = DiarizationConfig(
    chunk_duration_ms=500,       # Smaller chunks
    overlap_duration_ms=100,     # Less overlap
    max_processing_time_ms=300,  # Strict budget
)

Memory Management

# Clear session when done
await session.stop()

# Clear database periodically
if database.profile_count > 100:
    database.prune_old_profiles(max_age_days=30)

Testing

Integration Test Example

import pytest

from app.services.speaker_diarization_service import (
    get_speaker_diarization_service,
    create_streaming_session,
)

@pytest.mark.asyncio
async def test_diarization_two_speakers():
    """Test detection of two distinct speakers."""
    service = get_speaker_diarization_service()
    await service.initialize()

    # Load test audio with two speakers
    with open("tests/fixtures/two_speakers.wav", "rb") as f:
        audio = f.read()

    result = await service.process_audio(audio)

    assert result.num_speakers == 2
    assert len(result.segments) > 0
    assert all(s.confidence > 0.5 for s in result.segments)

@pytest.mark.asyncio
async def test_streaming_session():
    """Test streaming diarization session."""
    session = await create_streaming_session("test-session")

    # Simulate audio chunks
    for chunk in generate_test_chunks():
        segment = await session.process_chunk(chunk)

    result = await session.stop()
    assert result.num_speakers >= 1

Frontend Component

SpeakerAttributedTranscript

// See: apps/web-app/src/components/voice/SpeakerAttributedTranscript.tsx

interface TranscriptSegment {
  speakerId: string;
  text: string;
  startMs: number;
  endMs: number;
  confidence: number;
}

function SpeakerAttributedTranscript({
  segments,
  speakerNames,
}: {
  segments: TranscriptSegment[];
  speakerNames: Record<string, string>;
}) {
  return (
    <div className="space-y-2">
      {segments.map((segment, i) => (
        <div key={i} className="flex gap-2">
          <span className="font-medium text-blue-600">
            {speakerNames[segment.speakerId] || segment.speakerId}:
          </span>
          <span>{segment.text}</span>
        </div>
      ))}
    </div>
  );
}

Troubleshooting

Common Issues

Model fails to load:

Error: Failed to initialize diarization service: 401 Unauthorized

→ Check HUGGINGFACE_TOKEN environment variable

Out of memory:

Error: CUDA out of memory

→ Set use_gpu=False or reduce max_speakers

Poor accuracy:

→ Increase min_segment_duration_ms or adjust similarity_threshold
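For example, a stricter configuration for noisy multi-speaker audio (values are illustrative starting points, not tuned defaults):

config = DiarizationConfig(
    min_segment_duration_ms=500,  # Drop very short segments, which are often misattributed
    similarity_threshold=0.85,    # Require closer embedding matches before merging speakers
)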
