
Semantic Search & Knowledge Base Design

Overview

VoiceAssist uses a sophisticated semantic search system to retrieve relevant medical knowledge from textbooks, journals, and clinical guidelines. This document describes the complete ingestion and query pipeline.

Note: For canonical entity definitions (KnowledgeDocument, KBChunk, IndexingJob), see DATA_MODEL.md. This document describes their usage in the search pipeline.

Architecture Diagram

┌─────────────────────────────────────────────────────────────────┐
│                     INGESTION PIPELINE                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  [PDF Upload] → [Text Extraction] → [Chunking] → [Embedding]   │
│                        ↓                  ↓            ↓         │
│                   [OCR/Parse]      [Metadata]    [Vectors]      │
│                        ↓                  ↓            ↓         │
│                 [Preprocessing]     [Enrichment] [Indexing]     │
│                                                        ↓         │
│                                              [Qdrant Vector DB]  │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│                      QUERY PIPELINE                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  [User Query] → [Intent Detection] → [Query Enhancement]        │
│                                              ↓                   │
│                                    [Vector Search (Hybrid)]      │
│                                    - Dense vectors (Qdrant)      │
│                                    - Sparse (BM25) fallback      │
│                                              ↓                   │
│                                    [Post-processing]             │
│                                    - Reranking                   │
│                                    - Deduplication               │
│                                    - Filtering                   │
│                                              ↓                   │
│                                    [Top-K Results]               │
│                                              ↓                   │
│                                    [RAG: LLM + Citations]        │
└─────────────────────────────────────────────────────────────────┘

Ingestion Pipeline

1. Document Upload & Storage

Process:

  1. User uploads PDF/DOCX via admin panel
  2. File saved to /data/documents/{doc_id}/{filename}
  3. Document record created in PostgreSQL with status uploaded
  4. Background task triggered for processing

Python Implementation:

# app/services/medical/document_processor.py
from pathlib import Path
import hashlib
from typing import Optional

from sqlalchemy.orm import Session

from app.models.document import Document
from app.core.config import settings


class DocumentUploader:
    def __init__(self, db: Session):
        self.db = db
        self.storage_path = Path(settings.DOCUMENT_STORAGE_PATH)

    async def upload_document(
        self,
        file_data: bytes,
        filename: str,
        source_type: str,
        specialty: str,
        metadata: Optional[dict] = None
    ) -> Document:
        """
        Upload and store a document, returning Document model.
        """
        # Generate document ID from hash
        file_hash = hashlib.sha256(file_data).hexdigest()
        doc_id = file_hash[:16]

        # Check if already exists
        existing = self.db.query(Document).filter(
            Document.file_hash == file_hash
        ).first()
        if existing:
            return existing

        # Create storage directory
        doc_dir = self.storage_path / doc_id
        doc_dir.mkdir(parents=True, exist_ok=True)

        # Save file
        file_path = doc_dir / filename
        with open(file_path, 'wb') as f:
            f.write(file_data)

        # Create database record
        document = Document(
            id=doc_id,
            filename=filename,
            file_path=str(file_path),
            file_hash=file_hash,
            file_size=len(file_data),
            source_type=source_type,
            specialty=specialty,
            status='uploaded',
            metadata=metadata or {}
        )
        self.db.add(document)
        self.db.commit()
        self.db.refresh(document)

        return document

2. Text Extraction

Supported Formats:

  • PDF (native text extraction + OCR fallback)
  • DOCX (python-docx)
  • HTML (BeautifulSoup)
  • Plain text

Libraries:

  • PyPDF2: Fast PDF text extraction
  • pdfplumber: Better table/structure handling
  • Tesseract OCR: For scanned documents
  • python-docx: DOCX extraction

Python Implementation:

# app/services/medical/text_extractor.py
from typing import List, Dict

import pdfplumber
from pdf2image import convert_from_path
import pytesseract


class TextExtractor:
    """
    Extract text from various document formats.
    """

    def extract_from_pdf(self, file_path: str) -> List[Dict[str, str]]:
        """
        Extract text from PDF, returning list of pages.
        Falls back to OCR if native extraction fails.
        """
        pages = []

        try:
            # Try native text extraction first
            with pdfplumber.open(file_path) as pdf:
                for page_num, page in enumerate(pdf.pages, start=1):
                    # extract_text() may return None for image-only pages
                    text = page.extract_text() or ""

                    # If page has little text, try OCR
                    if len(text.strip()) < 100:
                        text = self._ocr_page(file_path, page_num)

                    pages.append({
                        'page': page_num,
                        'text': text,
                        'width': page.width,
                        'height': page.height
                    })
        except Exception as e:
            print(f"Native extraction failed, falling back to OCR: {e}")
            pages = self._ocr_entire_pdf(file_path)

        return pages

    def _ocr_page(self, file_path: str, page_num: int) -> str:
        """
        OCR a single page using Tesseract.
        """
        images = convert_from_path(
            file_path,
            first_page=page_num,
            last_page=page_num,
            dpi=300
        )
        if images:
            return pytesseract.image_to_string(images[0])
        return ""

    def _ocr_entire_pdf(self, file_path: str) -> List[Dict[str, str]]:
        """
        OCR entire PDF.
        """
        images = convert_from_path(file_path, dpi=300)
        pages = []
        for page_num, image in enumerate(images, start=1):
            text = pytesseract.image_to_string(image)
            pages.append({
                'page': page_num,
                'text': text
            })
        return pages

3. Text Preprocessing & Cleaning

Steps:

  1. Remove headers/footers (page numbers, running headers)
  2. Fix encoding issues
  3. Normalize whitespace
  4. Remove references section (if at end)
  5. Preserve medical formatting (units, dosages)

Python Implementation:

# app/services/medical/text_preprocessor.py
import re
from typing import List, Dict


class TextPreprocessor:
    """
    Clean and normalize extracted text.
    """

    def __init__(self):
        # Common medical textbook footer patterns
        self.footer_patterns = [
            r'^\d+\s*$',             # Page numbers
            r'^Chapter \d+.*$',
            r'^Copyright \d{4}.*$'
        ]

    def preprocess_pages(self, pages: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """
        Preprocess extracted pages.
        """
        cleaned_pages = []

        for page in pages:
            text = page['text']

            # Remove footers/headers
            text = self._remove_footers(text)

            # Fix common encoding issues
            text = self._fix_encoding(text)

            # Normalize whitespace
            text = self._normalize_whitespace(text)

            # Preserve medical formatting
            text = self._preserve_medical_units(text)

            cleaned_pages.append({
                **page,
                'text': text,
                'original_text': page['text']  # Keep original
            })

        return cleaned_pages

    def _remove_footers(self, text: str) -> str:
        """Remove common footer patterns."""
        lines = text.split('\n')
        cleaned_lines = []

        for line in lines:
            is_footer = False
            for pattern in self.footer_patterns:
                if re.match(pattern, line.strip()):
                    is_footer = True
                    break
            if not is_footer:
                cleaned_lines.append(line)

        return '\n'.join(cleaned_lines)

    def _fix_encoding(self, text: str) -> str:
        """Fix common encoding issues."""
        replacements = {
            '\ufffd': '',   # Unicode replacement character
            '’': "'",       # Curly apostrophe -> straight quote
            '“': '"',       # Curly opening quote -> straight quote
            '”': '"',       # Curly closing quote -> straight quote
            'â€"': '—',     # Mojibake em dash
        }
        for old, new in replacements.items():
            text = text.replace(old, new)
        return text

    def _normalize_whitespace(self, text: str) -> str:
        """Normalize whitespace while preserving structure."""
        # Replace multiple spaces with single space
        text = re.sub(r' +', ' ', text)

        # Remove spaces before punctuation
        text = re.sub(r' ([,.;:!?])', r'\1', text)

        # Normalize newlines (max 2 consecutive)
        text = re.sub(r'\n{3,}', '\n\n', text)

        return text.strip()

    def _preserve_medical_units(self, text: str) -> str:
        """Ensure medical units and dosages are properly formatted."""
        # Ensure space before units
        text = re.sub(r'(\d+)(mg|mcg|g|kg|mL|L)', r'\1 \2', text)

        # Preserve blood pressure format
        text = re.sub(r'(\d+) / (\d+)', r'\1/\2', text)

        return text

4. Semantic Chunking

Strategy: Semantic chunking with overlap

Parameters:

  • Chunk size: 500 tokens (~400 words)
  • Overlap: 100 tokens (20%)
  • Max chunk size: 750 tokens
  • Min chunk size: 200 tokens

Chunking Methods:

  1. Sentence-based: Split on sentence boundaries
  2. Heading-aware: Keep sections together when possible
  3. Table/figure extraction: Handle structured content separately

Python Implementation:

# app/services/medical/chunker.py
from typing import List, Dict
import re

import tiktoken


class SemanticChunker:
    """
    Create semantic chunks from text with intelligent splitting.
    """

    def __init__(self, chunk_size: int = 500, overlap: int = 100):
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.encoder = tiktoken.get_encoding("cl100k_base")

        # Medical section heading patterns
        self.heading_patterns = [
            r'^#+\s+',           # Markdown headings
            r'^[A-Z][A-Z\s]+$',  # ALL CAPS headings
            r'^\d+\.\d+',        # Numbered sections
        ]

    def chunk_document(
        self,
        pages: List[Dict[str, str]],
        metadata: Dict
    ) -> List[Dict]:
        """
        Chunk document into semantic segments.
        """
        chunks = []
        current_chunk = ""
        current_tokens = 0
        chunk_id = 0

        for page in pages:
            sentences = self._split_sentences(page['text'])

            for sentence in sentences:
                sentence_tokens = len(self.encoder.encode(sentence))

                # Check if adding sentence exceeds chunk size
                if current_tokens + sentence_tokens > self.chunk_size:
                    # Save current chunk
                    if current_chunk:
                        chunks.append(self._create_chunk(
                            chunk_id, current_chunk, page['page'], metadata
                        ))
                        chunk_id += 1

                    # Start new chunk with overlap
                    current_chunk = self._get_overlap_text(current_chunk)
                    current_tokens = len(self.encoder.encode(current_chunk))

                # Add sentence to current chunk
                current_chunk += " " + sentence
                current_tokens += sentence_tokens

        # Add final chunk
        if current_chunk:
            chunks.append(self._create_chunk(
                chunk_id, current_chunk, pages[-1]['page'], metadata
            ))

        return chunks

    def _split_sentences(self, text: str) -> List[str]:
        """
        Split text into sentences, handling medical abbreviations.
        """
        # Protect common medical abbreviations from sentence splitting
        protected_abbrevs = [
            'Dr.', 'Mr.', 'Mrs.', 'Ms.', 'e.g.', 'i.e.',
            'et al.', 'vs.', 'Fig.', 'Ref.', 'Vol.', 'No.'
        ]

        text_protected = text
        for abbrev in protected_abbrevs:
            text_protected = text_protected.replace(abbrev, abbrev.replace('.', '<DOT>'))

        # Split on sentence boundaries
        sentences = re.split(r'[.!?]+\s+', text_protected)

        # Restore abbreviations
        sentences = [s.replace('<DOT>', '.') for s in sentences]

        return [s.strip() for s in sentences if s.strip()]

    def _get_overlap_text(self, text: str) -> str:
        """
        Get last `overlap` tokens from text for next chunk.
        """
        tokens = self.encoder.encode(text)
        if len(tokens) <= self.overlap:
            return text

        overlap_tokens = tokens[-self.overlap:]
        return self.encoder.decode(overlap_tokens)

    def _create_chunk(
        self,
        chunk_id: int,
        text: str,
        page_num: int,
        metadata: Dict
    ) -> Dict:
        """
        Create chunk dictionary with metadata.
        """
        return {
            'chunk_id': chunk_id,
            'text': text.strip(),
            'page': page_num,
            'tokens': len(self.encoder.encode(text)),
            'metadata': {
                **metadata,
                'page': page_num
            }
        }

5. Embedding Generation

Embedding Model: OpenAI text-embedding-3-large

Specifications:

  • Dimensions: 3072 (can be reduced to 1024/512 for efficiency)
  • Max input: 8191 tokens
  • Cost: $0.13 per 1M tokens
  • Performance: MTEB score 64.6

Alternative Models:

  • Local: sentence-transformers/all-MiniLM-L6-v2 (384 dim)
  • Local Medical: microsoft/BiomedNLP-PubMedBERT-base (768 dim)

Python Implementation:

# app/services/medical/embeddings.py
from typing import List, Dict

import openai
from tenacity import retry, stop_after_attempt, wait_exponential


class EmbeddingGenerator:
    """
    Generate embeddings for text chunks.
    """

    def __init__(self, model: str = "text-embedding-3-large", dimensions: int = 3072):
        self.model = model
        self.dimensions = dimensions
        self.openai_client = openai.OpenAI()

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    async def embed_chunks(self, chunks: List[Dict]) -> List[Dict]:
        """
        Generate embeddings for list of chunks.
        Uses batch processing for efficiency.
        """
        # Extract texts
        texts = [chunk['text'] for chunk in chunks]

        # Generate embeddings in batches
        batch_size = 100
        all_embeddings = []

        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]

            response = self.openai_client.embeddings.create(
                model=self.model,
                input=batch,
                dimensions=self.dimensions
            )

            batch_embeddings = [item.embedding for item in response.data]
            all_embeddings.extend(batch_embeddings)

        # Add embeddings to chunks
        for chunk, embedding in zip(chunks, all_embeddings):
            chunk['embedding'] = embedding
            chunk['embedding_model'] = self.model
            chunk['embedding_dimensions'] = self.dimensions

        return chunks

    async def embed_query(self, query: str) -> List[float]:
        """
        Generate embedding for a single query.
        """
        response = self.openai_client.embeddings.create(
            model=self.model,
            input=query,
            dimensions=self.dimensions
        )
        return response.data[0].embedding
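
For the local alternatives listed above, a minimal sketch of a drop-in encoder using sentence-transformers is shown below. The LocalEmbeddingGenerator class and embed_texts method are illustrative names, not part of the codebase:

# Sketch: local embedding generation with sentence-transformers (all-MiniLM-L6-v2, 384 dims)
from typing import List

from sentence_transformers import SentenceTransformer


class LocalEmbeddingGenerator:
    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def embed_texts(self, texts: List[str]) -> List[List[float]]:
        # encode() batches internally and returns one vector per input text
        return self.model.encode(texts, normalize_embeddings=True).tolist()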

Idempotency & Deduplication

Document Keys

Each KnowledgeDocument has a stable doc_key that serves as the idempotency key:

  • Format: {source_type}-{identifier} (e.g., textbook-harrisons-21e-ch252, guideline-cdc-heart-failure-2023)
  • Purpose: Prevent duplicate ingestion of the same document
  • Uniqueness: Enforced at database level with unique constraint

Examples:

  • textbook-harrisons-21e-ch252 - Harrison's 21st edition, Chapter 252
  • guideline-cdc-heart-failure-2023 - CDC heart failure guideline (2023 version)
  • journal-nejm-2023-12345 - NEJM article with DOI suffix
  • note-user123-clinical-note-456 - User-uploaded clinical note

Upsert Behavior

When a document is re-ingested (same doc_key):

  1. Check existing document by doc_key
  2. If exists:
    • Compare content_hash (SHA-256 of document content)
    • If hash matches: Skip ingestion, return existing KnowledgeDocument.id
    • If hash differs: Create new version, mark old chunks as superseded
  3. If not exists: Create new document

Database Schema:

CREATE TABLE knowledge_documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    doc_key VARCHAR(255) UNIQUE NOT NULL,                    -- Idempotency key
    content_hash VARCHAR(64) NOT NULL,                       -- SHA-256 for change detection
    version INTEGER DEFAULT 1,                               -- Increment on update
    superseded_by UUID REFERENCES knowledge_documents(id),   -- Points to newer version
    created_at TIMESTAMP DEFAULT NOW(),
    ...
);

CREATE INDEX idx_doc_key ON knowledge_documents(doc_key);
CREATE INDEX idx_superseded ON knowledge_documents(superseded_by) WHERE superseded_by IS NOT NULL;

Chunk Deduplication

Chunks from the same document share document_id:

CREATE TABLE kb_chunks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    document_id UUID REFERENCES knowledge_documents(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,       -- Position in document (0, 1, 2, ...)
    content TEXT NOT NULL,
    embedding VECTOR(768),              -- Or dimension of your embedding model
    superseded BOOLEAN DEFAULT FALSE,   -- Mark old chunks when document updated
    ...
);

CREATE INDEX idx_document_chunks ON kb_chunks(document_id, chunk_index);
CREATE INDEX idx_superseded_chunks ON kb_chunks(document_id) WHERE superseded = false;

When document is updated:

  1. Set superseded = true on old chunks
  2. Create new chunks with superseded = false
  3. Old chunks remain for audit but excluded from search
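
Search-side queries can then exclude superseded content by filtering on the flags defined above; a minimal sketch against the schemas shown here:

-- Only active (non-superseded) chunks from current document versions participate in retrieval
SELECT c.id, c.content, c.chunk_index
FROM kb_chunks c
JOIN knowledge_documents d ON d.id = c.document_id
WHERE c.superseded = false
  AND d.superseded_by IS NULL
ORDER BY c.chunk_index;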

API Example

from hashlib import sha256


async def ingest_document(
    file_path: str,
    doc_key: str,
    source_type: str,
    metadata: dict,
) -> KnowledgeDocument:
    """
    Ingest document with idempotency.

    Returns existing document if content unchanged,
    or new version if content updated.
    """
    # Read file content
    content = read_file(file_path)
    content_hash = sha256(content.encode()).hexdigest()

    # Check for existing document
    existing = await db.query(
        "SELECT * FROM knowledge_documents WHERE doc_key = $1",
        doc_key,
    )

    if existing:
        if existing.content_hash == content_hash:
            # Content unchanged, return existing
            logger.info(f"Document {doc_key} unchanged, skipping ingestion")
            return existing

        # Content changed, create new version
        logger.info(f"Document {doc_key} updated, creating new version")
        new_version = existing.version + 1

        # Mark old chunks as superseded
        await db.execute(
            "UPDATE kb_chunks SET superseded = true WHERE document_id = $1",
            existing.id,
        )
    else:
        new_version = 1

    # Create new document (or version)
    new_doc = await db.insert(
        "knowledge_documents",
        doc_key=doc_key,
        content_hash=content_hash,
        version=new_version,
        source_type=source_type,
        metadata=metadata,
    )

    # Process chunks (extract, embed, store)
    chunks = await process_and_embed(content, new_doc.id)

    return new_doc

6. Vector Database Indexing

Vector DB: Qdrant

Collection Schema:

# app/services/medical/vector_db.py
from typing import List, Dict
from uuid import uuid4

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance, VectorParams, PointStruct,
    Filter, FieldCondition, MatchValue
)


class VectorDatabase:
    """
    Qdrant vector database interface.
    """

    def __init__(self, url: str = "http://localhost:6333"):
        self.client = QdrantClient(url=url)
        self.collection_name = "medical_knowledge"

    def create_collection(self, dimensions: int = 3072):
        """
        Create collection with schema.
        """
        self.client.recreate_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(
                size=dimensions,
                distance=Distance.COSINE
            )
        )

        # Create payload indexes for filtering
        self.client.create_payload_index(
            collection_name=self.collection_name,
            field_name="source_type",
            field_schema="keyword"
        )
        self.client.create_payload_index(
            collection_name=self.collection_name,
            field_name="specialty",
            field_schema="keyword"
        )
        self.client.create_payload_index(
            collection_name=self.collection_name,
            field_name="document_id",
            field_schema="keyword"
        )

    async def index_chunks(self, chunks: List[Dict]):
        """
        Index chunks into vector database.
        """
        points = []

        for chunk in chunks:
            point_id = str(uuid4())
            point = PointStruct(
                id=point_id,
                vector=chunk['embedding'],
                payload={
                    # Text content
                    'text': chunk['text'],
                    'chunk_id': chunk['chunk_id'],
                    'page': chunk['page'],
                    'tokens': chunk['tokens'],

                    # Document metadata
                    'document_id': chunk['metadata']['document_id'],
                    'document_title': chunk['metadata']['title'],
                    'source_type': chunk['metadata']['source_type'],
                    'specialty': chunk['metadata']['specialty'],

                    # Source details
                    'authors': chunk['metadata'].get('authors', []),
                    'publication_year': chunk['metadata'].get('publication_year'),
                    'publisher': chunk['metadata'].get('publisher'),
                    'doi': chunk['metadata'].get('doi'),
                    'pmid': chunk['metadata'].get('pmid'),

                    # Indexing metadata
                    'embedding_model': chunk['embedding_model'],
                    'indexed_at': chunk['metadata'].get('indexed_at')
                }
            )
            points.append(point)

        # Upsert in batches
        batch_size = 100
        for i in range(0, len(points), batch_size):
            batch = points[i:i + batch_size]
            self.client.upsert(
                collection_name=self.collection_name,
                points=batch
            )

Payload Schema:

{ "text": "Atrial fibrillation (AF) is the most common sustained cardiac arrhythmia...", "chunk_id": 42, "page": 234, "tokens": 487, "document_id": "a1b2c3d4e5f6", "document_title": "Harrison's Principles of Internal Medicine - Chapter 45", "source_type": "textbook", "specialty": "cardiology", "authors": ["Dennis Kasper", "Stephen Hauser"], "publication_year": 2022, "publisher": "McGraw-Hill", "doi": null, "pmid": null, "embedding_model": "text-embedding-3-large", "indexed_at": "2024-11-19T10:34:00Z" }

Indexing Job State Machine

Each document ingestion creates an IndexingJob that tracks progress through these states:

┌─────────────────────────────────────────────────────────────┐
│                  IndexingJob State Machine                   │
└─────────────────────────────────────────────────────────────┘

         ┌──────────┐
    ────▶│ PENDING  │ (Job created, queued)
         └─────┬────┘
               │
               ▼
         ┌──────────┐
         │ RUNNING  │ (Worker processing)
         └─────┬────┘
               │
         ┌─────┴─────┐
         │           │
         ▼           ▼
    ┌──────────┐  ┌─────────┐
    │COMPLETED │  │ FAILED  │
    └────┬─────┘  └────┬────┘
         │             │
         │             │ (Manual retry)
         │             └──────────┐
         │                        ▼
         │                 ┌──────────┐
         └────────────────▶│SUPERSEDED│ (Newer version ingested)
                           └──────────┘

State Definitions

State       Description                    Next States                    Can Retry?
PENDING     Job queued, not yet started    RUNNING, FAILED                N/A
RUNNING     Worker processing document     COMPLETED, FAILED              N/A
COMPLETED   Successfully indexed           SUPERSEDED                     No
FAILED      Error during processing        PENDING (retry), SUPERSEDED    Yes
SUPERSEDED  Replaced by newer version      (terminal)                     No

State Transitions

from datetime import datetime
from enum import Enum
from typing import Optional

from pydantic import BaseModel


class IndexingJobState(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SUPERSEDED = "superseded"


class IndexingJob(BaseModel):
    """From DATA_MODEL.md - enhanced with state machine."""
    id: str
    document_id: str
    doc_key: str
    state: IndexingJobState
    created_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    failed_at: Optional[datetime] = None
    error_message: Optional[str] = None
    error_details: Optional[dict] = None
    retry_count: int = 0
    max_retries: int = 3

    # Progress tracking
    total_chunks: Optional[int] = None
    processed_chunks: int = 0

    # Superseded tracking
    superseded_by: Optional[str] = None  # ID of newer job


async def transition_state(
    job_id: str,
    new_state: IndexingJobState,
    error: Optional[Exception] = None,
) -> IndexingJob:
    """Transition job to new state with validation."""
    job = await get_job(job_id)

    # Validate transition
    valid_transitions = {
        IndexingJobState.PENDING: [IndexingJobState.RUNNING, IndexingJobState.FAILED],
        IndexingJobState.RUNNING: [IndexingJobState.COMPLETED, IndexingJobState.FAILED],
        IndexingJobState.COMPLETED: [IndexingJobState.SUPERSEDED],
        IndexingJobState.FAILED: [IndexingJobState.PENDING, IndexingJobState.SUPERSEDED],
        IndexingJobState.SUPERSEDED: [],  # Terminal state
    }

    if new_state not in valid_transitions[job.state]:
        raise ValueError(
            f"Invalid transition: {job.state} → {new_state}"
        )

    # Update job
    job.state = new_state

    if new_state == IndexingJobState.RUNNING:
        job.started_at = datetime.utcnow()
    elif new_state == IndexingJobState.COMPLETED:
        job.completed_at = datetime.utcnow()
    elif new_state == IndexingJobState.FAILED:
        job.failed_at = datetime.utcnow()
        job.error_message = str(error) if error else None
        job.error_details = {"type": type(error).__name__} if error else None

    await db.update("indexing_jobs", job)
    return job

Retry Logic

async def retry_failed_job(job_id: str) -> IndexingJob:
    """Retry a failed indexing job."""
    job = await get_job(job_id)

    if job.state != IndexingJobState.FAILED:
        raise ValueError(f"Can only retry FAILED jobs, got {job.state}")

    if job.retry_count >= job.max_retries:
        raise ValueError(f"Max retries ({job.max_retries}) exceeded")

    job.retry_count += 1
    job.state = IndexingJobState.PENDING
    job.error_message = None
    job.error_details = None

    await db.update("indexing_jobs", job)

    # Re-queue job
    await queue.enqueue(process_indexing_job, job.id)

    return job

Admin API Endpoints

@router.get("/api/admin/kb/jobs", response_model=APIEnvelope) async def list_indexing_jobs( state: Optional[IndexingJobState] = None, limit: int = 50, ) -> APIEnvelope: """List indexing jobs with optional state filter.""" jobs = await db.query_jobs(state=state, limit=limit) return success_response(data=[job.dict() for job in jobs]) @router.post("/api/admin/kb/jobs/{job_id}/retry", response_model=APIEnvelope) async def retry_indexing_job(job_id: str) -> APIEnvelope: """Retry a failed indexing job.""" try: job = await retry_failed_job(job_id) return success_response(data=job.dict()) except ValueError as e: return error_response( code="VALIDATION_ERROR", message=str(e), status_code=422, )

Query Pipeline

1. Query Enhancement

Steps:

  1. Detect user intent (quick lookup vs deep analysis)
  2. Extract medical entities (drugs, conditions, procedures)
  3. Expand abbreviations
  4. Add specialty context

Python Implementation:

# app/services/medical/query_enhancer.py
import re
from typing import Any, Dict, List


class QueryEnhancer:
    """
    Enhance user queries for better retrieval.
    """

    def __init__(self):
        # Common medical abbreviations
        self.abbreviations = {
            'HTN': 'hypertension',
            'DM': 'diabetes mellitus',
            'CAD': 'coronary artery disease',
            'CHF': 'congestive heart failure',
            'AF': 'atrial fibrillation',
            'MI': 'myocardial infarction',
            'CVA': 'cerebrovascular accident',
            # ... more abbreviations
        }

    def enhance_query(
        self,
        query: str,
        context: Dict = None
    ) -> Dict[str, Any]:
        """
        Enhance query with expansions and metadata.
        """
        # Detect intent
        intent = self._detect_intent(query)

        # Expand abbreviations
        expanded_query = self._expand_abbreviations(query)

        # Extract entities
        entities = self._extract_entities(expanded_query)

        # Add context
        if context and context.get('specialty'):
            specialty_filter = context['specialty']
        else:
            specialty_filter = self._infer_specialty(expanded_query, entities)

        return {
            'original_query': query,
            'enhanced_query': expanded_query,
            'intent': intent,
            'entities': entities,
            'specialty_filter': specialty_filter
        }

    def _detect_intent(self, query: str) -> str:
        """
        Detect user intent from query.
        """
        query_lower = query.lower()

        if any(word in query_lower for word in ['dose', 'dosing', 'how much']):
            return 'dosing'
        elif any(word in query_lower for word in ['side effect', 'adverse', 'toxicity']):
            return 'safety'
        elif any(word in query_lower for word in ['manage', 'treatment', 'therapy']):
            return 'management'
        elif any(word in query_lower for word in ['diagnosis', 'workup', 'test']):
            return 'diagnosis'
        else:
            return 'general'

    def _expand_abbreviations(self, query: str) -> str:
        """
        Expand medical abbreviations.
        """
        words = query.split()
        expanded_words = []

        for word in words:
            word_upper = word.strip('.,!?').upper()
            if word_upper in self.abbreviations:
                expanded_words.append(f"{word} ({self.abbreviations[word_upper]})")
            else:
                expanded_words.append(word)

        return ' '.join(expanded_words)

    def _extract_entities(self, query: str) -> Dict[str, List[str]]:
        """
        Extract medical entities from query.
        Simple pattern-based for now, can use NER later.
        """
        # This is simplified - production should use medical NER
        entities = {
            'conditions': [],
            'medications': [],
            'procedures': []
        }

        # Simple pattern matching
        condition_patterns = [
            r'\b(hypertension|diabetes|heart failure|pneumonia)\b'
        ]
        for pattern in condition_patterns:
            matches = re.findall(pattern, query, re.IGNORECASE)
            entities['conditions'].extend(matches)

        return entities

    def _infer_specialty(self, query: str, entities: Dict) -> List[str]:
        """
        Infer medical specialty from query.
        """
        query_lower = query.lower()
        specialties = []

        # Keyword-based specialty detection
        specialty_keywords = {
            'cardiology': ['heart', 'cardiac', 'af', 'atrial', 'chf', 'mi'],
            'endocrinology': ['diabetes', 'thyroid', 'insulin', 'glucose'],
            'infectious_disease': ['infection', 'antibiotic', 'sepsis', 'fever'],
            'nephrology': ['kidney', 'renal', 'dialysis', 'ckd'],
            # ... more specialties
        }

        for specialty, keywords in specialty_keywords.items():
            if any(keyword in query_lower for keyword in keywords):
                specialties.append(specialty)

        return specialties

2. Hybrid Search (Dense + Sparse)

Strategy:

  • Dense (Vector): Semantic similarity using embeddings
  • Sparse (BM25): Keyword matching for exact terms
  • Fusion: Combine scores with learned weights

Python Implementation:

# app/services/medical/rag.py
from typing import List, Dict

from qdrant_client.models import Filter, FieldCondition, MatchAny


class RAGService:
    """
    Retrieval-Augmented Generation service.
    """

    def __init__(self, vector_db: VectorDatabase, embedding_gen: EmbeddingGenerator):
        self.vector_db = vector_db
        self.embedding_gen = embedding_gen

    async def search(
        self,
        query: str,
        filters: Dict = None,
        limit: int = 10,
        hybrid: bool = True
    ) -> List[Dict]:
        """
        Hybrid search combining vector and keyword matching.
        """
        # Enhance query
        enhancer = QueryEnhancer()
        enhanced = enhancer.enhance_query(query, filters)

        # Generate query embedding
        query_embedding = await self.embedding_gen.embed_query(
            enhanced['enhanced_query']
        )

        # Build filters
        search_filter = self._build_filter(enhanced, filters)

        # Vector search
        vector_results = self.vector_db.client.search(
            collection_name=self.vector_db.collection_name,
            query_vector=query_embedding,
            query_filter=search_filter,
            limit=limit * 2,  # Get more for reranking
            with_payload=True
        )

        # Convert to standardized format
        results = []
        for hit in vector_results:
            results.append({
                'id': hit.id,
                'score': hit.score,
                'text': hit.payload['text'],
                'document_id': hit.payload['document_id'],
                'document_title': hit.payload['document_title'],
                'page': hit.payload['page'],
                'source_type': hit.payload['source_type'],
                'specialty': hit.payload['specialty'],
                'metadata': {
                    'authors': hit.payload.get('authors', []),
                    'publication_year': hit.payload.get('publication_year'),
                    'doi': hit.payload.get('doi'),
                    'pmid': hit.payload.get('pmid')
                }
            })

        # Rerank results
        results = self._rerank(results, enhanced['original_query'])

        # Return top-k
        return results[:limit]

    def _build_filter(self, enhanced: Dict, filters: Dict = None) -> Filter:
        """
        Build Qdrant filter from query enhancement and user filters.
        """
        conditions = []

        # Add specialty filter if inferred
        if enhanced.get('specialty_filter'):
            conditions.append(
                FieldCondition(
                    key="specialty",
                    match=MatchAny(any=enhanced['specialty_filter'])
                )
            )

        # Add user-provided filters
        if filters:
            if filters.get('source_type'):
                conditions.append(
                    FieldCondition(
                        key="source_type",
                        match=MatchAny(any=filters['source_type'])
                    )
                )
            if filters.get('specialty'):
                conditions.append(
                    FieldCondition(
                        key="specialty",
                        match=MatchAny(any=filters['specialty'])
                    )
                )

        if conditions:
            return Filter(must=conditions)
        return None

    def _rerank(self, results: List[Dict], query: str) -> List[Dict]:
        """
        Rerank results using cross-encoder or heuristics.
        Simple implementation - can use cross-encoder for better results.
        """
        # For now, boost results that contain exact query terms
        query_terms = set(query.lower().split())

        for result in results:
            text_terms = set(result['text'].lower().split())
            overlap = len(query_terms & text_terms)

            # Boost score based on keyword overlap
            boost = 1.0 + (overlap * 0.05)
            result['score'] *= boost

        # Sort by boosted score
        results.sort(key=lambda x: x['score'], reverse=True)

        return results
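
The implementation above covers the dense leg and heuristic reranking; the sparse (BM25) fusion named in the strategy is not shown. A minimal sketch of one common fusion approach, reciprocal rank fusion, assuming both legs return lists of result dicts keyed by 'id' (the function name and constant k=60 are illustrative, not part of the codebase):

from typing import Dict, List


def reciprocal_rank_fusion(
    dense_results: List[Dict],
    sparse_results: List[Dict],
    k: int = 60,
) -> List[Dict]:
    """Fuse two ranked lists by summing 1 / (k + rank) per document."""
    scores: Dict[str, float] = {}
    items: Dict[str, Dict] = {}

    for ranked_list in (dense_results, sparse_results):
        for rank, item in enumerate(ranked_list, start=1):
            doc_id = item['id']
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
            items.setdefault(doc_id, item)

    # Highest fused score first
    fused = sorted(items.values(), key=lambda it: scores[it['id']], reverse=True)
    for item in fused:
        item['fused_score'] = scores[item['id']]
    return fused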

3. Result Post-processing

Steps:

  1. Deduplication (remove near-duplicate chunks)
  2. Citation formatting
  3. Relevance filtering (threshold)
  4. Grouping by document
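
A minimal sketch of these post-processing steps over the result dicts returned by RAGService.search; the similarity threshold and the text-fingerprint heuristic for near-duplicates are illustrative assumptions:

from collections import defaultdict
from typing import Dict, List


def postprocess_results(results: List[Dict], min_score: float = 0.7) -> Dict[str, List[Dict]]:
    """Filter by relevance, drop near-duplicate chunks, and group by document."""
    seen_texts = set()
    grouped: Dict[str, List[Dict]] = defaultdict(list)

    for result in sorted(results, key=lambda r: r['score'], reverse=True):
        # Relevance filtering (threshold)
        if result['score'] < min_score:
            continue

        # Deduplication: skip chunks whose normalized text prefix was already kept
        fingerprint = " ".join(result['text'].lower().split())[:300]
        if fingerprint in seen_texts:
            continue
        seen_texts.add(fingerprint)

        # Citation formatting
        result['citation'] = f"{result['document_title']}, p. {result['page']}"

        # Grouping by document
        grouped[result['document_id']].append(result)

    return grouped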

Complete Indexing Example

# app/tasks/indexing_task.py
from datetime import datetime

from sqlalchemy.orm import Session

from app.models.document import Document
from app.services.medical.text_extractor import TextExtractor
from app.services.medical.text_preprocessor import TextPreprocessor
from app.services.medical.chunker import SemanticChunker
from app.services.medical.embeddings import EmbeddingGenerator
from app.services.medical.vector_db import VectorDatabase


async def index_document_task(document_id: str, db: Session):
    """
    Complete document indexing pipeline.
    """
    # 1. Load document
    document = db.query(Document).filter(Document.id == document_id).first()
    if not document:
        raise ValueError(f"Document {document_id} not found")

    document.status = 'processing'
    db.commit()

    try:
        # 2. Extract text
        extractor = TextExtractor()
        pages = extractor.extract_from_pdf(document.file_path)

        # 3. Preprocess
        preprocessor = TextPreprocessor()
        cleaned_pages = preprocessor.preprocess_pages(pages)

        # 4. Chunk
        chunker = SemanticChunker(chunk_size=500, overlap=100)
        chunks = chunker.chunk_document(
            cleaned_pages,
            metadata={
                'document_id': document.id,
                'title': document.filename,
                'source_type': document.source_type,
                'specialty': document.specialty,
                'authors': document.metadata.get('authors', []),
                'publication_year': document.metadata.get('publication_year'),
                'indexed_at': datetime.utcnow().isoformat()
            }
        )

        # 5. Generate embeddings
        embedding_gen = EmbeddingGenerator()
        chunks_with_embeddings = await embedding_gen.embed_chunks(chunks)

        # 6. Index in vector DB
        vector_db = VectorDatabase()
        await vector_db.index_chunks(chunks_with_embeddings)

        # 7. Update document status
        document.status = 'indexed'
        document.chunk_count = len(chunks)
        document.indexed_at = datetime.utcnow()
        db.commit()

    except Exception as e:
        document.status = 'failed'
        document.error_message = str(e)
        db.commit()
        raise

Complete Query Example

# Example usage in API endpoint
from app.services.medical.rag import RAGService
from app.services.ai.orchestrator import AIOrchestrator


@router.post("/api/chat/message")
async def handle_query(query: str, clinical_context: dict = None):
    """
    Handle user query with RAG.
    """
    # 1. Search knowledge base
    rag_service = RAGService(vector_db, embedding_gen)
    search_results = await rag_service.search(
        query=query,
        filters={
            'specialty': clinical_context.get('specialty') if clinical_context else None
        },
        limit=10
    )

    # 2. Format context for LLM
    context_text = "\n\n".join([
        f"[Source {i+1}] {result['document_title']} (Page {result['page']})\n{result['text']}"
        for i, result in enumerate(search_results)
    ])

    # 3. Generate response with LLM
    orchestrator = AIOrchestrator()
    prompt = f"""You are a medical AI assistant. Use the following sources to answer the question.

Sources:
{context_text}

Question: {query}

Provide a clear answer with citations to the sources above."""

    response = await orchestrator.generate(prompt)

    # 4. Format citations
    citations = [
        {
            'id': result['id'],
            'title': result['document_title'],
            'source_type': result['source_type'],
            'page': result['page'],
            'excerpt': result['text'][:200] + '...',
            'relevance_score': result['score']
        }
        for result in search_results[:5]  # Top 5 citations
    ]

    return {
        'answer': response,
        'citations': citations,
        'sources_count': len(search_results)
    }

Performance Optimization

Caching

import hashlib
import json
from typing import List


class CachedEmbeddingGenerator(EmbeddingGenerator):
    """
    Embedding generator with Redis cache.
    """

    def __init__(self, redis_client, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.redis = redis_client

    async def embed_query(self, query: str) -> List[float]:
        """
        Generate embedding with caching.
        """
        # Generate cache key
        cache_key = f"embedding:{self.model}:{hashlib.md5(query.encode()).hexdigest()}"

        # Check cache
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Generate embedding
        embedding = await super().embed_query(query)

        # Cache for 24 hours
        self.redis.setex(cache_key, 86400, json.dumps(embedding))

        return embedding

Batch Processing

For large document uploads, process in parallel:

import asyncio


async def batch_index_documents(document_ids: List[str], db: Session):
    """
    Index multiple documents in parallel.
    """
    tasks = [
        index_document_task(doc_id, db)
        for doc_id in document_ids
    ]

    # Run with concurrency limit
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent

    async def bounded_task(task):
        async with semaphore:
            return await task

    results = await asyncio.gather(*[bounded_task(task) for task in tasks])
    return results

Monitoring & Analytics

Query Performance Tracking

import time

from app.models.analytics import QueryLog


async def search_with_logging(query: str, user_id: str, **kwargs):
    """
    Search with performance logging.
    """
    start_time = time.time()

    try:
        results = await rag_service.search(query, **kwargs)
        latency = time.time() - start_time

        # Log query
        query_log = QueryLog(
            user_id=user_id,
            query=query,
            results_count=len(results),
            latency=latency,
            success=True
        )
        db.add(query_log)
        db.commit()

        return results

    except Exception as e:
        latency = time.time() - start_time

        query_log = QueryLog(
            user_id=user_id,
            query=query,
            latency=latency,
            success=False,
            error=str(e)
        )
        db.add(query_log)
        db.commit()

        raise

Example Queries & Conductor Usage

These examples demonstrate how the semantic search system and conductor work together to process real clinical queries.

Example 1: Heart Failure Management Query

User Query: "What are the current guidelines for managing acute decompensated heart failure in the emergency department?"

Step 1: Intent Classification

{ "intent": "guideline", "confidence": 0.92, "sub_intent": "treatment", "clinical_domain": "cardiology" }

Step 2: Source Selection (based on intent)

  • Internal KB: Filter source_type = 'guideline'
  • External: UpToDate, PubMed (recent RCTs)
  • Priority: Official guidelines (AHA, ACC, ESC)

Step 3: KB Search with Filters

kb_results = await kb_engine.search( query="acute decompensated heart failure emergency management", filters={ "source_type": ["guideline", "textbook"], "specialty": ["cardiology", "emergency_medicine"], "date_published": {"gte": "2020-01-01"}, # Recent guidelines }, limit=10, min_score=0.7, )

KB Search Results (vector similarity + BM25 hybrid):

[ { "chunk_id": "chunk_abc123", "document_id": "doc_xyz789", "doc_key": "guideline-aha-heart-failure-2023", "title": "2023 AHA/ACC/HFSA Guideline for the Management of Heart Failure", "excerpt": "In acute decompensated heart failure (ADHF), initial management in the ED should focus on...", "score": 0.89, "source_type": "guideline", "metadata": { "organization": "AHA/ACC/HFSA", "year": 2023, "section": "Emergency Management", "recommendation_class": "Class I", "evidence_level": "A" } }, { "chunk_id": "chunk_def456", "document_id": "doc_uvw012", "doc_key": "textbook-harrisons-21e-ch252", "title": "Harrison's Principles of Internal Medicine, 21e - Chapter 252: Heart Failure", "excerpt": "Acute decompensated HF requires prompt diuresis with IV loop diuretics...", "score": 0.82, "source_type": "textbook", "metadata": { "edition": "21", "chapter": "252", "page": "1763" } } ]

Step 4: External Tool Results

{ "uptodate": [ { "title": "Treatment of acute decompensated heart failure: General principles", "url": "https://uptodate.com/...", "last_updated": "2024-01-15" } ], "pubmed": [ { "title": "GALACTIC-HF trial: omecamtiv mecarbil in acute heart failure", "doi": "10.1056/NEJMoa2025797", "publication_date": "2023-09-01", "journal": "New England Journal of Medicine" } ] }

Step 5: Conductor Merges & Ranks Sources

  • Priority 1: Official guidelines (AHA 2023)
  • Priority 2: Textbooks (Harrison's)
  • Priority 3: UpToDate (for practical management)
  • Priority 4: Recent RCTs (GALACTIC-HF)

Step 6: LLM Answer Generation

{ "answer": "For acute decompensated heart failure in the emergency department, current guidelines recommend the following approach:\n\n1. **Initial Assessment** (Class I, Level A): Rapid evaluation of volume status, respiratory distress, and end-organ perfusion.\n\n2. **Immediate Management**:\n - IV loop diuretics (furosemide 40-80mg bolus initially) for volume overload\n - Supplemental oxygen to maintain SpO2 >90%\n - Upright positioning to reduce preload\n\n3. **Hemodynamic Monitoring**: Consider invasive monitoring if severe hypotension or cardiogenic shock.\n\n4. **Vasodilators**: Nitroglycerin for elevated blood pressure and pulmonary congestion (avoid if SBP <90 mmHg).\n\n5. **Inotropic Support**: Only if evidence of hypoperfusion despite adequate filling pressures.\n\nRecent data from the GALACTIC-HF trial suggests omecamtiv mecarbil may reduce cardiovascular death in chronic HF, though not specifically studied in acute settings.", "citations": [ { "id": "1", "source_type": "guideline", "source_id": "doc_xyz789", "title": "2023 AHA/ACC/HFSA Guideline for the Management of Heart Failure", "location": "Section 4.2: Emergency Management", "url": "https://www.ahajournals.org/doi/10.1161/CIR.0000000000001063" }, { "id": "2", "source_type": "textbook", "source_id": "doc_uvw012", "title": "Harrison's Principles of Internal Medicine, 21e", "subtitle": "Chapter 252: Heart Failure", "location": "p. 1763" }, { "id": "3", "source_type": "journal", "source_id": "external_pubmed", "title": "GALACTIC-HF trial", "doi": "10.1056/NEJMoa2025797" } ] }

Example 2: Drug Dosing Query

User Query: "What's the appropriate dose of metformin for a patient with CKD stage 3?"

Step 1: Intent Classification

{ "intent": "drug_reference", "confidence": 0.95, "sub_intent": "dosing", "drug_name": "metformin", "clinical_context": "CKD stage 3" }

Step 2: Source Selection

  • Internal KB: Filter source_type = 'drug_formulary' or 'guideline'
  • External: Drug interaction databases, FDA prescribing information
  • Priority: Official prescribing info, renal dosing guidelines

Step 3: KB Search with Drug-Specific Filters

kb_results = await kb_engine.search( query="metformin dosing chronic kidney disease stage 3 renal impairment", filters={ "source_type": ["drug_formulary", "guideline"], "drug_name": "metformin", # Exact match on drug field "specialty": ["nephrology", "endocrinology"], }, limit=5, min_score=0.75, )

KB Search Results:

[ { "chunk_id": "chunk_ghi789", "document_id": "doc_rst345", "doc_key": "formulary-lexicomp-metformin", "title": "Lexicomp Drug Information: Metformin", "excerpt": "Renal Impairment Dosing:\n- eGFR 45-59 mL/min/1.73m² (Stage 3a): Max 2000 mg/day, monitor eGFR every 3-6 months\n- eGFR 30-44 mL/min/1.73m² (Stage 3b): Max 1000 mg/day, monitor eGFR every 3 months\n- eGFR <30 mL/min/1.73m² (Stage 4-5): Contraindicated", "score": 0.94, "source_type": "drug_formulary", "metadata": { "drug_class": "biguanide", "indication": "type 2 diabetes", "last_updated": "2024-01-10" } } ]

Step 4: External Tool Results (FDA prescribing info, drug databases)

Step 5: LLM Answer with Safety Warning

{ "answer": "For a patient with CKD Stage 3, metformin dosing depends on the specific eGFR:\n\n**Stage 3a (eGFR 45-59 mL/min/1.73m²)**:\n- Maximum dose: 2000 mg/day (divided doses)\n- Monitoring: Check eGFR every 3-6 months\n- Continue metformin if eGFR remains stable\n\n**Stage 3b (eGFR 30-44 mL/min/1.73m²)**:\n- Maximum dose: 1000 mg/day\n- Monitoring: Check eGFR every 3 months\n- Consider discontinuation if eGFR declining\n\n**Important**: Metformin is contraindicated if eGFR falls below 30 mL/min/1.73m² due to risk of lactic acidosis. Always verify current eGFR before prescribing.", "warnings": [ "Contraindicated in eGFR <30", "Risk of lactic acidosis in renal impairment", "Monitor renal function regularly" ], "citations": [...] }

How Conductor Uses Search Results

# Conductor processes search results
async def process_query(request: ChatRequest, trace_id: str):
    # ... (steps 1-5 from ORCHESTRATION_DESIGN.md)

    # After receiving KB search results:
    kb_results = await kb_engine.search(...)

    # Conductor applies intent-specific logic:
    if intent.type == "guideline":
        # Prioritize official guidelines with high recommendation class
        kb_results = prioritize_by_metadata(
            kb_results,
            priority_fields=["recommendation_class", "evidence_level"],
        )
    elif intent.type == "drug_reference":
        # Prioritize exact drug name matches, recent updates
        kb_results = prioritize_by_metadata(
            kb_results,
            priority_fields=["drug_name_match", "last_updated"],
        )

    # Extract safety warnings from results
    warnings = extract_warnings(kb_results)

    # Merge with external results
    combined_sources = merge_sources(kb_results, external_results)

    # Generate answer with appropriate context
    answer = await llm_router.generate_answer(
        query=request.query,
        sources=combined_sources,
        intent=intent,
        include_warnings=(intent.type == "drug_reference"),
    )

    return answer

Summary

This semantic search system provides:

  ✅ Robust ingestion: PDF → Text → Chunks → Embeddings → Index
  ✅ Hybrid search: Dense vectors + sparse keywords
  ✅ Query enhancement: Abbreviation expansion, entity extraction
  ✅ Metadata filtering: By specialty, source type, publication date
  ✅ Reranking: Boost relevance with cross-encoder
  ✅ Performance: Caching, batch processing, monitoring
  ✅ Scalability: Supports millions of chunks

The code examples above illustrate the intended production structure; deployed implementations add full error handling, retries, and logging.


VoiceAssist V2 - Tools and Integrations

Last Updated: 2025-11-20
Status: Design Complete
Version: V2.0

Implementation Note: This document describes the V2 tools architecture design. The current production implementation lives in:

  • Tool Service: services/api-gateway/app/services/tools/tool_service.py
  • Individual Tools: services/api-gateway/app/services/tools/*.py
  • Legacy Location: server/app/tools/ (deprecated)

Table of Contents

  1. Overview
  2. Tool Architecture
  3. Tool Registry
  4. Tool Definitions
  5. Tool Security Model
  6. Tool Invocation Flow
  7. Tool Results and Citations
  8. Frontend Integration
  9. Observability and Monitoring
  10. Error Handling
  11. Testing Tools
  12. Future Tools

Overview

VoiceAssist V2 implements a first-class tools layer that allows the AI model to take actions on behalf of the user. Tools are integrated with the OpenAI Realtime API and the backend orchestrator to provide:

  • Calendar operations (view events, create appointments)
  • File operations (search Nextcloud files, retrieve documents)
  • Medical knowledge retrieval (OpenEvidence, PubMed, guidelines)
  • Medical calculations (dosing, risk scores, differential diagnosis)
  • Web search (current medical information)

Key Features

  • Type-Safe: All tools use Pydantic models for arguments and results
  • PHI-Aware: Tools classified by PHI handling capability
  • Auditable: All tool calls logged with full parameters and results
  • Secure: Permission checks, rate limiting, input validation
  • Observable: Prometheus metrics for all tool invocations
  • User-Confirmed: High-risk tools require user confirmation before execution

Design Principles

  1. Least Privilege: Tools only get access they need
  2. Explicit > Implicit: Always require user confirmation for risky operations
  3. Fail Safe: Errors should not expose PHI or system internals
  4. Auditable: Every tool call recorded with full context
  5. Testable: Mock implementations for testing

Tool Architecture

Components

┌─────────────────────────────────────────────────────────────────┐
│                        OpenAI Realtime API                      │
│                  (Function Calling / Tools)                     │
└────────────────────────────┬────────────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────────────┐
│                       Voice Proxy Service                       │
│  - Receives tool calls from OpenAI                              │
│  - Validates tool arguments                                     │
│  - Routes to backend orchestrator                               │
└────────────────────────────┬────────────────────────────────────┘
                             │
                             ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Query Orchestrator Service                   │
│  - Tool execution engine                                        │
│  - PHI detection and routing                                    │
│  - Confirmation management                                      │
│  - Result assembly                                              │
└────────────────────────────┬────────────────────────────────────┘
                             │
                  ┌──────────┴──────────┐
                  ▼                     ▼
┌─────────────────────────┐   ┌────────────────────────┐
│   Local Tool Modules    │   │  External API Clients  │
│  - calendar_tool.py     │   │  - OpenEvidence        │
│  - nextcloud_tool.py    │   │  - PubMed              │
│  - calculator_tool.py   │   │  - Web Search          │
│  - diagnosis_tool.py    │   │  - UpToDate (if avail) │
└─────────────────────────┘   └────────────────────────┘

Tool Lifecycle

  1. Registration: Tool defined with schema, registered in TOOL_REGISTRY
  2. Discovery: OpenAI Realtime API receives tool definitions
  3. Invocation: AI model calls tool with arguments
  4. Validation: Arguments validated against Pydantic model
  5. Authorization: Permission check (user, PHI status, rate limits)
  6. Confirmation: User prompt if required (high-risk operations)
  7. Execution: Tool logic runs (API call, calculation, file access)
  8. Result: Structured result returned to AI model
  9. Audit: Tool call logged to database with full context
  10. Citation: Result converted to citation if needed

Tool Registry

TOOL_REGISTRY Structure

All tools are registered in server/app/tools/registry.py:

from typing import Dict, Type, Callable

from pydantic import BaseModel

from app.tools.base import ToolDefinition, ToolResult

TOOL_REGISTRY: Dict[str, ToolDefinition] = {}
TOOL_MODELS: Dict[str, Type[BaseModel]] = {}
TOOL_HANDLERS: Dict[str, Callable] = {}


def register_tool(
    name: str,
    definition: ToolDefinition,
    model: Type[BaseModel],
    handler: Callable
):
    """Register a tool with schema, model, and handler"""
    TOOL_REGISTRY[name] = definition
    TOOL_MODELS[name] = model
    TOOL_HANDLERS[name] = handler
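
A minimal sketch of how an incoming tool call could be dispatched through this registry, covering the validation and execution steps of the lifecycle above. The dispatch_tool_call helper is illustrative (not part of the registry module) and assumes handlers are async callables:

from pydantic import ValidationError


async def dispatch_tool_call(name: str, raw_arguments: dict):
    """Validate arguments against the registered model, then run the handler."""
    if name not in TOOL_REGISTRY:
        raise ValueError(f"Unknown tool: {name}")

    # Lifecycle step 4: validate arguments against the tool's Pydantic model
    try:
        args = TOOL_MODELS[name](**raw_arguments)
    except ValidationError as e:
        raise ValueError(f"Invalid arguments for {name}: {e}")

    # Lifecycle step 7: execute the registered handler with validated arguments
    handler = TOOL_HANDLERS[name]
    return await handler(args)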

ToolDefinition Schema

class ToolDefinition(BaseModel):
    """Tool definition for OpenAI Realtime API"""
    name: str
    description: str
    parameters: Dict[str, Any]   # JSON Schema

    # VoiceAssist-specific metadata
    category: str                # "calendar", "file", "medical", "calculation", "search"
    requires_phi: bool           # True if tool processes PHI
    requires_confirmation: bool  # True if user must confirm
    risk_level: str              # "low", "medium", "high"
    rate_limit: Optional[int] = None  # Max calls per minute
    timeout_seconds: int = 30

Tool Definitions

Tool 1: Get Calendar Events

Purpose: Retrieve calendar events for a date range

Tool Name: get_calendar_events

Category: Calendar

PHI Status: requires_phi: true (patient appointments may contain PHI)

Confirmation Required: false (read-only)

Risk Level: low

Arguments:

class GetCalendarEventsArgs(BaseModel):
    start_date: str                      # ISO 8601 format (YYYY-MM-DD)
    end_date: str                        # ISO 8601 format (YYYY-MM-DD)
    calendar_name: Optional[str] = None  # Filter by calendar
    max_results: Optional[int] = 50

Returns:

class CalendarEvent(BaseModel):
    id: str
    title: str
    start: str  # ISO 8601 datetime
    end: str    # ISO 8601 datetime
    location: Optional[str] = None
    description: Optional[str] = None
    calendar_name: str
    all_day: bool = False


class GetCalendarEventsResult(BaseModel):
    events: List[CalendarEvent]
    total_count: int
    date_range: str  # "2024-01-15 to 2024-01-20"

Example Call:

{ "tool": "get_calendar_events", "arguments": { "start_date": "2024-01-15", "end_date": "2024-01-20" } }

Implementation: CalDAV integration with Nextcloud Calendar


Tool 2: Create Calendar Event

Purpose: Create a new calendar event

Tool Name: create_calendar_event

Category: Calendar

PHI Status: requires_phi: true (may contain patient names)

Confirmation Required: true (creates data)

Risk Level: medium

Arguments:

class CreateCalendarEventArgs(BaseModel):
    title: str
    start_datetime: str  # ISO 8601 datetime
    end_datetime: str    # ISO 8601 datetime
    location: Optional[str] = None
    description: Optional[str] = None
    calendar_name: Optional[str] = "Default"
    all_day: bool = False

Returns:

class CreateCalendarEventResult(BaseModel):
    event_id: str
    title: str
    start: str
    end: str
    created: bool = True
    message: str  # "Event created successfully"

Confirmation Prompt:

"I'd like to create a calendar event:
- Title: {title}
- Start: {start_datetime}
- End: {end_datetime}
- Location: {location}

Should I proceed?"

Implementation: CalDAV event creation (iCalendar object written to Nextcloud Calendar)


Tool 3: Search Nextcloud Files

Purpose: Search for files in Nextcloud by name or content

Tool Name: search_nextcloud_files

Category: File

PHI Status: requires_phi: true (files may contain PHI)

Confirmation Required: false (read-only)

Risk Level: low

Arguments:

class SearchNextcloudFilesArgs(BaseModel):
    query: str                       # Search query
    file_type: Optional[str] = None  # "pdf", "docx", "txt", etc.
    max_results: Optional[int] = 20
    include_content: bool = False    # Search file contents

Returns:

class NextcloudFile(BaseModel):
    file_id: str
    name: str
    path: str
    size: int       # bytes
    mime_type: str
    modified: str   # ISO 8601 datetime
    url: str        # WebDAV URL


class SearchNextcloudFilesResult(BaseModel):
    files: List[NextcloudFile]
    total_count: int
    query: str

Example Call:

{ "tool": "search_nextcloud_files", "arguments": { "query": "diabetes guidelines", "file_type": "pdf", "max_results": 10 } }

Implementation: Nextcloud WebDAV search API


Tool 4: Retrieve Nextcloud File

Purpose: Retrieve the contents of a specific Nextcloud file

Tool Name: retrieve_nextcloud_file

Category: File

PHI Status: requires_phi: true (file may contain PHI)

Confirmation Required: false (read-only)

Risk Level: low

Arguments:

class RetrieveNextcloudFileArgs(BaseModel):
    file_id: str                      # File ID from search results
    extract_text: bool = True         # Extract text from PDF/DOCX
    max_chars: Optional[int] = 10000  # Limit text extraction

Returns:

class RetrieveNextcloudFileResult(BaseModel):
    file_id: str
    name: str
    content: Optional[str] = None  # Extracted text
    content_truncated: bool = False
    mime_type: str
    size: int
    url: str

Implementation: WebDAV GET + text extraction (PyPDF2, python-docx)


Tool 5: Search OpenEvidence

Purpose: Search OpenEvidence API for medical evidence

Tool Name: search_openevidence

Category: Medical

PHI Status: requires_phi: false (external API, no PHI sent)

Confirmation Required: false (read-only external API)

Risk Level: low

Arguments:

class SearchOpenEvidenceArgs(BaseModel):
    query: str                            # Medical question
    max_results: Optional[int] = 5
    evidence_level: Optional[str] = None  # "high", "moderate", "low"

Returns:

class OpenEvidenceResult(BaseModel):
    title: str
    summary: str
    evidence_level: str  # "high", "moderate", "low"
    source: str          # Journal name
    pubmed_id: Optional[str] = None
    url: str
    date: Optional[str] = None


class SearchOpenEvidenceResponse(BaseModel):
    results: List[OpenEvidenceResult]
    total_count: int
    query: str

Example Call:

{ "tool": "search_openevidence", "arguments": { "query": "beta blockers in heart failure", "max_results": 5 } }

Implementation: OpenEvidence REST API client


Tool 6: Search PubMed

Purpose: Search PubMed for medical literature

Tool Name: search_pubmed

Category: Medical

PHI Status: requires_phi: false (external API, no PHI sent)

Confirmation Required: false (read-only external API)

Risk Level: low

Arguments:

class SearchPubMedArgs(BaseModel):
    query: str                                     # PubMed search query
    max_results: Optional[int] = 10
    publication_types: Optional[List[str]] = None  # ["Clinical Trial", "Review"]
    date_from: Optional[str] = None                # YYYY/MM/DD
    date_to: Optional[str] = None                  # YYYY/MM/DD

Returns:

class PubMedArticle(BaseModel):
    pmid: str
    title: str
    authors: List[str]
    journal: str
    publication_date: str
    abstract: Optional[str] = None
    doi: Optional[str] = None
    url: str  # PubMed URL


class SearchPubMedResult(BaseModel):
    articles: List[PubMedArticle]
    total_count: int
    query: str

Implementation: NCBI E-utilities API (esearch + efetch)
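
A minimal sketch of the esearch + efetch flow against the public E-utilities endpoints (synchronous requests for brevity; the helper names are illustrative, and API-key handling, rate limiting, and XML parsing into PubMedArticle are omitted):

import requests

EUTILS = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"


def search_pubmed_ids(query: str, max_results: int = 10) -> list:
    # esearch returns the PMIDs matching the query
    resp = requests.get(
        f"{EUTILS}/esearch.fcgi",
        params={"db": "pubmed", "term": query, "retmax": max_results, "retmode": "json"},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["esearchresult"]["idlist"]


def fetch_pubmed_records(pmids: list) -> str:
    # efetch returns full article records (XML) for the given PMIDs
    resp = requests.get(
        f"{EUTILS}/efetch.fcgi",
        params={"db": "pubmed", "id": ",".join(pmids), "retmode": "xml"},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.text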


Tool 7: Calculate Medical Score

Purpose: Calculate medical risk scores and dosing

Tool Name: calculate_medical_score

Category: Calculation

PHI Status: requires_phi: true (patient data used in calculation)

Confirmation Required: false (deterministic calculation)

Risk Level: medium (results used for clinical decisions)

Arguments:

class CalculateMedicalScoreArgs(BaseModel):
    calculator_name: str        # "wells_dvt", "chadsvasc", "grace", "renal_dosing"
    parameters: Dict[str, Any]  # Calculator-specific parameters

Returns:

class MedicalScoreResult(BaseModel):
    calculator_name: str
    score: Union[float, str]
    interpretation: str
    risk_category: Optional[str] = None
    recommendations: Optional[List[str]] = None
    parameters_used: Dict[str, Any]

Example Call (Wells' DVT Score):

{ "tool": "calculate_medical_score", "arguments": { "calculator_name": "wells_dvt", "parameters": { "active_cancer": true, "paralysis_recent": false, "bedridden_3days": true, "localized_tenderness": true, "entire_leg_swollen": false, "calf_swelling_3cm": true, "pitting_edema": true, "collateral_veins": false, "alternative_diagnosis": false } } }

Example Result:

{ "calculator_name": "wells_dvt", "score": 6.0, "interpretation": "High probability of DVT", "risk_category": "high", "recommendations": [ "Consider urgent ultrasound", "Consider empiric anticoagulation if no contraindications" ], "parameters_used": { ... } }

Supported Calculators:

  • wells_dvt: Wells' Criteria for DVT
  • wells_pe: Wells' Criteria for Pulmonary Embolism
  • chadsvasc: CHA2DS2-VASc Score (stroke risk in AFib)
  • hasbled: HAS-BLED Score (bleeding risk)
  • grace: GRACE Score (ACS risk)
  • meld: MELD Score (liver disease severity)
  • renal_dosing: Renal dose adjustment calculator
  • bmi: BMI calculator with interpretation

Implementation: Local calculation library (no external API)
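
As a concrete example of the local calculation library, here is a sketch of a wells_dvt handler. Criterion names mirror the example call above, and the cutoffs follow the standard three-tier Wells interpretation; the actual library's structure may differ:

# Sketch of a wells_dvt handler; parameter names match the example call above.
from typing import Any, Dict

WELLS_DVT_CRITERIA = [
    "active_cancer", "paralysis_recent", "bedridden_3days",
    "localized_tenderness", "entire_leg_swollen", "calf_swelling_3cm",
    "pitting_edema", "collateral_veins",
]

def calculate_wells_dvt(parameters: Dict[str, Any]) -> Dict[str, Any]:
    # One point per positive criterion ...
    score = sum(1.0 for c in WELLS_DVT_CRITERIA if parameters.get(c))
    # ... minus two if an alternative diagnosis is at least as likely
    if parameters.get("alternative_diagnosis"):
        score -= 2.0

    if score >= 3:
        category, interpretation = "high", "High probability of DVT"
    elif score >= 1:
        category, interpretation = "moderate", "Moderate probability of DVT"
    else:
        category, interpretation = "low", "Low probability of DVT"

    return {
        "calculator_name": "wells_dvt",
        "score": score,
        "interpretation": interpretation,
        "risk_category": category,
        "parameters_used": parameters,
    }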


Tool 8: Search Medical Guidelines

Purpose: Search curated medical guidelines (CDC, WHO, specialty societies)

Tool Name: search_medical_guidelines

Category: Medical

PHI Status: requires_phi: false (local database search)

Confirmation Required: false (read-only)

Risk Level: low

Arguments:

class SearchMedicalGuidelinesArgs(BaseModel):
    query: str                              # Search query
    guideline_source: Optional[str] = None  # "cdc", "who", "acc", "aha", etc.
    condition: Optional[str] = None         # Filter by condition
    max_results: Optional[int] = 10

Returns:

class MedicalGuideline(BaseModel):
    id: str
    title: str
    source: str     # "CDC", "WHO", "AHA", etc.
    condition: str  # "Diabetes", "Hypertension", etc.
    summary: str
    url: str
    publication_date: str
    last_updated: str

class SearchMedicalGuidelinesResult(BaseModel):
    guidelines: List[MedicalGuideline]
    total_count: int
    query: str

Implementation: Local vector search in knowledge base (guidelines ingested via automated scrapers)
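
A sketch of the underlying lookup, assuming a guidelines collection in Qdrant and an embed() helper from the ingestion pipeline. The collection name and payload field names are illustrative:

# Sketch only -- collection name, payload fields, and embed() are assumptions.
from typing import Optional

from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue

client = QdrantClient(url="http://localhost:6333")

def search_guidelines(query: str, source: Optional[str] = None,
                      max_results: int = 10):
    conditions = []
    if source:
        # Restrict to a single guideline source (e.g. "cdc") via payload filter
        conditions.append(FieldCondition(key="source", match=MatchValue(value=source)))

    return client.search(
        collection_name="medical_guidelines",
        query_vector=embed(query),  # embedding model from the ingestion pipeline
        query_filter=Filter(must=conditions) if conditions else None,
        limit=max_results,
    )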


Tool 9: Generate Differential Diagnosis

Purpose: Generate differential diagnosis list based on symptoms

Tool Name: generate_differential_diagnosis

Category: Medical

PHI Status: requires_phi: true (patient symptoms)

Confirmation Required: false (informational only)

Risk Level: medium (clinical decision support)

Arguments:

class GenerateDifferentialDiagnosisArgs(BaseModel):
    chief_complaint: str
    symptoms: List[str]
    patient_age: Optional[int] = None
    patient_sex: Optional[str] = None  # "M", "F", "Other"
    relevant_history: Optional[List[str]] = None
    max_results: Optional[int] = 10

Returns:

class DiagnosisCandidate(BaseModel):
    diagnosis: str
    probability: str             # "high", "medium", "low"
    key_features: List[str]      # Matching symptoms
    missing_features: List[str]  # Expected but not present
    next_steps: List[str]        # Recommended workup

class GenerateDifferentialDiagnosisResult(BaseModel):
    chief_complaint: str
    diagnoses: List[DiagnosisCandidate]
    reasoning: str
    disclaimers: List[str] = [
        "This is not a substitute for clinical judgment",
        "Consider patient context and physical exam findings"
    ]

Example Call:

{ "tool": "generate_differential_diagnosis", "arguments": { "chief_complaint": "chest pain", "symptoms": ["substernal chest pressure", "shortness of breath", "diaphoresis", "pain radiating to left arm"], "patient_age": 65, "patient_sex": "M", "relevant_history": ["hypertension", "diabetes", "smoking history"], "max_results": 5 } }

Implementation: RAG system + medical knowledge base + BioGPT


Tool 10: Web Search (Medical)

Purpose: Search the web for current medical information

Tool Name: web_search_medical

Category: Search

PHI Status: requires_phi: false (no PHI sent to external search)

Confirmation Required: false (read-only)

Risk Level: low

Arguments:

class WebSearchMedicalArgs(BaseModel):
    query: str
    max_results: Optional[int] = 5
    domain_filter: Optional[List[str]] = None  # ["nih.gov", "cdc.gov"]

Returns:

class WebSearchResult(BaseModel):
    title: str
    url: str
    snippet: str
    domain: str
    date: Optional[str] = None

class WebSearchMedicalResponse(BaseModel):
    results: List[WebSearchResult]
    total_count: int
    query: str

Implementation: Google Custom Search API or Brave Search API (filtered to medical domains)
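
A sketch of domain filtering with the Google Custom Search JSON API (the Brave endpoint is analogous). Restricting to multiple domains is done here by OR-ing site: operators into the query; the API key and engine ID are placeholders:

# Sketch using the Google Custom Search JSON API; key and cx are placeholders.
from typing import List, Optional
import requests

def web_search_medical(query: str, domain_filter: Optional[List[str]] = None,
                       max_results: int = 5) -> dict:
    if domain_filter:
        # Restrict results by OR-ing site: operators into the query
        sites = " OR ".join(f"site:{d}" for d in domain_filter)
        query = f"{query} ({sites})"

    resp = requests.get(
        "https://www.googleapis.com/customsearch/v1",
        params={"key": "<API_KEY>", "cx": "<ENGINE_ID>",
                "q": query, "num": max_results},
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()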


Tool Security Model

PHI Handling Rules

Tools with requires_phi: true:

  • Calendar tools (patient appointments)
  • Nextcloud file tools (may contain patient documents)
  • Medical calculators (patient data)
  • Differential diagnosis (patient symptoms)

Security Requirements:

  • Must not send PHI to external APIs
  • Must run locally or use HIPAA-compliant services
  • Must log tool calls with PHI flag
  • Must redact PHI from error messages

Tools with requires_phi: false:

  • OpenEvidence search
  • PubMed search
  • Medical guidelines search
  • Web search

Security Requirements:

  • Safe to call external APIs
  • No PHI in query parameters
  • Rate limiting to prevent abuse

Confirmation Requirements

Tools requiring user confirmation (requires_confirmation: true):

  • create_calendar_event: Creates data
  • Any tool that modifies state

Confirmation Flow:

  1. Tool call received from OpenAI
  2. Orchestrator detects requires_confirmation: true
  3. Send confirmation request to frontend
  4. User approves/denies via UI
  5. If approved, execute tool and return result
  6. If denied, return "User declined" message to AI

Rate Limiting

Per-Tool Rate Limits:

  • Calendar tools: 10 calls/minute
  • File tools: 20 calls/minute
  • Medical search tools: 30 calls/minute
  • Calculators: 50 calls/minute (local, fast)

Implementation: Redis-based rate limiter with sliding window
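
A minimal sketch of the sliding-window limiter using a Redis sorted set keyed by user and tool. Key naming and the decision to record denied calls are simplifying assumptions:

# Sketch of a sliding-window limiter on a Redis sorted set.
import time
import uuid
import redis

r = redis.Redis()

def allow_call(user_id: int, tool_name: str, limit: int, window_s: int = 60) -> bool:
    key = f"ratelimit:{user_id}:{tool_name}"
    now = time.time()

    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, now - window_s)  # drop calls outside the window
    pipe.zcard(key)                                # count calls still in the window
    pipe.zadd(key, {str(uuid.uuid4()): now})       # record this call
    pipe.expire(key, window_s)                     # let idle keys expire
    _, current, _, _ = pipe.execute()

    return current < limit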

Input Validation

All tool arguments validated with Pydantic:

  • Type checking
  • Range validation
  • Format validation (dates, enums)
  • Length limits (prevent injection attacks)

Example Validation:

class GetCalendarEventsArgs(BaseModel):
    start_date: str = Field(..., regex=r'^\d{4}-\d{2}-\d{2}$')
    end_date: str = Field(..., regex=r'^\d{4}-\d{2}-\d{2}$')
    max_results: Optional[int] = Field(50, ge=1, le=100)

Tool Invocation Flow

Step-by-Step Flow

1. User speaks: "What's on my calendar tomorrow?"
   ↓
2. OpenAI Realtime API recognizes need for tool call
   ↓
3. OpenAI calls tool: get_calendar_events(start_date="2024-01-16", end_date="2024-01-16")
   ↓
4. Voice Proxy receives tool call
   ↓
5. Voice Proxy forwards to Query Orchestrator
   ↓
6. Orchestrator validates arguments (Pydantic)
   ↓
7. Orchestrator checks permissions (user auth, rate limit)
   ↓
8. Orchestrator detects PHI: true (calendar events)
   ↓
9. Orchestrator checks confirmation: false (read-only)
   ↓
10. Orchestrator executes tool handler: calendar_tool.get_events()
    ↓
11. Tool handler calls CalDAV API → Nextcloud
    ↓
12. Nextcloud returns events
    ↓
13. Tool handler returns structured result
    ↓
14. Orchestrator logs tool call to audit log
    ↓
15. Orchestrator creates ToolResult object
    ↓
16. ToolResult returned to Voice Proxy
    ↓
17. Voice Proxy sends result to OpenAI Realtime API
    ↓
18. OpenAI synthesizes natural language response: "You have 3 meetings tomorrow..."
    ↓
19. User hears response
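
Condensed into code, steps 6-15 look roughly like the sketch below. The registry, permission, confirmation, and audit helpers are illustrative names, not the actual orchestrator API:

# Sketch of steps 6-15; helper names are illustrative.
async def execute_tool(tool_call: dict, user_id: int) -> ToolResult:
    spec = TOOL_REGISTRY[tool_call["tool"]]           # tool metadata + handler

    args = spec.args_model(**tool_call["arguments"])  # 6. Pydantic validation
    check_permissions(user_id, spec)                  # 7. auth + rate limit

    # 8-9. PHI and confirmation flags come from the tool spec
    if spec.requires_confirmation:
        if not await request_user_confirmation(tool_call, user_id):
            return ToolResult(tool_name=spec.name, success=False,
                              error="User declined", execution_time_ms=0,
                              timestamp=utcnow_iso())  # utcnow_iso(): assumed helper

    result = await spec.handler(args, user_id=user_id)  # 10-13. run the tool
    audit_log_tool_call(user_id, spec, result)           # 14. audit trail
    return result                                        # 15. ToolResult to proxy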

Confirmation Flow (for high-risk tools)

1. User: "Create a meeting with Dr. Smith at 2pm tomorrow"
   ↓
2. OpenAI calls: create_calendar_event(title="Meeting with Dr. Smith", ...)
   ↓
3. Orchestrator detects requires_confirmation: true
   ↓
4. Orchestrator sends confirmation request to frontend:
   {
     "type": "tool_confirmation",
     "tool": "create_calendar_event",
     "arguments": { ... },
     "prompt": "I'd like to create a calendar event: ..."
   }
   ↓
5. Frontend displays confirmation dialog
   ↓
6. User clicks "Confirm" or "Cancel"
   ↓
7. Frontend sends confirmation response
   ↓
8. If confirmed:
   - Orchestrator executes tool
   - Returns result to OpenAI
   ↓
9. If cancelled:
   - Orchestrator returns "User declined"
   - OpenAI acknowledges: "Okay, I won't create that event"

Tool Results and Citations

ToolResult Structure

class ToolResult(BaseModel):
    tool_name: str
    success: bool
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    execution_time_ms: float
    timestamp: str  # ISO 8601

    # Citation metadata (if applicable)
    citations: Optional[List[Citation]] = None

Converting Tool Results to Citations

For tools that return citable content:

def tool_result_to_citation(tool_result: ToolResult) -> List[Citation]:
    """Convert tool result to citations"""
    if tool_result.tool_name == "search_openevidence":
        return [
            Citation(
                text=result["summary"],
                source_type="openevidence",
                source_title=result["title"],
                source_url=result["url"],
                relevance_score=0.95,
                metadata={
                    "evidence_level": result["evidence_level"],
                    "pubmed_id": result.get("pubmed_id")
                }
            )
            for result in tool_result.result["results"]
        ]
    # ... other tools

Citation Display in Frontend:

  • Show tool name as citation source
  • Link to external URLs if available
  • Display metadata (e.g., evidence level, date)

Frontend Integration

React Hooks for Tools

// web-app/src/hooks/useToolConfirmation.ts
export function useToolConfirmation() {
  const [pendingTool, setPendingTool] = useState<ToolConfirmation | null>(null);

  const handleToolConfirmation = (confirmation: ToolConfirmation) => {
    setPendingTool(confirmation);
  };

  const confirmTool = async () => {
    if (!pendingTool) return;
    // Send confirmation to backend
    await api.confirmTool(pendingTool.tool_call_id, true);
    setPendingTool(null);
  };

  const cancelTool = async () => {
    if (!pendingTool) return;
    await api.confirmTool(pendingTool.tool_call_id, false);
    setPendingTool(null);
  };

  return { pendingTool, handleToolConfirmation, confirmTool, cancelTool };
}

Tool Confirmation Dialog

// web-app/src/components/ToolConfirmationDialog.tsx
export function ToolConfirmationDialog({
  toolCall,
  onConfirm,
  onCancel
}: ToolConfirmationProps) {
  return (
    <Dialog open={!!toolCall}>
      <DialogTitle>Confirm Action</DialogTitle>
      <DialogContent>
        <p>{toolCall.confirmation_prompt}</p>
        <div className="tool-details">
          <h4>Details:</h4>
          <pre>{JSON.stringify(toolCall.arguments, null, 2)}</pre>
        </div>
      </DialogContent>
      <DialogActions>
        <Button onClick={onCancel} variant="outlined">
          Cancel
        </Button>
        <Button onClick={onConfirm} variant="contained">
          Confirm
        </Button>
      </DialogActions>
    </Dialog>
  );
}

Tool Activity Indicator

// Show when tools are running
export function ToolActivityIndicator({ activeTool }: { activeTool: string | null }) {
  if (!activeTool) return null;

  return (
    <div className="tool-activity">
      <CircularProgress size={16} />
      <span>Running tool: {activeTool}</span>
    </div>
  );
}

Observability and Monitoring

Prometheus Metrics

Tool Invocation Metrics:

tool_calls_total = Counter(
    'voiceassist_tool_calls_total',
    'Total number of tool calls',
    ['tool_name', 'status']  # status: success, error, denied
)

tool_execution_duration_seconds = Histogram(
    'voiceassist_tool_execution_duration_seconds',
    'Tool execution duration',
    ['tool_name']
)

tool_confirmation_rate = Gauge(
    'voiceassist_tool_confirmation_rate',
    'Percentage of tool calls confirmed by users',
    ['tool_name']
)

tool_error_rate = Gauge(
    'voiceassist_tool_error_rate',
    'Tool error rate (errors / total calls)',
    ['tool_name']
)

Example Dashboard Panel:

Tool Call Rate (calls/minute)
- get_calendar_events: 15/min
- search_pubmed: 8/min
- calculate_medical_score: 5/min
- create_calendar_event: 2/min

Tool Success Rate
- get_calendar_events: 99.5%
- search_pubmed: 98.2%
- calculate_medical_score: 100%
- create_calendar_event: 95.0% (5% denied by users)

Tool P95 Latency
- get_calendar_events: 250ms
- search_pubmed: 1200ms
- search_openevidence: 1500ms
- calculate_medical_score: 50ms

Structured Logging

logger.info(
    "tool_call",
    extra={
        "tool_name": "get_calendar_events",
        "user_id": 123,
        "session_id": "abc123",
        "arguments": {"start_date": "2024-01-15", "end_date": "2024-01-20"},
        "execution_time_ms": 245,
        "status": "success",
        "phi_detected": True,
        "confirmation_required": False
    }
)

PHI Redaction:

  • Never log file contents
  • Never log patient names, MRNs
  • Hash user identifiers
  • Redact PHI from error messages
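
A sketch of how these rules might be applied before a log record is emitted; the PHI field list and the hashing scheme are assumptions, not the actual redaction policy:

# Sketch only -- the PHI field list and hashing scheme are assumptions.
import hashlib

PHI_FIELDS = {"patient_name", "mrn", "dob", "content"}  # never logged verbatim

def sanitize_log_fields(fields: dict) -> dict:
    safe = {}
    for key, value in fields.items():
        if key in PHI_FIELDS:
            safe[key] = "[REDACTED]"
        elif key == "user_id":
            # Hash user identifiers so logs correlate without exposing them
            safe[key] = hashlib.sha256(str(value).encode()).hexdigest()[:12]
        else:
            safe[key] = value
    return safe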

Audit Logging

All tool calls logged to audit_logs table:

INSERT INTO audit_logs (
    user_id, action_type, resource_type, resource_id,
    action_details, phi_involved, ip_address, user_agent, timestamp
) VALUES (
    123,
    'tool_call',
    'calendar',
    NULL,
    '{"tool": "get_calendar_events", "start_date": "2024-01-15"}',
    TRUE,
    '192.168.1.10',
    'VoiceAssist/2.0',
    NOW()
);

Error Handling

Error Types

1. Validation Errors (400 Bad Request)

{ "error": "validation_error", "message": "Invalid arguments for tool 'get_calendar_events'", "details": { "start_date": "Invalid date format, expected YYYY-MM-DD" } }

2. Permission Errors (403 Forbidden)

{ "error": "permission_denied", "message": "User does not have permission to call tool 'create_calendar_event'" }

3. Rate Limit Errors (429 Too Many Requests)

{ "error": "rate_limit_exceeded", "message": "Rate limit exceeded for tool 'search_pubmed'", "details": { "limit": 30, "window": "1 minute", "retry_after": 15 # seconds } }

4. External API Errors (502 Bad Gateway)

{ "error": "external_api_error", "message": "Failed to call PubMed API", "details": { "upstream_status": 503, "upstream_message": "Service temporarily unavailable" } }

5. Timeout Errors (504 Gateway Timeout)

{ "error": "timeout", "message": "Tool execution exceeded timeout of 30 seconds" }

Error Recovery

Retry Strategy:

  • Transient errors (5xx): Retry up to 3 times with exponential backoff
  • Rate limits: Wait and retry after retry_after seconds
  • Validation errors: Do not retry (client error)
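
A minimal sketch of this retry policy, wrapping an arbitrary async tool call; the backoff constants and exception class names are illustrative:

# Sketch of the retry policy; exception classes are illustrative names.
import asyncio

async def call_with_retries(func, *args, max_attempts: int = 3,
                            base_delay: float = 1.0):
    for attempt in range(1, max_attempts + 1):
        try:
            return await func(*args)
        except RateLimitError as e:
            if attempt == max_attempts:
                raise
            await asyncio.sleep(e.retry_after)  # wait the server-provided interval
        except TransientUpstreamError:
            if attempt == max_attempts:
                raise
            # Exponential backoff: 1s, 2s, 4s, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    # Validation errors (4xx) are deliberately not caught: client errors never retry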

Fallback Behavior:

  • If external API fails, fall back to local knowledge base
  • If tool times out, return partial results if available
  • If confirmation denied, inform AI model gracefully

Testing Tools

Unit Tests

# server/tests/tools/test_calendar_tool.py
def test_get_calendar_events():
    args = GetCalendarEventsArgs(
        start_date="2024-01-15",
        end_date="2024-01-20"
    )
    result = calendar_tool.get_events(args, user_id=1)

    assert result.success is True
    assert len(result.result["events"]) > 0
    assert result.execution_time_ms < 1000

Integration Tests

def test_tool_invocation_flow():
    # Simulate OpenAI tool call
    tool_call = {
        "tool": "get_calendar_events",
        "arguments": {
            "start_date": "2024-01-15",
            "end_date": "2024-01-20"
        }
    }

    # Send to orchestrator
    response = orchestrator.execute_tool(tool_call, user_id=1)

    # Verify result
    assert response["success"] is True
    assert "events" in response["result"]

Mock Tools for Testing

# server/app/tools/mocks.py
class MockCalendarTool:
    def get_events(self, args, user_id):
        return ToolResult(
            tool_name="get_calendar_events",
            success=True,
            result={
                "events": [
                    {
                        "id": "mock-1",
                        "title": "Team Meeting",
                        "start": "2024-01-15T10:00:00Z",
                        "end": "2024-01-15T11:00:00Z"
                    }
                ],
                "total_count": 1
            },
            execution_time_ms=50,
            timestamp="2024-01-15T09:00:00Z"
        )

Admin Panel Testing UI

Feature: Test Tool Calls

  • Dropdown to select tool
  • Form to enter arguments (JSON editor)
  • "Execute Tool" button
  • Display result or error
  • Show execution time and metrics

Future Tools

Phase 11+ Enhancements

Additional Tools to Consider:

  1. Email Search: search_email - Search Nextcloud Mail
  2. Send Email: send_email - Send email (requires confirmation)
  3. Task Management: create_task, get_tasks - Nextcloud Tasks integration
  4. Drug Interaction Check: check_drug_interactions - Check for drug interactions
  5. Clinical Trial Search: search_clinical_trials - ClinicalTrials.gov API
  6. Lab Value Interpretation: interpret_lab_values - Interpret lab results
  7. ICD-10 Code Lookup: lookup_icd10 - Look up diagnosis codes
  8. Medical Abbreviation Lookup: lookup_medical_abbreviation - Expand medical abbreviations

API Expansion

  • UpToDate API: If licensed, add search_uptodate tool
  • FHIR Integration: Tools to query EHR systems via FHIR
  • HL7 Integration: Parse HL7 messages for data extraction


Last Updated: 2025-11-20
Version: V2.0
Total Tools: 10 (with 8+ future tools)