2:I[7012,["4765","static/chunks/4765-f5afdf8061f456f3.js","9856","static/chunks/9856-3b185291364d9bef.js","6687","static/chunks/app/docs/%5B...slug%5D/page-e07536548216bee4.js"],"MarkdownRenderer"] 4:I[9856,["4765","static/chunks/4765-f5afdf8061f456f3.js","9856","static/chunks/9856-3b185291364d9bef.js","6687","static/chunks/app/docs/%5B...slug%5D/page-e07536548216bee4.js"],""] 5:I[4126,[],""] 7:I[9630,[],""] 8:I[4278,["9856","static/chunks/9856-3b185291364d9bef.js","8172","static/chunks/8172-b3a2d6fe4ae10d40.js","3185","static/chunks/app/layout-2814fa5d15b84fe4.js"],"HeadingProvider"] 9:I[1476,["9856","static/chunks/9856-3b185291364d9bef.js","8172","static/chunks/8172-b3a2d6fe4ae10d40.js","3185","static/chunks/app/layout-2814fa5d15b84fe4.js"],"Header"] a:I[3167,["9856","static/chunks/9856-3b185291364d9bef.js","8172","static/chunks/8172-b3a2d6fe4ae10d40.js","3185","static/chunks/app/layout-2814fa5d15b84fe4.js"],"Sidebar"] b:I[7409,["9856","static/chunks/9856-3b185291364d9bef.js","8172","static/chunks/8172-b3a2d6fe4ae10d40.js","3185","static/chunks/app/layout-2814fa5d15b84fe4.js"],"PageFrame"] 3:T269e1, # Part 2: Deferred Backend Features Implementation Plan **Version:** 1.0 **Date:** 2025-11-26 **Status:** Planning Complete - Ready for Implementation **Priority:** High - Enables Full Frontend Functionality **Estimated Total Effort:** 17-21 weeks (2-3 developers) --- ## Executive Summary This document consolidates and expands upon all deferred backend features from the VoiceAssist development phases. It provides detailed implementation plans with improvements, optimizations, comprehensive testing strategies, and security considerations. **Sources Consolidated:** - `BACKEND_IMPLEMENTATION_PLAN.md` - Priority 2-3 features - `CONTINUOUS_IMPROVEMENT_PLAN.md` - Phase 4-6 deferrals - Phase completion reports - Deferred items - Frontend requirements - Backend gaps **Key Deliverables:** 1. Advanced File Processing Pipeline 2. Conversation Sharing & Collaboration 3. 
Full Voice Pipeline (WebRTC, VAD, Barge-in, Voice Auth) 4. OpenAI Realtime API Integration 5. Advanced Medical AI (BioGPT, PubMedBERT, Medical NER) 6. External Medical Integrations (UpToDate, OpenEvidence, PubMed) 7. Medical Calculators Library 8. OIDC/SSO Authentication with MFA 9. Complete Email Integration (IMAP/SMTP) 10. CardDAV Contacts Synchronization 11. Google Calendar Sync 12. Nextcloud App Store Packaging 13. Advanced RAG (Hybrid Search, Re-ranking) 14. Multi-Hop Reasoning Engine --- ## Table of Contents 1. [Feature Category A: File & Media Processing](#feature-category-a-file--media-processing) 2. [Feature Category B: Collaboration Features](#feature-category-b-collaboration-features) 3. [Feature Category C: Voice Pipeline Completion](#feature-category-c-voice-pipeline-completion) 4. [Feature Category D: Advanced Medical AI](#feature-category-d-advanced-medical-ai) 5. [Feature Category E: External Medical Integrations](#feature-category-e-external-medical-integrations) 6. [Feature Category F: Authentication & Security](#feature-category-f-authentication--security) 7. [Feature Category G: Nextcloud Integration Completion](#feature-category-g-nextcloud-integration-completion) 8. [Feature Category H: Advanced RAG & Reasoning](#feature-category-h-advanced-rag--reasoning) 9. [Comprehensive Testing Strategy](#comprehensive-testing-strategy) 10. [Performance Benchmarks](#performance-benchmarks) 11. [Security Considerations](#security-considerations) 12. [Implementation Timeline](#implementation-timeline) 13. [Dependencies & Prerequisites](#dependencies--prerequisites) 14. 
[Risk Assessment & Mitigation](#risk-assessment--mitigation) --- ## Feature Category A: File & Media Processing ### A.1 Advanced File Processing Pipeline **Original Scope:** Basic file upload with text extraction **Enhanced Scope:** Full document intelligence pipeline with OCR, medical entity extraction, and contextual indexing #### A.1.1 Implementation Details **Database Schema:** ```sql -- Enhanced file processing tracking (status is restricted by a CHECK constraint; retry counters and created_at are NOT NULL so workers never have to handle NULL) CREATE TABLE file_processing_jobs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), attachment_id UUID NOT NULL REFERENCES message_attachments(id) ON DELETE CASCADE, status VARCHAR(50) NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'processing', 'completed', 'failed')), processing_type VARCHAR(50) NOT NULL, -- ocr, text_extraction, entity_extraction, embedding input_params JSONB, output_data JSONB, error_message TEXT, processing_time_ms INTEGER, worker_id VARCHAR(100), retries INTEGER NOT NULL DEFAULT 0 CHECK (retries >= 0), max_retries INTEGER NOT NULL DEFAULT 3 CHECK (max_retries >= 0), created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), started_at TIMESTAMP WITH TIME ZONE, completed_at TIMESTAMP WITH TIME ZONE ); -- Extracted entities from documents (confidence is a 0.00-1.00 score; NULL means the extractor reported none) CREATE TABLE document_entities ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), attachment_id UUID NOT NULL REFERENCES message_attachments(id) ON DELETE CASCADE, entity_type VARCHAR(100) NOT NULL, -- medication, diagnosis, procedure, lab_value, vital_sign entity_value TEXT NOT NULL, entity_normalized TEXT, -- standardized form (e.g., RxNorm, ICD-10) confidence DECIMAL(3,2) CHECK (confidence BETWEEN 0 AND 1), position_start INTEGER, position_end INTEGER, metadata JSONB, created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() ); CREATE INDEX idx_file_processing_jobs_status ON file_processing_jobs(status); CREATE INDEX idx_file_processing_jobs_attachment ON file_processing_jobs(attachment_id); CREATE INDEX idx_document_entities_type ON document_entities(entity_type); CREATE INDEX idx_document_entities_attachment ON document_entities(attachment_id); ``` **Service Implementation:** ```python # 
services/api-gateway/app/services/file_processing_service.py from abc import ABC, abstractmethod from typing import List, Dict, Any, Optional from dataclasses import dataclass import asyncio from concurrent.futures import ThreadPoolExecutor @dataclass class ProcessingResult: """Result of file processing operation""" success: bool extracted_text: Optional[str] = None entities: Optional[List[Dict[str, Any]]] = None embeddings: Optional[List[float]] = None metadata: Optional[Dict[str, Any]] = None error: Optional[str] = None processing_time_ms: int = 0 class FileProcessor(ABC): """Abstract base class for file processors""" @abstractmethod async def process(self, file_path: str, options: Dict[str, Any]) -> ProcessingResult: pass @abstractmethod def supports_type(self, mime_type: str) -> bool: pass class PDFProcessor(FileProcessor): """PDF processing with OCR fallback""" def __init__(self): self.executor = ThreadPoolExecutor(max_workers=4) def supports_type(self, mime_type: str) -> bool: return mime_type in ['application/pdf', 'application/x-pdf'] async def process(self, file_path: str, options: Dict[str, Any]) -> ProcessingResult: import time start_time = time.time() try: # Try text extraction first (faster) text = await self._extract_text(file_path) # If minimal text, fall back to OCR if len(text.strip()) < 100: text = await self._perform_ocr(file_path) # Extract medical entities entities = await self._extract_medical_entities(text) # Generate embeddings for searchability embeddings = await self._generate_embeddings(text) processing_time = int((time.time() - start_time) * 1000) return ProcessingResult( success=True, extracted_text=text, entities=entities, embeddings=embeddings, metadata={ 'page_count': await self._get_page_count(file_path), 'extraction_method': 'ocr' if len(text.strip()) < 100 else 'text' }, processing_time_ms=processing_time ) except Exception as e: return ProcessingResult( success=False, error=str(e), processing_time_ms=int((time.time() - 
start_time) * 1000) ) async def _extract_text(self, file_path: str) -> str: """Extract text using PyPDF2""" import pypdf loop = asyncio.get_event_loop() def extract(): reader = pypdf.PdfReader(file_path) text_parts = [] for page in reader.pages: text_parts.append(page.extract_text()) return '\n'.join(text_parts) return await loop.run_in_executor(self.executor, extract) async def _perform_ocr(self, file_path: str) -> str: """Perform OCR using Tesseract via pdf2image""" import pytesseract from pdf2image import convert_from_path loop = asyncio.get_event_loop() def ocr(): images = convert_from_path(file_path, dpi=300) text_parts = [] for image in images: text_parts.append(pytesseract.image_to_string(image)) return '\n'.join(text_parts) return await loop.run_in_executor(self.executor, ocr) async def _extract_medical_entities(self, text: str) -> List[Dict[str, Any]]: """Extract medical entities using spaCy with scispacy models""" # Placeholder - integrate with medical NER model # Consider: scispacy, clinicalBERT, or custom trained model return [] async def _generate_embeddings(self, text: str) -> List[float]: """Generate embeddings for vector search""" from app.services.embedding_service import EmbeddingService embedding_service = EmbeddingService() return await embedding_service.generate_embedding(text[:8000]) # Token limit async def _get_page_count(self, file_path: str) -> int: import pypdf reader = pypdf.PdfReader(file_path) return len(reader.pages) class ImageProcessor(FileProcessor): """Image processing with OCR and medical image analysis""" SUPPORTED_TYPES = [ 'image/png', 'image/jpeg', 'image/jpg', 'image/tiff', 'image/bmp', 'image/gif', 'image/webp' ] def supports_type(self, mime_type: str) -> bool: return mime_type in self.SUPPORTED_TYPES async def process(self, file_path: str, options: Dict[str, Any]) -> ProcessingResult: import time import pytesseract from PIL import Image start_time = time.time() try: image = Image.open(file_path) # Perform OCR text = 
pytesseract.image_to_string(image) # Image metadata metadata = { 'width': image.width, 'height': image.height, 'format': image.format, 'mode': image.mode } # Optional: Medical image classification (if enabled) if options.get('medical_classification', False): # Integrate with medical vision model pass processing_time = int((time.time() - start_time) * 1000) return ProcessingResult( success=True, extracted_text=text if text.strip() else None, metadata=metadata, processing_time_ms=processing_time ) except Exception as e: return ProcessingResult( success=False, error=str(e), processing_time_ms=int((time.time() - start_time) * 1000) ) class FileProcessingOrchestrator: """Orchestrates file processing across different processors""" def __init__(self): self.processors: List[FileProcessor] = [ PDFProcessor(), ImageProcessor(), # Add more processors: MarkdownProcessor, TextProcessor, etc. ] def get_processor(self, mime_type: str) -> Optional[FileProcessor]: for processor in self.processors: if processor.supports_type(mime_type): return processor return None async def process_file( self, file_path: str, mime_type: str, options: Optional[Dict[str, Any]] = None ) -> ProcessingResult: processor = self.get_processor(mime_type) if not processor: return ProcessingResult( success=False, error=f"Unsupported file type: {mime_type}" ) return await processor.process(file_path, options or {}) ``` **API Endpoints:** ```python # services/api-gateway/app/api/file_processing.py from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException from typing import List router = APIRouter(prefix="/api/files", tags=["file-processing"]) @router.post("/{attachment_id}/process") async def trigger_file_processing( attachment_id: str, options: FileProcessingOptions, background_tasks: BackgroundTasks, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """ Trigger asynchronous file processing. 
Options: - extract_text: bool - Extract text content - extract_entities: bool - Extract medical entities - generate_embeddings: bool - Generate searchable embeddings - ocr_fallback: bool - Use OCR if text extraction fails """ attachment = await get_attachment_or_404(attachment_id, current_user, db) # Create processing job job = FileProcessingJob( attachment_id=attachment_id, status='pending', processing_type='full', input_params=options.dict() ) db.add(job) db.commit() # Queue background processing background_tasks.add_task( process_file_task, job_id=str(job.id), file_path=attachment.file_path, mime_type=attachment.mime_type, options=options.dict() ) return {"job_id": str(job.id), "status": "queued"} @router.get("/{attachment_id}/processing-status") async def get_processing_status( attachment_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """Get the processing status of all jobs for an attachment""" jobs = db.query(FileProcessingJob)\ .filter(FileProcessingJob.attachment_id == attachment_id)\ .order_by(FileProcessingJob.created_at.desc())\ .all() return {"jobs": [job.to_dict() for job in jobs]} @router.get("/{attachment_id}/entities") async def get_extracted_entities( attachment_id: str, entity_type: Optional[str] = None, min_confidence: float = 0.7, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """Get extracted entities from a processed document""" query = db.query(DocumentEntity)\ .filter(DocumentEntity.attachment_id == attachment_id)\ .filter(DocumentEntity.confidence >= min_confidence) if entity_type: query = query.filter(DocumentEntity.entity_type == entity_type) entities = query.all() return {"entities": [e.to_dict() for e in entities]} @router.post("/{attachment_id}/include-in-context") async def include_in_rag_context( attachment_id: str, context_options: ContextInclusionOptions, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """ Include processed 
document in RAG context for a conversation. This adds the document's extracted text and entities to the conversation's context window for improved AI responses. """ attachment = await get_attachment_or_404(attachment_id, current_user, db) # Get processing results job = db.query(FileProcessingJob)\ .filter(FileProcessingJob.attachment_id == attachment_id)\ .filter(FileProcessingJob.status == 'completed')\ .first() if not job: raise HTTPException( status_code=400, detail="File has not been processed yet" ) # Add to conversation context await context_service.add_document_context( conversation_id=context_options.conversation_id, document_text=job.output_data.get('extracted_text'), entities=job.output_data.get('entities'), embeddings=job.output_data.get('embeddings') ) return {"message": "Document added to conversation context"} ``` #### A.1.2 Improvements & Optimizations 1. **Parallel Processing Pipeline:** - Use Celery or Redis Queue for distributed processing - Process multiple pages concurrently - Implement job prioritization based on file size 2. **Intelligent OCR Selection:** - Detect image-only vs text PDFs automatically - Use different OCR engines based on document type - Implement language detection for multi-language OCR 3. **Caching Layer:** - Cache extracted text for repeated access - Store embeddings in vector DB for semantic search - Implement content-based deduplication 4. **Medical Entity Enhancement:** - Integrate scispaCy for medical NER - Map entities to standard ontologies (RxNorm, ICD-10, SNOMED) - Build entity relationship graphs 5. 
**Resource Management:** - Implement memory-efficient streaming for large files - Add file size limits with graceful degradation - Monitor and limit CPU/memory per job #### A.1.3 Testing Strategy ```python # tests/unit/test_file_processing.py import pytest from unittest.mock import Mock, patch, AsyncMock from app.services.file_processing_service import ( PDFProcessor, ImageProcessor, FileProcessingOrchestrator, ProcessingResult ) class TestPDFProcessor: @pytest.fixture def processor(self): return PDFProcessor() def test_supports_pdf_mime_types(self, processor): assert processor.supports_type('application/pdf') is True assert processor.supports_type('application/x-pdf') is True assert processor.supports_type('image/png') is False @pytest.mark.asyncio async def test_extract_text_from_text_pdf(self, processor, sample_text_pdf): """Test text extraction from a PDF with embedded text""" result = await processor.process(sample_text_pdf, {}) assert result.success is True assert result.extracted_text is not None assert len(result.extracted_text) > 100 assert result.metadata['extraction_method'] == 'text' @pytest.mark.asyncio async def test_ocr_fallback_for_scanned_pdf(self, processor, sample_scanned_pdf): """Test OCR fallback for scanned documents""" result = await processor.process(sample_scanned_pdf, {}) assert result.success is True assert result.extracted_text is not None assert result.metadata['extraction_method'] == 'ocr' @pytest.mark.asyncio async def test_handles_corrupted_pdf(self, processor, corrupted_pdf): """Test graceful handling of corrupted files""" result = await processor.process(corrupted_pdf, {}) assert result.success is False assert result.error is not None @pytest.mark.asyncio async def test_processing_time_recorded(self, processor, sample_text_pdf): result = await processor.process(sample_text_pdf, {}) assert result.processing_time_ms > 0 assert result.processing_time_ms < 60000 # Should complete within 60s class TestImageProcessor: @pytest.fixture def 
processor(self): return ImageProcessor() @pytest.mark.parametrize("mime_type,expected", [ ('image/png', True), ('image/jpeg', True), ('image/tiff', True), ('application/pdf', False), ('text/plain', False), ]) def test_supports_image_types(self, processor, mime_type, expected): assert processor.supports_type(mime_type) == expected @pytest.mark.asyncio async def test_extract_text_from_image(self, processor, sample_text_image): """Test OCR on an image with text""" result = await processor.process(sample_text_image, {}) assert result.success is True assert result.metadata['width'] > 0 assert result.metadata['height'] > 0 class TestFileProcessingOrchestrator: @pytest.fixture def orchestrator(self): return FileProcessingOrchestrator() def test_selects_correct_processor(self, orchestrator): pdf_processor = orchestrator.get_processor('application/pdf') assert isinstance(pdf_processor, PDFProcessor) image_processor = orchestrator.get_processor('image/png') assert isinstance(image_processor, ImageProcessor) def test_returns_none_for_unsupported_type(self, orchestrator): processor = orchestrator.get_processor('application/unknown') assert processor is None @pytest.mark.asyncio async def test_process_unsupported_type(self, orchestrator): result = await orchestrator.process_file( '/path/to/file', 'application/unknown' ) assert result.success is False assert 'unsupported' in result.error.lower() # Integration tests class TestFileProcessingIntegration: @pytest.mark.integration @pytest.mark.asyncio async def test_end_to_end_pdf_processing( self, test_client, authenticated_user, sample_pdf_upload ): """Test complete file processing workflow""" # Upload file upload_response = await test_client.post( f"/api/messages/{sample_message_id}/attachments", files={"file": sample_pdf_upload}, headers=authenticated_user.headers ) attachment_id = upload_response.json()['id'] # Trigger processing process_response = await test_client.post( f"/api/files/{attachment_id}/process", 
json={"extract_text": True, "extract_entities": True}, headers=authenticated_user.headers ) job_id = process_response.json()['job_id'] # Wait for completion await wait_for_job_completion(job_id, timeout=60) # Verify results status_response = await test_client.get( f"/api/files/{attachment_id}/processing-status", headers=authenticated_user.headers ) jobs = status_response.json()['jobs'] assert len(jobs) > 0 assert jobs[0]['status'] == 'completed' @pytest.mark.integration @pytest.mark.asyncio async def test_parallel_file_processing( self, test_client, authenticated_user ): """Test processing multiple files concurrently""" attachment_ids = await upload_multiple_files(5) # Trigger all processing jobs tasks = [ test_client.post( f"/api/files/{aid}/process", json={"extract_text": True}, headers=authenticated_user.headers ) for aid in attachment_ids ] responses = await asyncio.gather(*tasks) job_ids = [r.json()['job_id'] for r in responses] # Wait for all to complete await asyncio.gather(*[ wait_for_job_completion(jid, timeout=120) for jid in job_ids ]) # Verify all completed successfully for aid in attachment_ids: status = await test_client.get( f"/api/files/{aid}/processing-status", headers=authenticated_user.headers ) assert status.json()['jobs'][0]['status'] == 'completed' # Load tests class TestFileProcessingLoad: @pytest.mark.load @pytest.mark.asyncio async def test_concurrent_processing_limit(self): """Test system behavior at processing capacity""" # Simulate 50 concurrent file processing requests pass @pytest.mark.load @pytest.mark.asyncio async def test_large_file_processing(self): """Test processing of large files (50MB+)""" pass ``` --- ### A.2 Conversation Sharing & Collaboration **Original Scope:** Not implemented **Enhanced Scope:** Full sharing with permissions, expiration, and collaboration features #### A.2.1 Implementation Details **Database Schema:** ```sql -- Conversation sharing CREATE TABLE conversation_shares ( id UUID PRIMARY KEY DEFAULT 
gen_random_uuid(), conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE, created_by UUID NOT NULL REFERENCES users(id), share_token VARCHAR(64) UNIQUE NOT NULL, -- Secure random token share_type VARCHAR(50) NOT NULL, -- 'public_link', 'user_share', 'team_share' permission_level VARCHAR(50) NOT NULL DEFAULT 'view', -- 'view', 'comment', 'edit' expires_at TIMESTAMP WITH TIME ZONE, max_views INTEGER, current_views INTEGER DEFAULT 0, password_hash VARCHAR(255), -- Optional password protection require_authentication BOOLEAN DEFAULT FALSE, allow_download BOOLEAN DEFAULT FALSE, metadata JSONB, is_active BOOLEAN DEFAULT TRUE, created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() ); -- Share recipients (for user/team shares) CREATE TABLE share_recipients ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), share_id UUID NOT NULL REFERENCES conversation_shares(id) ON DELETE CASCADE, recipient_type VARCHAR(50) NOT NULL, -- 'user', 'team', 'email' recipient_id VARCHAR(255) NOT NULL, -- user_id, team_id, or email accepted_at TIMESTAMP WITH TIME ZONE, created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() ); -- Share access logs (audit trail) CREATE TABLE share_access_logs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), share_id UUID NOT NULL REFERENCES conversation_shares(id) ON DELETE CASCADE, accessor_id UUID REFERENCES users(id), -- NULL for anonymous accessor_ip VARCHAR(45), accessor_user_agent TEXT, access_type VARCHAR(50) NOT NULL, -- 'view', 'download', 'comment' accessed_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() ); -- Conversation comments (for collaboration) CREATE TABLE conversation_comments ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE, message_id UUID REFERENCES messages(id) ON DELETE SET NULL, -- Optional: comment on specific message user_id UUID NOT NULL REFERENCES users(id), parent_comment_id UUID REFERENCES 
conversation_comments(id), -- For threaded comments content TEXT NOT NULL, is_resolved BOOLEAN DEFAULT FALSE, created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() ); CREATE INDEX idx_shares_token ON conversation_shares(share_token); CREATE INDEX idx_shares_conversation ON conversation_shares(conversation_id); CREATE INDEX idx_share_recipients_share ON share_recipients(share_id); CREATE INDEX idx_share_access_logs_share ON share_access_logs(share_id); CREATE INDEX idx_comments_conversation ON conversation_comments(conversation_id); CREATE INDEX idx_comments_message ON conversation_comments(message_id); ``` **API Endpoints:** ```python # services/api-gateway/app/api/sharing.py from fastapi import APIRouter, Depends, HTTPException, Request from typing import Optional, List import secrets router = APIRouter(prefix="/api/sharing", tags=["sharing"]) @router.post("/conversations/{conversation_id}/share") async def create_share_link( conversation_id: str, share_options: ShareOptions, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """ Create a shareable link for a conversation. 
Options: - share_type: 'public_link' | 'user_share' | 'team_share' - permission_level: 'view' | 'comment' | 'edit' - expires_at: Optional expiration datetime - max_views: Optional maximum number of views - password: Optional password protection - require_authentication: Require login to view - recipients: List of user IDs or emails (for user_share) """ # Verify ownership conversation = await verify_conversation_ownership( conversation_id, current_user, db ) # Generate secure token share_token = secrets.token_urlsafe(32) # Create share record share = ConversationShare( conversation_id=conversation_id, created_by=current_user.id, share_token=share_token, share_type=share_options.share_type, permission_level=share_options.permission_level, expires_at=share_options.expires_at, max_views=share_options.max_views, password_hash=hash_password(share_options.password) if share_options.password else None, require_authentication=share_options.require_authentication, allow_download=share_options.allow_download ) db.add(share) # Add recipients for user/team shares if share_options.recipients: for recipient in share_options.recipients: db.add(ShareRecipient( share_id=share.id, recipient_type=recipient.type, recipient_id=recipient.id )) # Send notification email await send_share_notification(recipient, conversation, share) db.commit() return { "share_id": str(share.id), "share_token": share_token, "share_url": f"{settings.BASE_URL}/shared/{share_token}", "expires_at": share_options.expires_at, "settings": share.to_dict() } @router.get("/shared/{share_token}") async def access_shared_conversation( share_token: str, request: Request, password: Optional[str] = None, current_user: Optional[User] = Depends(get_optional_user), db: Session = Depends(get_db) ): """ Access a shared conversation via share token. 
- Validates share is active and not expired - Checks password if required - Logs access for audit trail - Returns conversation content based on permission level """ share = db.query(ConversationShare)\ .filter(ConversationShare.share_token == share_token)\ .filter(ConversationShare.is_active == True)\ .first() if not share: raise HTTPException(status_code=404, detail="Share not found or expired") # Check expiration if share.expires_at and share.expires_at < datetime.utcnow(): raise HTTPException(status_code=410, detail="Share link has expired") # Check max views if share.max_views and share.current_views >= share.max_views: raise HTTPException(status_code=410, detail="Maximum views exceeded") # Check authentication requirement if share.require_authentication and not current_user: raise HTTPException( status_code=401, detail="Authentication required to view this conversation" ) # Check password if share.password_hash: if not password or not verify_password(password, share.password_hash): raise HTTPException(status_code=401, detail="Invalid password") # Log access access_log = ShareAccessLog( share_id=share.id, accessor_id=current_user.id if current_user else None, accessor_ip=request.client.host, accessor_user_agent=request.headers.get('user-agent'), access_type='view' ) db.add(access_log) # Increment view counter share.current_views += 1 db.commit() # Fetch conversation with messages conversation = await get_conversation_for_share( share.conversation_id, share.permission_level, db ) return { "conversation": conversation, "permission_level": share.permission_level, "allow_download": share.allow_download, "can_comment": share.permission_level in ['comment', 'edit'] } @router.put("/shares/{share_id}") async def update_share_settings( share_id: str, updates: ShareUpdateOptions, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """Update share settings (permission level, expiration, etc.)""" share = await get_share_or_404(share_id, 
current_user, db) for field, value in updates.dict(exclude_unset=True).items(): if field == 'password': share.password_hash = hash_password(value) if value else None else: setattr(share, field, value) share.updated_at = datetime.utcnow() db.commit() return share.to_dict() @router.delete("/shares/{share_id}") async def revoke_share( share_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """Revoke a share link""" share = await get_share_or_404(share_id, current_user, db) share.is_active = False share.updated_at = datetime.utcnow() db.commit() return {"message": "Share revoked successfully"} @router.get("/conversations/{conversation_id}/shares") async def list_conversation_shares( conversation_id: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """List all shares for a conversation""" await verify_conversation_ownership(conversation_id, current_user, db) shares = db.query(ConversationShare)\ .filter(ConversationShare.conversation_id == conversation_id)\ .order_by(ConversationShare.created_at.desc())\ .all() return {"shares": [s.to_dict() for s in shares]} @router.get("/shares/{share_id}/access-logs") async def get_share_access_logs( share_id: str, page: int = 1, page_size: int = 50, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """Get access logs for a share (audit trail)""" share = await get_share_or_404(share_id, current_user, db) logs = db.query(ShareAccessLog)\ .filter(ShareAccessLog.share_id == share_id)\ .order_by(ShareAccessLog.accessed_at.desc())\ .offset((page - 1) * page_size)\ .limit(page_size)\ .all() total = db.query(ShareAccessLog)\ .filter(ShareAccessLog.share_id == share_id)\ .count() return { "logs": [log.to_dict() for log in logs], "total": total, "page": page, "page_size": page_size } # Collaboration endpoints @router.post("/conversations/{conversation_id}/comments") async def add_comment( conversation_id: str, comment: CommentCreate, 
current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """Add a comment to a conversation or specific message""" # Verify access (owner or shared with edit/comment permission) await verify_comment_access(conversation_id, current_user, db) new_comment = ConversationComment( conversation_id=conversation_id, message_id=comment.message_id, user_id=current_user.id, parent_comment_id=comment.parent_comment_id, content=comment.content ) db.add(new_comment) db.commit() # Notify conversation owner and other commenters await notify_comment_participants(conversation_id, new_comment, current_user) return new_comment.to_dict() @router.get("/conversations/{conversation_id}/comments") async def list_comments( conversation_id: str, message_id: Optional[str] = None, include_resolved: bool = False, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """List comments for a conversation""" await verify_comment_access(conversation_id, current_user, db) query = db.query(ConversationComment)\ .filter(ConversationComment.conversation_id == conversation_id) if message_id: query = query.filter(ConversationComment.message_id == message_id) if not include_resolved: query = query.filter(ConversationComment.is_resolved == False) comments = query.order_by(ConversationComment.created_at.asc()).all() return {"comments": [c.to_dict() for c in comments]} ``` #### A.2.2 Improvements & Optimizations 1. **Security Enhancements:** - Rate limiting on share access attempts - IP-based access restrictions option - Automatic share expiration for inactive links - Watermarking for downloaded exports 2. **Performance Optimizations:** - Cache shared conversation data - Lazy load messages for long conversations - Compress response payloads - CDN for static assets in shared views 3. **Collaboration Features:** - Real-time collaboration via WebSocket - @mentions in comments - Comment threading with infinite depth - Comment resolution workflow 4. 
-- Voice pipeline schema (PostgreSQL: relies on gen_random_uuid() and JSONB).

-- Voice session tracking
CREATE TABLE voice_sessions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id UUID NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    -- No ON DELETE action here: a user cannot be deleted while sessions still reference it.
    user_id UUID NOT NULL REFERENCES users(id),
    session_token VARCHAR(255) UNIQUE NOT NULL,
    status VARCHAR(50) NOT NULL DEFAULT 'initializing',
    voice_config JSONB, -- voice settings, language, etc.
    metrics JSONB, -- latency, audio quality metrics
    started_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    ended_at TIMESTAMP WITH TIME ZONE, -- NULL while the session is still open
    total_duration_ms INTEGER,
    audio_bytes_sent BIGINT NOT NULL DEFAULT 0,
    audio_bytes_received BIGINT NOT NULL DEFAULT 0,
    -- Enforce the documented session states instead of leaving them in a comment.
    CONSTRAINT voice_sessions_status_check CHECK (
        status IN ('initializing', 'connected', 'speaking', 'listening', 'processing', 'ended')
    ),
    CONSTRAINT voice_sessions_total_duration_check CHECK (total_duration_ms >= 0),
    CONSTRAINT voice_sessions_audio_bytes_check CHECK (
        audio_bytes_sent >= 0 AND audio_bytes_received >= 0
    )
);

-- Voice transcripts
CREATE TABLE voice_transcripts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    voice_session_id UUID NOT NULL REFERENCES voice_sessions(id) ON DELETE CASCADE,
    message_id UUID REFERENCES messages(id),
    transcript_type VARCHAR(50) NOT NULL,
    content TEXT NOT NULL,
    confidence DECIMAL(3,2), -- fraction in [0, 1]; NULL when no score is available
    language_detected VARCHAR(10),
    audio_duration_ms INTEGER,
    timestamps JSONB, -- word-level timestamps
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    CONSTRAINT voice_transcripts_type_check CHECK (
        transcript_type IN ('user_speech', 'assistant_speech')
    ),
    CONSTRAINT voice_transcripts_confidence_check CHECK (confidence >= 0 AND confidence <= 1),
    CONSTRAINT voice_transcripts_audio_duration_check CHECK (audio_duration_ms >= 0)
);

-- Voice quality metrics
CREATE TABLE voice_quality_metrics (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    voice_session_id UUID NOT NULL REFERENCES voice_sessions(id) ON DELETE CASCADE,
    metric_type VARCHAR(50) NOT NULL, -- 'latency', 'jitter', 'packet_loss', 'audio_level'
    metric_value DECIMAL(10,4) NOT NULL, -- a sample without a value carries no information
    recorded_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_voice_sessions_conversation ON voice_sessions(conversation_id);
CREATE INDEX idx_voice_sessions_user ON voice_sessions(user_id);
CREATE INDEX idx_voice_transcripts_session ON voice_transcripts(voice_session_id);
-- Supports the messages(id) foreign-key check and transcript lookup by message.
CREATE INDEX idx_voice_transcripts_message ON voice_transcripts(message_id);
CREATE INDEX idx_voice_metrics_session ON voice_quality_metrics(voice_session_id);
class VoiceSessionManager:
    """
    Registry of live voice sessions, keyed by session id.

    Every session created through this manager receives the same
    ``OpenAIRealtimeClient`` instance, built once in ``__init__``.
    """

    def __init__(self):
        self.active_sessions: dict[str, 'VoiceSession'] = {}
        self.openai_client = OpenAIRealtimeClient()

    async def create_session(
        self,
        websocket: WebSocket,
        user: User,
        conversation_id: str,
        config: VoiceConfig
    ) -> 'VoiceSession':
        """Build a ``VoiceSession``, register it under its id and return it.

        The session is only constructed here; starting it is the caller's job.
        """
        session = VoiceSession(
            websocket=websocket,
            user=user,
            conversation_id=conversation_id,
            config=config,
            openai_client=self.openai_client
        )
        self.active_sessions[session.id] = session
        return session

    async def end_session(self, session_id: str):
        """Unregister a session and release its resources.

        Unknown ids are ignored. The entry is removed *before* ``cleanup()``
        is awaited, so a failing cleanup cannot leave a dead session in
        ``active_sessions``; the cleanup error still propagates to the caller.
        """
        session = self.active_sessions.pop(session_id, None)
        if session is None:
            return
        await session.cleanup()
async def _handle_client_audio(self):
    """
    Pump client WebSocket traffic until the session ends.

    Binary frames are treated as audio: counted in the metrics, optionally
    gated by the server-side VAD, checked for barge-in while the assistant
    is speaking, then forwarded to OpenAI as ``input_audio_buffer.append``.
    Text frames are parsed as JSON and passed to ``_handle_client_message``.
    If nothing arrives for 30 seconds a ``{"type": "ping"}`` keepalive is sent.
    """
    # Local import: this method needs base64, which the module's import
    # block does not provide.
    import base64

    try:
        while self.state != VoiceSessionState.ENDED:
            try:
                data = await asyncio.wait_for(
                    self.websocket.receive(),
                    timeout=30.0
                )

                # receive() returns the raw ASGI message, so a client
                # disconnect shows up as a message rather than as
                # WebSocketDisconnect. Stop here instead of calling
                # receive() again on a closed socket.
                if data.get("type") == "websocket.disconnect":
                    self.state = VoiceSessionState.ENDED
                    break

                # Either key may be present with a None value, so test the
                # value rather than key membership.
                audio_data = data.get("bytes")
                text_data = data.get("text")

                if audio_data is not None:
                    # Track metrics
                    self.metrics.bytes_received += len(audio_data)

                    # Run the detector at most once per chunk and reuse the verdict.
                    is_speech = None

                    # VAD processing (if client-side VAD disabled)
                    if not self.config.enable_vad:
                        is_speech = self.vad.process(audio_data)
                        if not is_speech:
                            continue

                    # Handle barge-in
                    if self.state == VoiceSessionState.SPEAKING and self.config.enable_barge_in:
                        if is_speech is None:
                            is_speech = self.vad.process(audio_data)
                        if is_speech:
                            await self._interrupt_response()

                    # Forward to OpenAI
                    await self.openai_ws.send(json.dumps({
                        "type": "input_audio_buffer.append",
                        "audio": base64.b64encode(audio_data).decode()
                    }))
                elif text_data is not None:
                    message = json.loads(text_data)
                    await self._handle_client_message(message)
            except asyncio.TimeoutError:
                # Send keepalive
                await self.websocket.send_json({"type": "ping"})
    except WebSocketDisconnect:
        self.state = VoiceSessionState.ENDED
async def _interrupt_response(self):
    """
    Barge-in: stop the response that is currently being produced.

    Sends ``response.cancel`` upstream to OpenAI, tells the client with a
    ``response_interrupted`` event, then puts the session back into the
    LISTENING state.
    """
    cancel_event = json.dumps({"type": "response.cancel"})
    await self.openai_ws.send(cancel_event)

    client_notice = {"type": "response_interrupted"}
    await self.websocket.send_json(client_notice)

    self.state = VoiceSessionState.LISTENING
class VoiceActivityDetector:
    """
    Voice Activity Detection using WebRTC VAD.

    Audio is cut into fixed-size 16-bit PCM frames; a chunk counts as speech
    when the share of frames flagged by the detector reaches ``threshold``.

    Args:
        threshold: Minimum fraction of speech frames (0-1) for a positive result.
        sample_rate: Sample rate of the incoming PCM in Hz. WebRTC VAD expects
            8/16/32/48 kHz.
        frame_duration_ms: Frame size in milliseconds. WebRTC VAD expects
            10, 20 or 30.
    """

    def __init__(
        self,
        threshold: float = 0.5,
        sample_rate: int = 16000,
        frame_duration_ms: int = 30
    ):
        self.threshold = threshold
        self.sample_rate = sample_rate
        self.frame_duration_ms = frame_duration_ms
        # Initialize VAD model (e.g., silero-vad, webrtcvad)
        import webrtcvad
        self.vad = webrtcvad.Vad()
        self.vad.set_mode(2)  # Moderate aggressiveness

    def process(self, audio_data: bytes) -> bool:
        """
        Check if audio contains speech.

        Returns False when ``audio_data`` holds no complete frame. Any error
        raised while analysing the audio is treated as "speech" so that audio
        is never dropped because of a detector failure.
        """
        try:
            # Bytes per frame: samples per frame * 2 bytes (16-bit PCM).
            frame_length = int(self.sample_rate * self.frame_duration_ms / 1000) * 2

            speech_frames = 0
            total_frames = 0
            # "+ 1" keeps the last complete frame in range; without it a chunk
            # of exactly one frame would yield no frames at all.
            for offset in range(0, len(audio_data) - frame_length + 1, frame_length):
                frame = audio_data[offset:offset + frame_length]
                if self.vad.is_speech(frame, self.sample_rate):
                    speech_frames += 1
                total_frames += 1

            if total_frames == 0:
                return False

            return (speech_frames / total_frames) >= self.threshold
        except Exception:
            return True  # Assume speech on error
Depends(get_db) ): """ WebSocket endpoint for real-time voice interactions. Protocol: - Client sends audio as binary WebSocket messages - Server sends audio responses as binary messages - JSON messages for control and metadata """ # Authenticate user = await authenticate_websocket(token, db) if not user: await websocket.close(code=4001, reason="Unauthorized") return await websocket.accept() # Parse voice config from query params or initial message config = VoiceConfig() session_manager = VoiceSessionManager() session = await session_manager.create_session( websocket=websocket, user=user, conversation_id=conversation_id, config=config ) try: await session.start() finally: await session_manager.end_session(session.id) ``` #### C.1.2 Improvements & Optimizations 1. **Latency Optimization:** - Implement audio streaming with chunked transfer - Use Opus codec for lower bandwidth - Pre-warm WebSocket connections - Edge deployment for regional latency reduction 2. **Quality Enhancements:** - Echo cancellation using WebRTC AEC - Noise suppression using RNNoise - Automatic gain control - Audio level normalization 3. **Reliability:** - Automatic reconnection with state preservation - Graceful degradation to text mode - Audio buffer management for network jitter - Connection quality monitoring 4. 
class TestVoiceRealtimeAPI:
    """Integration tests for the ``/api/voice/realtime`` WebSocket endpoint."""

    @staticmethod
    async def _wait_for_event(ws, event_type, timeout):
        """
        Return the first JSON text message whose ``type`` is ``event_type``.

        Binary frames and other event types are skipped. Returns None once
        ``timeout`` seconds have passed. Each ``recv()`` is bounded by the
        time remaining, so a silent server cannot hang the test.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                return None
            try:
                response = await asyncio.wait_for(ws.recv(), timeout=remaining)
            except asyncio.TimeoutError:
                return None
            if isinstance(response, str):
                data = json.loads(response)
                if data.get("type") == event_type:
                    return data

    @pytest.fixture
    def sample_audio_file(self):
        return Path("tests/fixtures/sample_speech.wav")

    @pytest.fixture
    async def voice_websocket(self, test_server, auth_token):
        async with websockets.connect(
            f"ws://{test_server}/api/voice/realtime?token={auth_token}&conversation_id=test-conv-1"
        ) as ws:
            yield ws

    @pytest.mark.asyncio
    async def test_voice_session_connection(self, voice_websocket):
        """Test WebSocket connection establishment"""
        # Should receive session confirmation
        response = await asyncio.wait_for(
            voice_websocket.recv(),
            timeout=5.0
        )
        data = json.loads(response)
        assert data["type"] == "session.created"
        assert "session_id" in data

    @pytest.mark.asyncio
    async def test_audio_streaming(self, voice_websocket, sample_audio_file):
        """Test sending audio and receiving response"""
        # Wait for session ready
        await voice_websocket.recv()

        # Send audio chunks
        audio_data = sample_audio_file.read_bytes()
        chunk_size = 4096
        for i in range(0, len(audio_data), chunk_size):
            chunk = audio_data[i:i + chunk_size]
            await voice_websocket.send(chunk)
            await asyncio.sleep(0.02)  # Simulate real-time

        # Wait for transcription
        event = await self._wait_for_event(voice_websocket, "user_transcript", 10.0)
        transcript = event.get("text") if event is not None else None

        assert transcript is not None
        assert len(transcript) > 0

    @pytest.mark.asyncio
    async def test_barge_in(self, voice_websocket, sample_audio_file):
        """Test interrupting assistant speech"""
        await voice_websocket.recv()  # Session ready

        # Trigger a response (text input for simplicity)
        await voice_websocket.send(json.dumps({
            "type": "text_input",
            "text": "Tell me a long story about medicine"
        }))

        # Wait for response to start
        await asyncio.sleep(1)

        # Send interrupt audio (simulating user speaking)
        audio_chunk = sample_audio_file.read_bytes()[:8192]
        await voice_websocket.send(audio_chunk)

        # Should receive interruption event
        event = await self._wait_for_event(voice_websocket, "response_interrupted", 5.0)
        found_interrupt = event is not None

        assert found_interrupt

    @pytest.mark.asyncio
    async def test_session_metrics(self, voice_websocket, db_session):
        """Test that session metrics are recorded"""
        # Get session ID from connection
        response = await voice_websocket.recv()
        session_id = json.loads(response)["session_id"]

        # Send some audio
        await voice_websocket.send(b"\x00" * 4096)
        await asyncio.sleep(1)

        # Close connection
        await voice_websocket.close()
        await asyncio.sleep(0.5)

        # Verify metrics in database
        session = db_session.query(VoiceSession)\
            .filter(VoiceSession.id == session_id)\
            .first()

        assert session is not None
        assert session.audio_bytes_received > 0
        assert session.ended_at is not None
# Performance tests
class TestVoicePerformance:
    """Latency and concurrency checks for the realtime voice endpoint."""

    # NOTE(review): test_audio_latency requests the `voice_websocket` and
    # `sample_audio_file` fixtures; confirm they are available to this class
    # (e.g. defined in conftest.py).

    @pytest.mark.performance
    @pytest.mark.asyncio
    async def test_audio_latency(self, voice_websocket, sample_audio_file):
        """Test end-to-end latency for audio processing"""
        import time

        await voice_websocket.recv()  # Session ready

        audio_chunk = sample_audio_file.read_bytes()[:4096]

        # perf_counter is monotonic, so a system clock adjustment during the
        # test cannot distort the measured interval.
        start_time = time.perf_counter()
        await voice_websocket.send(audio_chunk)

        # Wait for first response; bounded so a silent server fails the test
        # instead of hanging it.
        await asyncio.wait_for(voice_websocket.recv(), timeout=5.0)
        end_time = time.perf_counter()

        latency_ms = (end_time - start_time) * 1000

        # Latency should be under 500ms for good UX
        assert latency_ms < 500, f"Latency too high: {latency_ms}ms"

    @pytest.mark.performance
    @pytest.mark.asyncio
    async def test_concurrent_voice_sessions(self, test_server, auth_token):
        """Test multiple concurrent voice sessions"""
        num_sessions = 20

        async def create_session():
            async with websockets.connect(
                f"ws://{test_server}/api/voice/realtime?token={auth_token}&conversation_id=test"
            ) as ws:
                response = await ws.recv()
                return json.loads(response).get("session_id")

        # Create concurrent sessions
        tasks = [create_session() for _ in range(num_sessions)]
        session_ids = await asyncio.gather(*tasks)

        # All should succeed
        assert len(session_ids) == num_sessions
        assert all(sid is not None for sid in session_ids)
async def enroll_voice(
    self,
    user_id: str,
    audio_samples: list[bytes],
    db: Session
) -> VoicePrint:
    """
    Enroll user's voice for authentication.
    Requires multiple samples for better accuracy.

    Samples whose quality score is below 0.7 are discarded; at least three
    must remain. The remaining embeddings are averaged, L2-normalised and
    stored as float32 bytes, the dtype ``verify_voice`` decodes them with.

    Returns:
        The enrolled ``VoicePrint``. ``quality_score`` is the mean quality of
        the samples actually used; ``sample_count`` is how many were used.

    Raises:
        ValueError: Fewer than three samples were supplied, or fewer than
            three passed the quality gate.
    """
    if len(audio_samples) < 3:
        raise ValueError("At least 3 audio samples required for enrollment")

    embeddings = []
    accepted_qualities = []
    for audio in audio_samples:
        # Validate audio quality (scored once per sample and reused below)
        quality = self._assess_audio_quality(audio)
        if quality < 0.7:
            continue

        # Generate voice embedding
        embedding = await self._generate_embedding(audio)
        embeddings.append(embedding)
        accepted_qualities.append(quality)

    if len(embeddings) < 3:
        raise ValueError("Insufficient quality audio samples")

    # Average embeddings for robustness
    avg_embedding = np.mean(embeddings, axis=0)
    avg_embedding = avg_embedding / np.linalg.norm(avg_embedding)
    # Pin the dtype so the stored bytes always round-trip through
    # np.frombuffer(..., dtype=np.float32).
    avg_embedding = avg_embedding.astype(np.float32)

    # Store voice print
    # NOTE(review): nothing here replaces an earlier print for the same
    # user - confirm re-enrollment is handled elsewhere.
    voice_print = UserVoicePrint(
        user_id=user_id,
        embedding=avg_embedding.tobytes(),
        # Plain float; rejected samples do not drag the score down.
        quality_score=float(np.mean(accepted_qualities)),
        sample_count=len(embeddings)
    )
    db.add(voice_print)
    db.commit()

    return VoicePrint(
        user_id=user_id,
        embedding=avg_embedding,
        created_at=voice_print.created_at,
        quality_score=voice_print.quality_score,
        sample_count=len(embeddings)
    )
Returns (is_match, confidence_score) """ # Get stored voice print stored = db.query(UserVoicePrint).filter( UserVoicePrint.user_id == user_id ).first() if not stored: return False, 0.0 # Check audio quality quality = self._assess_audio_quality(audio) if quality < 0.5: return False, 0.0 # Generate embedding for verification audio verify_embedding = await self._generate_embedding(audio) # Compare with stored embedding stored_embedding = np.frombuffer(stored.embedding, dtype=np.float32) similarity = self._cosine_similarity(verify_embedding, stored_embedding) is_match = similarity >= self.similarity_threshold # Log verification attempt db.add(VoiceAuthLog( user_id=user_id, success=is_match, similarity_score=similarity, audio_quality=quality )) db.commit() return is_match, float(similarity) async def _generate_embedding(self, audio: bytes) -> np.ndarray: """Generate voice embedding from audio""" import soundfile as sf from io import BytesIO # Convert bytes to numpy array audio_buffer = BytesIO(audio) waveform, sample_rate = sf.read(audio_buffer) # Resample if needed if sample_rate != 16000: import librosa waveform = librosa.resample(waveform, orig_sr=sample_rate, target_sr=16000) # Generate embedding embedding = self.encoder.embed_utterance(waveform) return embedding def _assess_audio_quality(self, audio: bytes) -> float: """Assess audio quality for voice authentication""" import soundfile as sf from io import BytesIO audio_buffer = BytesIO(audio) waveform, sample_rate = sf.read(audio_buffer) # Check duration duration = len(waveform) / sample_rate if duration < self.min_audio_duration: return 0.0 # Check signal-to-noise ratio (simplified) signal_power = np.mean(waveform ** 2) noise_floor = np.percentile(np.abs(waveform), 10) snr = 10 * np.log10(signal_power / (noise_floor ** 2 + 1e-10)) # Normalize to 0-1 range quality = min(1.0, max(0.0, snr / 30)) return quality def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float: """Calculate cosine similarity between 
two embeddings""" return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))) # API Endpoints @router.post("/api/voice-auth/enroll") async def enroll_voice_auth( files: list[UploadFile] = File(...), current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """ Enroll voice for authentication. Requires at least 3 audio samples of user speaking. """ if len(files) < 3: raise HTTPException( status_code=400, detail="At least 3 audio samples required" ) audio_samples = [await f.read() for f in files] voice_auth = VoiceAuthService() voice_print = await voice_auth.enroll_voice( user_id=str(current_user.id), audio_samples=audio_samples, db=db ) return { "message": "Voice authentication enrolled successfully", "quality_score": voice_print.quality_score, "sample_count": voice_print.sample_count } @router.post("/api/voice-auth/verify") async def verify_voice_auth( file: UploadFile = File(...), user_id: str = Query(...), db: Session = Depends(get_db) ): """Verify voice for authentication""" audio = await file.read() voice_auth = VoiceAuthService() is_match, confidence = await voice_auth.verify_voice( user_id=user_id, audio=audio, db=db ) return { "verified": is_match, "confidence": confidence, "threshold": voice_auth.similarity_threshold } ``` #### C.2.2 Testing for Voice Authentication ```python class TestVoiceAuthentication: @pytest.mark.asyncio async def test_voice_enrollment(self, voice_auth_service, audio_samples): """Test voice enrollment with multiple samples""" voice_print = await voice_auth_service.enroll_voice( user_id="test-user", audio_samples=audio_samples, db=mock_db ) assert voice_print.sample_count >= 3 assert voice_print.quality_score > 0.7 @pytest.mark.asyncio async def test_voice_verification_success(self, voice_auth_service, enrolled_user): """Test successful voice verification""" is_match, confidence = await voice_auth_service.verify_voice( user_id=enrolled_user.id, audio=enrolled_user.verification_sample, db=mock_db ) assert 
class MedicalEmbeddingService:
    """
    Medical-specific embeddings using BioGPT and PubMedBERT.

    The tokenizer/model pairs are loaded once per process and shared by all
    instances, so creating a service per request does not reload the
    checkpoints. ``self.models`` maps a model type (``'pubmedbert'``,
    ``'biogpt'``, ``'scibert'``) to ``{'tokenizer': ..., 'model': ...}``.

    Inference runs synchronously inside the ``async`` methods; nothing here
    hands it off to an executor.
    """

    # Process-wide cache of loaded tokenizer/model pairs.
    _model_cache: dict = {}

    def __init__(self):
        self.models = {}
        self._load_models()

    def _load_models(self):
        """Load medical language models (first call only) and expose them on ``self.models``."""
        cache = MedicalEmbeddingService._model_cache
        if not cache:
            from transformers import BioGptTokenizer, BioGptForCausalLM

            # Build the full set locally first so a failed download cannot
            # leave a partially populated cache behind.
            loaded = {}

            # PubMedBERT for medical embeddings
            loaded['pubmedbert'] = {
                'tokenizer': AutoTokenizer.from_pretrained(
                    "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract-fulltext"
                ),
                'model': AutoModel.from_pretrained(
                    "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract-fulltext"
                )
            }

            # BioGPT for medical text generation
            loaded['biogpt'] = {
                'tokenizer': BioGptTokenizer.from_pretrained("microsoft/biogpt"),
                'model': BioGptForCausalLM.from_pretrained("microsoft/biogpt")
            }

            # SciBERT for scientific text
            loaded['scibert'] = {
                'tokenizer': AutoTokenizer.from_pretrained("allenai/scibert_scivocab_uncased"),
                'model': AutoModel.from_pretrained("allenai/scibert_scivocab_uncased")
            }

            cache.update(loaded)

        # Per-instance dict (shallow copy) that shares the loaded objects.
        self.models = dict(cache)

    async def generate_medical_embedding(
        self,
        text: str,
        model_type: str = "pubmedbert"
    ) -> List[float]:
        """
        Generate embeddings using medical language model.

        Args:
            text: Input text; tokenized with truncation at 512 tokens.
            model_type: Key into ``self.models``.

        Returns:
            The hidden state of the first ([CLS]) token as a flat list of floats.

        Raises:
            ValueError: ``model_type`` is not a loaded model.
        """
        if model_type not in self.models:
            raise ValueError(f"Unknown model type: {model_type}")

        model_config = self.models[model_type]
        tokenizer = model_config['tokenizer']
        model = model_config['model']

        # Tokenize and encode
        inputs = tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=512,
            padding=True
        )

        with torch.no_grad():
            outputs = model(**inputs)

        # Use [CLS] token embedding. Indexing batch item 0 explicitly always
        # yields a 1-D vector, whatever the hidden size.
        embedding = outputs.last_hidden_state[0, 0, :].detach().cpu().numpy()

        return embedding.tolist()

    async def generate_medical_text(
        self,
        prompt: str,
        max_length: int = 200,
        temperature: float = 0.7
    ) -> str:
        """
        Generate medical text using BioGPT.

        ``max_length`` and ``temperature`` are passed straight through to
        ``model.generate``; sampling uses ``top_p=0.9``.
        """
        tokenizer = self.models['biogpt']['tokenizer']
        model = self.models['biogpt']['model']

        inputs = tokenizer(prompt, return_tensors="pt")

        # Inference only: no gradient tracking needed.
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_length=max_length,
                temperature=temperature,
                do_sample=True,
                top_p=0.9
            )

        generated = tokenizer.decode(outputs[0], skip_special_tokens=True)
        return generated
@router.post("/api/medical-ai/embed")
async def generate_medical_embedding(
    request: EmbeddingRequest,
    current_user: User = Depends(get_current_user)
):
    """
    Generate medical-specific embeddings.

    Returns:
        ``{"embedding": [...], "model": <model type used>}``. When the
        request names no model, ``"pubmedbert"`` is used and reported.
    """
    # Resolve the default once so the response names the model that actually
    # produced the embedding, not the possibly empty request field.
    model_type = request.model_type or "pubmedbert"

    service = MedicalEmbeddingService()
    embedding = await service.generate_medical_embedding(
        text=request.text,
        model_type=model_type
    )
    return {"embedding": embedding, "model": model_type}
def _load_domain_classifier(self):
    """
    Load the pre-trained domain classification model from
    ``models/domain_classifier.joblib``.

    The path is relative, so it resolves against the process working directory.
    """
    import joblib

    # SECURITY: joblib.load unpickles the file, and unpickling can execute
    # arbitrary code. Only load artifacts produced by a trusted pipeline.
    # Load pre-trained classifier
    return joblib.load("models/domain_classifier.joblib")
class UpToDateService:
    """
    Integration with UpToDate clinical decision support.

    Every request waits on the shared rate limiter (60 requests per minute)
    and authenticates with a bearer token taken from ``settings``.
    """

    def __init__(self):
        self.api_key = settings.UPTODATE_API_KEY
        self.base_url = "https://api.uptodate.com/v1"
        self.rate_limiter = RateLimiter(requests_per_minute=60)

    def _auth_headers(self) -> Dict[str, str]:
        """Build the Authorization header sent with every request."""
        return {"Authorization": f"Bearer {self.api_key}"}

    async def search(
        self,
        query: str,
        specialty: Optional[str] = None,
        max_results: int = 10
    ) -> List[UpToDateResult]:
        """
        Search UpToDate knowledge base.

        Args:
            query: Free-text search query.
            specialty: Optional specialty filter; left out of the request when None.
            max_results: Sent as the ``limit`` parameter.

        Raises:
            httpx.HTTPStatusError: The API answered with an error status.
        """
        await self.rate_limiter.acquire()

        params = {"query": query, "limit": max_results}
        # Only send the filter when one was requested; a None value would
        # otherwise go out as an empty "specialty=" parameter.
        if specialty is not None:
            params["specialty"] = specialty

        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.base_url}/search",
                params=params,
                headers=self._auth_headers()
            )
            response.raise_for_status()

        return [
            UpToDateResult(
                topic_id=item["topicId"],
                title=item["title"],
                summary=item.get("summary", ""),
                url=item["url"],
                last_updated=item.get("lastUpdated"),
                grade_of_evidence=item.get("gradeOfEvidence")
            )
            for item in response.json().get("results", [])
        ]

    async def get_drug_interactions(
        self,
        drugs: List[str]
    ) -> List[Dict[str, Any]]:
        """Check drug-drug interactions; returns the response's ``interactions`` list (empty if absent)."""
        await self.rate_limiter.acquire()

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/drug-interactions",
                json={"drugs": drugs},
                headers=self._auth_headers()
            )
            response.raise_for_status()

        return response.json().get("interactions", [])

    async def get_diagnostic_calculator(
        self,
        calculator_name: str,
        inputs: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Run a clinical diagnostic calculator and return the decoded JSON response."""
        from urllib.parse import quote

        await self.rate_limiter.acquire()

        # Percent-encode the name (including "/") so it stays a single path
        # segment and cannot redirect the request to another endpoint.
        safe_name = quote(calculator_name, safe="")

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/calculators/{safe_name}",
                json=inputs,
                headers=self._auth_headers()
            )
            response.raise_for_status()

        return response.json()
@dataclass
class PubMedArticle:
    """PubMed article metadata"""

    pmid: str
    title: str
    authors: List[str]
    abstract: str
    journal: str
    publication_date: str
    doi: Optional[str]
    mesh_terms: List[str]
    citation_count: int


class PubMedService:
    """Literature search against PubMed through Biopython's ``Entrez``.

    ``Entrez`` calls are blocking, so each public coroutine runs them in the
    default executor.
    """

    def __init__(self):
        # Entrez is configured through module-level attributes.
        Entrez.email = settings.PUBMED_EMAIL
        Entrez.api_key = settings.PUBMED_API_KEY

    async def search(
        self,
        query: str,
        max_results: int = 20,
        sort: str = "relevance",
        date_range: Optional[tuple] = None
    ) -> List[PubMedArticle]:
        """Search PubMed and return parsed article metadata.

        Args:
            query: Entrez search term.
            max_results: Passed to ``esearch`` as ``retmax``.
            sort: Passed to ``esearch`` as ``sort``.
            date_range: Optional ``(start, end)`` pair appended to the term
                as a ``[dp]`` publication-date range.

        Returns:
            Parsed articles; records that fail to parse are skipped.
        """
        import asyncio

        def _search():
            search_query = query
            if date_range:
                search_query += f" AND {date_range[0]}:{date_range[1]}[dp]"

            handle = Entrez.esearch(
                db="pubmed",
                term=search_query,
                retmax=max_results,
                sort=sort
            )
            try:
                results = Entrez.read(handle)
            finally:
                handle.close()  # close even when parsing fails

            pmids = results.get("IdList", [])
            if not pmids:
                return []

            handle = Entrez.efetch(
                db="pubmed",
                id=",".join(pmids),
                rettype="xml",
                retmode="xml"
            )
            try:
                records = Entrez.read(handle)
            finally:
                handle.close()

            parsed = (self._parse_article(r) for r in records.get("PubmedArticle", []))
            return [article for article in parsed if article]

        # get_running_loop() is the supported call inside a coroutine.
        return await asyncio.get_running_loop().run_in_executor(None, _search)

    def _extract_date(self, article_data: Dict) -> str:
        """Return the journal issue's publication date as text.

        Reads ``Journal -> JournalIssue -> PubDate``: uses ``MedlineDate``
        verbatim when present, otherwise joins whichever of Year/Month/Day
        exist with spaces. Returns ``""`` when no date is available.
        """
        pub_date = (
            article_data.get("Journal", {})
            .get("JournalIssue", {})
            .get("PubDate", {})
        )
        if pub_date.get("MedlineDate"):
            return str(pub_date["MedlineDate"])
        parts = [str(pub_date[key]) for key in ("Year", "Month", "Day") if pub_date.get(key)]
        return " ".join(parts)

    def _extract_doi(self, record: Dict) -> Optional[str]:
        """Return the article DOI, or ``None`` when the record has none.

        NOTE(review): assumes parsed Entrez elements expose their XML
        attributes through ``.attributes`` (``IdType`` / ``EIdType``) -
        confirm against the Biopython version in use.
        """
        for article_id in record.get("PubmedData", {}).get("ArticleIdList", []):
            if getattr(article_id, "attributes", {}).get("IdType") == "doi":
                return str(article_id)
        article_data = record.get("MedlineCitation", {}).get("Article", {})
        for location in article_data.get("ELocationID", []):
            if getattr(location, "attributes", {}).get("EIdType") == "doi":
                return str(location)
        return None

    def _parse_article(self, record: Dict) -> Optional[PubMedArticle]:
        """Convert one parsed ``PubmedArticle`` record into a ``PubMedArticle``.

        Returns ``None`` (after logging at debug level) when the record is
        malformed, so one bad record does not abort a whole search.
        """
        try:
            medline = record.get("MedlineCitation", {})
            article_data = medline.get("Article", {})

            # Only authors carrying both name parts are listed.
            authors = [
                f"{author['LastName']} {author['ForeName']}"
                for author in article_data.get("AuthorList", [])
                if "LastName" in author and "ForeName" in author
            ]

            # DescriptorName may be a string-like element or a mapping.
            mesh_terms = []
            for mesh in medline.get("MeshHeadingList", []):
                descriptor = mesh.get("DescriptorName", "")
                if isinstance(descriptor, dict):
                    mesh_terms.append(descriptor.get("#text", ""))
                else:
                    mesh_terms.append(str(descriptor))

            # Structured abstracts have several sections; an article may
            # also have none, which previously raised IndexError.
            sections = article_data.get("Abstract", {}).get("AbstractText", [])
            if isinstance(sections, str):
                sections = [sections]
            abstract = " ".join(str(section) for section in sections)

            return PubMedArticle(
                pmid=str(medline.get("PMID")),
                title=article_data.get("ArticleTitle", ""),
                authors=authors,
                abstract=abstract,
                journal=article_data.get("Journal", {}).get("Title", ""),
                publication_date=self._extract_date(article_data),
                doi=self._extract_doi(record),
                mesh_terms=mesh_terms,
                citation_count=0  # Would require additional API call
            )
        except Exception:
            import logging

            logging.getLogger(__name__).debug(
                "Skipping unparseable PubMed record", exc_info=True
            )
            return None

    async def get_citations(self, pmid: str) -> List[str]:
        """Return the PMIDs of articles that cite ``pmid``.

        Uses ``Entrez.elink`` with the ``pubmed_pubmed_citedin`` link name.
        """
        import asyncio

        def _get_citations():
            handle = Entrez.elink(
                dbfrom="pubmed",
                db="pubmed",
                id=pmid,
                linkname="pubmed_pubmed_citedin"
            )
            try:
                results = Entrez.read(handle)
            finally:
                handle.close()

            return [
                item.get("Id")
                for linkset in results
                for link in linkset.get("LinkSetDb", [])
                for item in link.get("Link", [])
            ]

        return await asyncio.get_running_loop().run_in_executor(None, _get_citations)

    async def format_citation(
        self,
        pmid: str,
        style: str = "AMA"
    ) -> str:
        """Return a formatted citation for ``pmid``.

        Only the ``"AMA"`` style is implemented (at most three authors, then
        ``et al``). Returns ``""`` when the article is not found or the
        style is unsupported.
        """
        articles = await self.search(f"{pmid}[uid]", max_results=1)
        if not articles:
            return ""

        article = articles[0]

        if style == "AMA":
            authors_str = ", ".join(article.authors[:3])
            if len(article.authors) > 3:
                authors_str += ", et al"
            return f"{authors_str}. {article.title}. {article.journal}. {article.publication_date}."

        return ""
class RiskLevel(Enum):
    LOW = "low"
    MODERATE = "moderate"
    HIGH = "high"
    VERY_HIGH = "very_high"


@dataclass
class CalculatorResult:
    """Result from a medical calculator"""

    calculator_name: str
    score: float
    risk_level: RiskLevel
    interpretation: str
    recommendations: list[str]
    inputs_used: Dict[str, Any]
    references: list[str]


class MedicalCalculatorService:
    """Library of clinical scoring tools.

    Each calculator is a coroutine returning a ``CalculatorResult`` that
    echoes the inputs used and cites its references.
    """

    async def wells_dvt(
        self,
        active_cancer: bool = False,
        paralysis: bool = False,
        bedridden: bool = False,
        tenderness: bool = False,
        leg_swelling: bool = False,
        calf_swelling: bool = False,
        pitting_edema: bool = False,
        collateral_veins: bool = False,
        previous_dvt: bool = False,
        alternative_diagnosis: bool = False
    ) -> CalculatorResult:
        """Wells Score for DVT probability.

        Each positive criterion adds one point; an alternative diagnosis at
        least as likely as DVT subtracts two. A score <= 0 is low, 1-2 is
        moderate and >= 3 is high probability.
        """
        # Paralysis and bedridden status are separate one-point criteria.
        # The previous `paralysis or bedridden` test followed by a second
        # `bedridden` test scored a bedridden patient twice.
        one_point_criteria = (
            active_cancer,
            paralysis,
            bedridden,
            tenderness,
            leg_swelling,
            calf_swelling,
            pitting_edema,
            collateral_veins,
            previous_dvt,
        )
        score = sum(1 for present in one_point_criteria if present)
        if alternative_diagnosis:
            score -= 2

        if score <= 0:
            risk = RiskLevel.LOW
            interpretation = "Low probability of DVT (3%)"
            recommendations = [
                "Consider D-dimer testing",
                "If D-dimer negative, DVT is unlikely"
            ]
        elif score <= 2:
            risk = RiskLevel.MODERATE
            interpretation = "Moderate probability of DVT (17%)"
            recommendations = [
                "Perform D-dimer testing",
                "If positive, proceed to ultrasound"
            ]
        else:
            risk = RiskLevel.HIGH
            interpretation = "High probability of DVT (75%)"
            recommendations = [
                "Proceed directly to venous ultrasound",
                "Consider empiric anticoagulation"
            ]

        return CalculatorResult(
            calculator_name="Wells Score for DVT",
            score=score,
            risk_level=risk,
            interpretation=interpretation,
            recommendations=recommendations,
            inputs_used={
                "active_cancer": active_cancer,
                "paralysis": paralysis,
                "bedridden": bedridden,
                "tenderness": tenderness,
                "leg_swelling": leg_swelling,
                "calf_swelling": calf_swelling,
                "pitting_edema": pitting_edema,
                "collateral_veins": collateral_veins,
                "previous_dvt": previous_dvt,
                "alternative_diagnosis": alternative_diagnosis
            },
            references=[
                "Wells PS, et al. Lancet. 1997;350(9094):1795-1798.",
                "Wells PS, et al. JAMA. 2006;295(2):199-207."
            ]
        )

    async def cha2ds2_vasc(
        self,
        age: int,
        sex: str,
        chf: bool = False,
        hypertension: bool = False,
        stroke_tia: bool = False,
        vascular_disease: bool = False,
        diabetes: bool = False
    ) -> CalculatorResult:
        """CHA2DS2-VASc Score for AFib stroke risk.

        ``sex`` is compared case-insensitively with ``"female"``. The score
        is mapped to an annual stroke risk percentage via a fixed table.
        """
        is_female = sex.lower() == "female"

        score = 0
        if chf:
            score += 1
        if hypertension:
            score += 1
        if age >= 75:
            score += 2
        elif age >= 65:
            score += 1
        if diabetes:
            score += 1
        if stroke_tia:
            score += 2
        if vascular_disease:
            score += 1
        if is_female:
            score += 1

        # Annual stroke risk (%) and risk band per score.
        risk_map = {
            0: (0.2, RiskLevel.LOW),
            1: (0.6, RiskLevel.LOW),
            2: (2.2, RiskLevel.MODERATE),
            3: (3.2, RiskLevel.MODERATE),
            4: (4.8, RiskLevel.HIGH),
            5: (7.2, RiskLevel.HIGH),
            6: (9.7, RiskLevel.VERY_HIGH),
            7: (11.2, RiskLevel.VERY_HIGH),
            8: (10.8, RiskLevel.VERY_HIGH),
            9: (12.2, RiskLevel.VERY_HIGH)
        }
        annual_risk, risk_level = risk_map.get(min(score, 9), (12.2, RiskLevel.VERY_HIGH))

        if score == 0:
            recommendations = ["No anticoagulation recommended"]
        elif score == 1 and is_female:
            # The single point comes from sex alone.
            recommendations = ["Low risk, consider no anticoagulation"]
        else:
            recommendations = [
                "Oral anticoagulation recommended",
                "Consider DOACs over warfarin if no contraindication",
                "Assess bleeding risk with HAS-BLED"
            ]

        return CalculatorResult(
            calculator_name="CHA2DS2-VASc Score",
            score=score,
            risk_level=risk_level,
            interpretation=f"Annual stroke risk: {annual_risk}%",
            recommendations=recommendations,
            inputs_used={
                "age": age,
                "sex": sex,
                "chf": chf,
                "hypertension": hypertension,
                "stroke_tia": stroke_tia,
                "vascular_disease": vascular_disease,
                "diabetes": diabetes
            },
            references=[
                "Lip GY, et al. Chest. 2010;137(2):263-272."
            ]
        )

    async def gfr_ckd_epi(
        self,
        creatinine: float,
        age: int,
        sex: str,
        race: Optional[str] = None
    ) -> CalculatorResult:
        """eGFR calculation using CKD-EPI equation (2021, race-free).

        Args:
            creatinine: Serum creatinine; must be positive.
                NOTE(review): the 0.7 / 0.9 thresholds imply mg/dL - confirm
                callers do not pass umol/L.
            age: Age in years.
            sex: ``"female"`` (case-insensitive) selects the female
                coefficients; any other value uses the male ones.
            race: Accepted for backward compatibility and ignored.

        Raises:
            ValueError: If ``creatinine`` is not positive. Zero would raise
                ``ZeroDivisionError`` and a negative value would produce a
                complex number from the fractional power.
        """
        if creatinine <= 0:
            raise ValueError("creatinine must be a positive number")

        if sex.lower() == "female":
            kappa, alpha, sex_factor = 0.7, -0.241, 1.012
        else:
            kappa, alpha, sex_factor = 0.9, -0.302, 1.0

        ratio = creatinine / kappa
        # At or below kappa the sex-specific exponent applies, above it -1.2.
        exponent = alpha if creatinine <= kappa else -1.2
        gfr = round(142 * pow(ratio, exponent) * pow(0.9938, age) * sex_factor, 1)

        # CKD staging
        if gfr >= 90:
            stage = "G1"
            risk = RiskLevel.LOW
            interpretation = "Normal or high GFR"
        elif gfr >= 60:
            stage = "G2"
            risk = RiskLevel.LOW
            interpretation = "Mildly decreased GFR"
        elif gfr >= 45:
            stage = "G3a"
            risk = RiskLevel.MODERATE
            interpretation = "Mildly to moderately decreased GFR"
        elif gfr >= 30:
            stage = "G3b"
            risk = RiskLevel.MODERATE
            interpretation = "Moderately to severely decreased GFR"
        elif gfr >= 15:
            stage = "G4"
            risk = RiskLevel.HIGH
            interpretation = "Severely decreased GFR"
        else:
            stage = "G5"
            risk = RiskLevel.VERY_HIGH
            interpretation = "Kidney failure"

        recommendations = []
        if gfr < 60:
            recommendations.append("Monitor kidney function regularly")
            recommendations.append("Review medications for renal dosing")
        if gfr < 30:
            recommendations.append("Nephrology referral recommended")
            recommendations.append("Prepare for renal replacement therapy if progressing")

        return CalculatorResult(
            calculator_name="eGFR (CKD-EPI 2021)",
            score=gfr,
            risk_level=risk,
            interpretation=f"CKD Stage {stage}: {interpretation}",
            recommendations=recommendations,
            inputs_used={
                "creatinine": creatinine,
                "age": age,
                "sex": sex
            },
            references=[
                "Inker LA, et al. N Engl J Med. 2021;385(19):1737-1749."
            ]
        )

    # Add more calculators: MELD, CURB-65, APACHE II, etc.
# API Endpoints

# Single source of truth for the calculators exposed over HTTP. Only names
# listed here can be invoked through the API.
_CALCULATOR_CATALOG = [
    {
        "name": "wells_dvt",
        "description": "Wells Score for DVT probability",
        "category": "Hematology"
    },
    {
        "name": "cha2ds2_vasc",
        "description": "CHA2DS2-VASc for AFib stroke risk",
        "category": "Cardiology"
    },
    {
        "name": "gfr_ckd_epi",
        "description": "eGFR using CKD-EPI 2021 equation",
        "category": "Nephrology"
    }
    # Add more calculators
]
_CALCULATOR_NAMES = frozenset(entry["name"] for entry in _CALCULATOR_CATALOG)


@router.post("/api/calculators/{calculator_name}")
async def run_calculator(
    calculator_name: str,
    inputs: Dict[str, Any],
    current_user: User = Depends(get_current_user)
):
    """Run a medical calculator by name with keyword inputs.

    Raises:
        HTTPException 404: ``calculator_name`` is not a published calculator.
        HTTPException 422: ``inputs`` do not match the calculator's
            parameters or fail its validation.
    """
    # The name comes from the URL. A bare getattr() on it would also resolve
    # dunder/private attributes and non-coroutine members of the service,
    # so only catalogued names are accepted.
    if calculator_name not in _CALCULATOR_NAMES:
        raise HTTPException(
            status_code=404,
            detail=f"Calculator '{calculator_name}' not found"
        )

    service = MedicalCalculatorService()
    calculator_method = getattr(service, calculator_name)

    try:
        result = await calculator_method(**inputs)
    except (TypeError, ValueError, AttributeError) as exc:
        # Unknown/missing keyword arguments or wrongly typed values are a
        # client error, not a server fault.
        raise HTTPException(
            status_code=422,
            detail=f"Invalid inputs for calculator '{calculator_name}': {exc}"
        ) from exc

    return result.__dict__


@router.get("/api/calculators")
async def list_calculators(
    current_user: User = Depends(get_current_user)
):
    """List available medical calculators (name, description, category)."""
    return {"calculators": [dict(entry) for entry in _CALCULATOR_CATALOG]}
class OIDCProvider:
    """Static configuration of one OIDC/OAuth2 identity provider."""

    def __init__(self, name: str, config: dict):
        self.name = name
        self.client_id = config['client_id']
        self.client_secret = config['client_secret']
        self.authorization_endpoint = config['authorization_endpoint']
        self.token_endpoint = config['token_endpoint']
        self.userinfo_endpoint = config['userinfo_endpoint']
        self.jwks_uri = config['jwks_uri']
        self.scopes = config.get('scopes', ['openid', 'email', 'profile'])


class OIDCManager:
    """Manages OIDC authentication flows for the enabled providers."""

    def __init__(self):
        self.oauth = OAuth()
        self.providers: dict[str, OIDCProvider] = {}
        self._setup_providers()

    def _setup_providers(self):
        """Register every provider whose ``*_ENABLED`` setting is truthy."""
        # Nextcloud
        if settings.NEXTCLOUD_OIDC_ENABLED:
            self.register_provider('nextcloud', {
                'client_id': settings.NEXTCLOUD_CLIENT_ID,
                'client_secret': settings.NEXTCLOUD_CLIENT_SECRET,
                'authorization_endpoint': f'{settings.NEXTCLOUD_URL}/apps/oauth2/authorize',
                'token_endpoint': f'{settings.NEXTCLOUD_URL}/apps/oauth2/api/v1/token',
                'userinfo_endpoint': f'{settings.NEXTCLOUD_URL}/ocs/v2.php/cloud/user',
                'jwks_uri': f'{settings.NEXTCLOUD_URL}/apps/oauth2/jwks',
            })

        # Google
        if settings.GOOGLE_OAUTH_ENABLED:
            self.register_provider('google', {
                'client_id': settings.GOOGLE_CLIENT_ID,
                'client_secret': settings.GOOGLE_CLIENT_SECRET,
                'authorization_endpoint': 'https://accounts.google.com/o/oauth2/v2/auth',
                'token_endpoint': 'https://oauth2.googleapis.com/token',
                'userinfo_endpoint': 'https://openidconnect.googleapis.com/v1/userinfo',
                'jwks_uri': 'https://www.googleapis.com/oauth2/v3/certs',
            })

        # Microsoft
        if settings.MICROSOFT_OAUTH_ENABLED:
            tenant = settings.MICROSOFT_TENANT_ID
            self.register_provider('microsoft', {
                'client_id': settings.MICROSOFT_CLIENT_ID,
                'client_secret': settings.MICROSOFT_CLIENT_SECRET,
                'authorization_endpoint': f'https://login.microsoftonline.com/{tenant}/oauth2/v2.0/authorize',
                'token_endpoint': f'https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token',
                'userinfo_endpoint': 'https://graph.microsoft.com/oidc/userinfo',
                'jwks_uri': f'https://login.microsoftonline.com/{tenant}/discovery/v2.0/keys',
            })

    def register_provider(self, name: str, config: dict):
        """Record the provider and register it with the Authlib registry."""
        provider = OIDCProvider(name, config)
        self.providers[name] = provider

        self.oauth.register(
            name=name,
            client_id=provider.client_id,
            client_secret=provider.client_secret,
            authorize_url=provider.authorization_endpoint,
            access_token_url=provider.token_endpoint,
            userinfo_endpoint=provider.userinfo_endpoint,
            jwks_uri=provider.jwks_uri,
            client_kwargs={'scope': ' '.join(provider.scopes)}
        )

    async def get_authorization_url(
        self,
        provider_name: str,
        redirect_uri: str,
        state: str,
        nonce: str
    ) -> str:
        """Generate the authorization URL that starts the OIDC flow."""
        provider = self.oauth.create_client(provider_name)
        result = await provider.create_authorization_url(
            redirect_uri,
            state=state,
            nonce=nonce
        )
        # NOTE(review): Authlib appears to return a dict carrying the URL
        # under 'url' rather than a bare string; unwrap it so callers really
        # receive the declared ``str``. Confirm against the pinned version.
        if isinstance(result, dict):
            return result['url']
        return result

    async def handle_callback(
        self,
        provider_name: str,
        request,
        db: Session
    ) -> User:
        """Handle the OIDC callback and create or update the local user.

        Exchanges the authorization code for tokens, loads the provider's
        userinfo, finds or creates the ``User`` with that email, and stores
        the provider tokens on the matching ``UserOIDCLink``.

        Raises:
            HTTPException 400: The provider returned no email address.
            HTTPException 403: An existing account would be linked for the
                first time on an email the provider has not verified.
        """
        provider = self.oauth.create_client(provider_name)

        # Exchange code for tokens
        token = await provider.authorize_access_token(request)

        # Get user info
        userinfo = await provider.userinfo(token=token)

        email = userinfo.get('email')
        if not email:
            raise HTTPException(
                status_code=400,
                detail="Identity provider did not return an email address"
            )

        # Find or create user
        user = db.query(User).filter(User.email == email).first()
        is_new_user = user is None

        if is_new_user:
            user = User(
                email=email,
                full_name=userinfo.get('name'),
                avatar_url=userinfo.get('picture'),
                is_active=True,
                email_verified=userinfo.get('email_verified', False)
            )
            db.add(user)
            # The primary key is only assigned on flush; without it the link
            # below would be looked up and stored with user_id=None.
            db.flush()

        # Update OIDC link
        oidc_link = db.query(UserOIDCLink).filter(
            UserOIDCLink.user_id == user.id,
            UserOIDCLink.provider == provider_name
        ).first()

        if not oidc_link:
            # First link of a pre-existing account: require a verified email,
            # otherwise anyone who can register that address at the provider
            # could take the account over.
            if not is_new_user and not userinfo.get('email_verified', False):
                raise HTTPException(
                    status_code=403,
                    detail="Email address is not verified by the identity provider"
                )
            oidc_link = UserOIDCLink(
                user_id=user.id,
                provider=provider_name,
                provider_user_id=userinfo['sub']
            )
            db.add(oidc_link)

        now = datetime.utcnow()
        oidc_link.last_login = now
        # NOTE(review): provider tokens are stored as received; consider
        # encrypting these columns at rest.
        oidc_link.access_token = token['access_token']
        oidc_link.refresh_token = token.get('refresh_token')
        oidc_link.token_expires_at = now + timedelta(
            seconds=token.get('expires_in', 3600)
        )

        db.commit()
        return user


# API Endpoints
@router.get("/auth/providers")
async def list_auth_providers():
    """List available authentication providers with their login URLs."""
    oidc_manager = OIDCManager()
    return {
        "providers": [
            {
                "name": name,
                "login_url": f"/api/auth/login/{name}"
            }
            for name in oidc_manager.providers.keys()
        ]
    }
@router.get("/auth/login/{provider}")
async def initiate_oidc_login(
    provider: str,
    request: Request,
    redirect_uri: Optional[str] = None
):
    """Initiate the OIDC login flow for ``provider``.

    Generates a random state and nonce, caches them for ten minutes together
    with the post-login redirect target, and redirects the browser to the
    provider's authorization URL.

    Raises:
        HTTPException 400: Unknown provider, or ``redirect_uri`` is not an
            allowed post-login target.
    """
    import secrets

    oidc_manager = OIDCManager()

    if provider not in oidc_manager.providers:
        raise HTTPException(status_code=400, detail="Unknown provider")

    # The callback sends freshly issued tokens to this URI, so an arbitrary
    # caller-supplied value would be an open redirect that leaks them.
    # NOTE(review): ALLOWED_REDIRECT_URIS is an optional new setting; when
    # it is absent only DEFAULT_REDIRECT_URI is accepted.
    target_uri = redirect_uri or settings.DEFAULT_REDIRECT_URI
    allowed_uris = getattr(settings, "ALLOWED_REDIRECT_URIS", None) or [
        settings.DEFAULT_REDIRECT_URI
    ]
    if target_uri not in allowed_uris:
        raise HTTPException(status_code=400, detail="Invalid redirect_uri")

    state = secrets.token_urlsafe(32)
    nonce = secrets.token_urlsafe(32)

    # Store in session/cache; the provider is recorded so the callback can
    # reject a state replayed against a different provider.
    await cache.set(f"oidc_state:{state}", {
        "nonce": nonce,
        "redirect_uri": target_uri,
        "provider": provider
    }, ttl=600)

    auth_url = await oidc_manager.get_authorization_url(
        provider,
        redirect_uri=f"{settings.BASE_URL}/api/auth/callback/{provider}",
        state=state,
        nonce=nonce
    )

    return RedirectResponse(auth_url)


@router.get("/auth/callback/{provider}")
async def oidc_callback(
    provider: str,
    request: Request,
    db: Session = Depends(get_db)
):
    """Handle the OIDC callback and redirect to the frontend with tokens.

    Raises:
        HTTPException 400: The ``state`` parameter is missing, unknown,
            expired, or was issued for a different provider.
    """
    from urllib.parse import urlencode

    state = request.query_params.get('state')
    if not state:
        raise HTTPException(status_code=400, detail="Invalid state")

    # Verify state (single use: deleted as soon as it has been read)
    state_data = await cache.get(f"oidc_state:{state}")
    if not state_data:
        raise HTTPException(status_code=400, detail="Invalid state")
    await cache.delete(f"oidc_state:{state}")

    # Entries written before the provider was recorded have no such key.
    if state_data.get("provider", provider) != provider:
        raise HTTPException(status_code=400, detail="Invalid state")

    oidc_manager = OIDCManager()
    user = await oidc_manager.handle_callback(provider, request, db)

    # Generate JWT tokens
    access_token = create_access_token({"sub": str(user.id)})
    refresh_token = create_refresh_token({"sub": str(user.id)})

    # Redirect to frontend with tokens. Values are URL-encoded and appended
    # with '&' when the target already carries a query string.
    # NOTE(review): tokens in a query string end up in logs and Referer
    # headers; consider a URL fragment or a one-time code exchange.
    redirect_uri = state_data['redirect_uri']
    separator = '&' if '?' in redirect_uri else '?'
    token_query = urlencode({
        "access_token": access_token,
        "refresh_token": refresh_token
    })
    return RedirectResponse(f"{redirect_uri}{separator}{token_query}")
class MFAManager:
    """Multi-Factor Authentication Manager (TOTP secrets, QR codes, backup codes)."""

    def __init__(self):
        self.issuer = "VoiceAssist"

    def generate_totp_secret(self) -> str:
        """Generate a new base32 TOTP secret."""
        return pyotp.random_base32()

    def get_totp_uri(self, user_email: str, secret: str) -> str:
        """Build the provisioning URI an authenticator app scans as a QR code."""
        totp = pyotp.TOTP(secret)
        return totp.provisioning_uri(
            name=user_email,
            issuer_name=self.issuer
        )

    def generate_qr_code(self, uri: str) -> str:
        """Render ``uri`` as a PNG QR code and return it base64-encoded."""
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(uri)
        qr.make(fit=True)

        img = qr.make_image(fill_color="black", back_color="white")
        buffer = BytesIO()
        img.save(buffer, format='PNG')

        return base64.b64encode(buffer.getvalue()).decode()

    def verify_totp(self, secret: str, code: str) -> bool:
        """Verify a TOTP code, tolerating one time step of clock drift."""
        totp = pyotp.TOTP(secret)
        return totp.verify(code, valid_window=1)

    def generate_backup_codes(self, count: int = 10) -> list[str]:
        """Generate ``count`` random backup codes.

        Each code is 8 upper-case hexadecimal characters (4 random bytes).
        """
        # `secrets` is not among this module's imports, so the call
        # previously failed with NameError; import it where it is used.
        import secrets

        return [secrets.token_hex(4).upper() for _ in range(count)]
# API Endpoints
@router.post("/auth/mfa/setup")
async def setup_mfa(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Initialize MFA setup.

    Generates a TOTP secret, stores it as pending on the user, and returns
    the secret together with a QR code data URI.
    """
    mfa_manager = MFAManager()

    secret = mfa_manager.generate_totp_secret()
    uri = mfa_manager.get_totp_uri(current_user.email, secret)
    qr_code = mfa_manager.generate_qr_code(uri)

    # Store pending secret (not activated until verified)
    current_user.mfa_secret_pending = secret
    db.commit()

    return {
        "secret": secret,
        "qr_code": f"data:image/png;base64,{qr_code}",
        "manual_entry_key": secret
    }


@router.post("/auth/mfa/verify-setup")
async def verify_mfa_setup(
    verification: MFAVerification,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Verify the first TOTP code and activate MFA.

    Raises:
        HTTPException 400: Setup was not initiated, or the code is invalid.
    """
    mfa_manager = MFAManager()

    if not current_user.mfa_secret_pending:
        raise HTTPException(status_code=400, detail="MFA setup not initiated")

    if not mfa_manager.verify_totp(current_user.mfa_secret_pending, verification.code):
        raise HTTPException(status_code=400, detail="Invalid verification code")

    # Activate MFA
    current_user.mfa_secret = current_user.mfa_secret_pending
    current_user.mfa_secret_pending = None
    current_user.mfa_enabled = True

    # Generate backup codes; only their hashes are stored.
    backup_codes = mfa_manager.generate_backup_codes()
    current_user.mfa_backup_codes = [hash_code(code) for code in backup_codes]

    db.commit()

    return {
        "message": "MFA enabled successfully",
        "backup_codes": backup_codes  # Show once, never again
    }


@router.post("/auth/mfa/verify")
async def verify_mfa(
    verification: MFAVerification,
    token: str,  # Temporary token from initial login
    db: Session = Depends(get_db)
):
    """Verify an MFA code during login and issue session tokens.

    Accepts either a current TOTP code or an unused backup code; a backup
    code is consumed on use.

    Raises:
        HTTPException 400: The pending-login token does not resolve to a
            user with MFA enabled.
        HTTPException 401: Neither the TOTP check nor any backup code
            matched.
    """
    # Decode temporary token
    payload = decode_mfa_pending_token(token)
    user = db.query(User).get(payload['user_id'])

    if not user or not user.mfa_enabled:
        raise HTTPException(status_code=400, detail="Invalid request")

    def _issue_tokens() -> dict:
        """Create the access/refresh token pair for the verified user."""
        return {
            "access_token": create_access_token({"sub": str(user.id)}),
            "refresh_token": create_refresh_token({"sub": str(user.id)})
        }

    mfa_manager = MFAManager()

    # Try TOTP first
    if mfa_manager.verify_totp(user.mfa_secret, verification.code):
        return _issue_tokens()

    # Try backup codes
    remaining_codes = list(user.mfa_backup_codes or [])
    for index, hashed_code in enumerate(remaining_codes):
        if verify_code_hash(verification.code, hashed_code):
            # Remove used backup code. Assign a new list instead of popping
            # in place: an in-place change to a plain JSON/ARRAY column is
            # not detected by the ORM, which would leave the code reusable.
            # NOTE(review): confirm the column type of mfa_backup_codes.
            del remaining_codes[index]
            user.mfa_backup_codes = remaining_codes
            db.commit()

            response = _issue_tokens()
            response["warning"] = "Backup code used. Consider generating new codes."
            return response

    raise HTTPException(status_code=401, detail="Invalid MFA code")
class EmailService:
    """IMAP/SMTP mailbox access for one user, plus AI summarisation.

    ``imaplib`` and ``smtplib`` are blocking, so network work runs in the
    default executor.
    """

    def __init__(self, user_email_config: dict):
        self.imap_host = user_email_config['imap_host']
        self.imap_port = user_email_config.get('imap_port', 993)
        self.smtp_host = user_email_config['smtp_host']
        self.smtp_port = user_email_config.get('smtp_port', 587)
        self.username = user_email_config['username']
        self.password = user_email_config['password']  # Encrypted in DB

    async def fetch_emails(
        self,
        folder: str = "INBOX",
        limit: int = 50,
        # String annotation: ``datetime`` is not among this module's imports
        # and only ``strftime`` is used on the value.
        since_date: Optional["datetime"] = None,
        search_criteria: Optional[str] = None
    ) -> List[dict]:
        """Fetch the most recent matching emails from ``folder``.

        Args:
            folder: Mailbox to open (read-only).
            limit: Maximum number of messages; values <= 0 return ``[]``.
            since_date: Only messages on or after this date.
            search_criteria: Extra raw IMAP search criteria, appended as-is.
                Callers must not pass unvalidated user input here.

        Returns:
            One dict per message with ``id`` (the IMAP UID), ``subject``,
            ``from``, ``to``, ``date``, ``body`` and ``attachments``.
        """
        loop = asyncio.get_running_loop()

        def _fetch():
            if limit <= 0:
                # ``ids[-0:]`` would return every message, not none.
                return []

            with imaplib.IMAP4_SSL(self.imap_host, self.imap_port) as imap:
                imap.login(self.username, self.password)
                # Read-only so that fetching bodies does not mark mail read.
                imap.select(folder, readonly=True)

                # Build search criteria
                criteria = []
                if since_date:
                    criteria.append(f'SINCE {since_date.strftime("%d-%b-%Y")}')
                if search_criteria:
                    criteria.append(search_criteria)

                search_string = ' '.join(criteria) if criteria else 'ALL'

                # Work with UIDs rather than sequence numbers: the ids
                # returned here are later searched with ``UID <id>``, and
                # sequence numbers shift when messages are expunged.
                _, found = imap.uid('search', None, search_string)
                uids = found[0].split() if found and found[0] else []

                emails = []
                for uid in uids[-limit:]:
                    _, msg_data = imap.uid('fetch', uid, '(RFC822)')
                    if not msg_data or not isinstance(msg_data[0], tuple):
                        continue  # message vanished between search and fetch
                    message = BytesParser(policy=default).parsebytes(msg_data[0][1])

                    emails.append({
                        'id': uid.decode(),
                        'subject': message['subject'],
                        'from': message['from'],
                        'to': message['to'],
                        'date': message['date'],
                        'body': self._get_email_body(message),
                        'attachments': self._get_attachments(message)
                    })

                return emails

        return await loop.run_in_executor(None, _fetch)

    async def send_email(
        self,
        to: List[str],
        subject: str,
        body: str,
        html_body: Optional[str] = None,
        attachments: Optional[List[dict]] = None,
        cc: Optional[List[str]] = None,
        bcc: Optional[List[str]] = None
    ) -> bool:
        """Send an email via SMTP with STARTTLS.

        ``bcc`` recipients receive the message but are not written to any
        header. Returns ``True`` once the message has been handed to the
        server; SMTP errors propagate as exceptions.
        """
        loop = asyncio.get_running_loop()

        def _send():
            msg = MIMEMultipart('alternative')
            msg['Subject'] = subject
            msg['From'] = self.username
            msg['To'] = ', '.join(to)

            if cc:
                msg['Cc'] = ', '.join(cc)

            # Add text body
            msg.attach(MIMEText(body, 'plain'))

            # Add HTML body if provided
            if html_body:
                msg.attach(MIMEText(html_body, 'html'))

            # TODO: attachment handling is not implemented. Warn instead of
            # dropping the attachments silently.
            if attachments:
                import logging

                logging.getLogger(__name__).warning(
                    "send_email: %d attachment(s) ignored - not implemented",
                    len(attachments)
                )

            # Send
            with smtplib.SMTP(self.smtp_host, self.smtp_port) as smtp:
                smtp.starttls()
                smtp.login(self.username, self.password)

                recipients = to + (cc or []) + (bcc or [])
                smtp.sendmail(self.username, recipients, msg.as_string())

            return True

        return await loop.run_in_executor(None, _send)

    async def summarize_email(self, email_content: dict) -> dict:
        """Use the LLM service to summarise an email and extract action items.

        NOTE(review): ``_classify_priority`` is not defined in the visible
        class; unless it is provided elsewhere this raises AttributeError.
        """
        from app.services.llm_service import LLMService

        llm = LLMService()

        summary = await llm.generate(
            prompt=f"Summarize this email in 2-3 sentences:\n\n{email_content['body']}",
            max_tokens=150
        )

        # Extract action items
        actions = await llm.generate(
            prompt=f"Extract any action items or requests from this email:\n\n{email_content['body']}",
            max_tokens=200
        )

        return {
            'summary': summary,
            'action_items': actions,
            'priority': await self._classify_priority(email_content)
        }

    def _get_email_body(self, message) -> str:
        """Extract the message text, preferring ``text/plain`` over HTML.

        Decodes with the part's declared charset instead of assuming UTF-8
        and returns ``""`` when the message has no textual body.
        """
        get_body = getattr(message, "get_body", None)
        if get_body is not None:
            part = get_body(preferencelist=('plain', 'html'))
            if part is not None:
                try:
                    return part.get_content()
                except (LookupError, UnicodeError):
                    pass  # unknown/broken charset: decode manually below

        # Fallback for legacy Message objects or undecodable parts.
        candidates = message.walk() if message.is_multipart() else [message]
        for candidate in candidates:
            if message.is_multipart() and candidate.get_content_type() != 'text/plain':
                continue
            payload = candidate.get_payload(decode=True)
            if payload is None:
                continue
            charset = candidate.get_content_charset() or 'utf-8'
            try:
                return payload.decode(charset, errors='replace')
            except LookupError:
                return payload.decode('utf-8', errors='replace')
        return ""

    def _get_attachments(self, message) -> List[dict]:
        """Return filename, content type and byte size of each attachment."""
        attachments = []

        if message.is_multipart():
            for part in message.walk():
                if part.get_content_disposition() == 'attachment':
                    payload = part.get_payload(decode=True)
                    attachments.append({
                        'filename': part.get_filename(),
                        'content_type': part.get_content_type(),
                        # Container parts decode to None.
                        'size': len(payload) if payload is not None else 0
                    })

        return attachments
# API Endpoints
async def _require_email_config(current_user, db):
    """Return the user's mailbox configuration or raise HTTP 400 if unset."""
    email_config = await get_user_email_config(current_user, db)
    if not email_config:
        raise HTTPException(status_code=400, detail="Email not configured")
    return email_config


@router.get("/api/email/messages")
async def list_emails(
    folder: str = "INBOX",
    limit: int = 50,
    since: Optional[datetime] = None,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """List emails from the user's connected mailbox.

    Raises:
        HTTPException 400: No mailbox is configured for the user.
    """
    email_service = EmailService(await _require_email_config(current_user, db))
    emails = await email_service.fetch_emails(folder, limit, since)

    return {"emails": emails}


@router.post("/api/email/send")
async def send_email(
    email_request: EmailSendRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Send an email from the user's connected mailbox.

    Raises:
        HTTPException 400: No mailbox is configured for the user
            (previously an unhandled error inside ``EmailService``).
        HTTPException 500: The service reported a failed send.
    """
    email_service = EmailService(await _require_email_config(current_user, db))

    success = await email_service.send_email(
        to=email_request.to,
        subject=email_request.subject,
        body=email_request.body,
        html_body=email_request.html_body,
        attachments=email_request.attachments
    )

    if success:
        return {"message": "Email sent successfully"}
    raise HTTPException(status_code=500, detail="Failed to send email")


@router.post("/api/email/{message_id}/summarize")
async def summarize_email_endpoint(
    message_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Get an AI summary of one email, addressed by its IMAP UID.

    Raises:
        HTTPException 400: No mailbox configured, or ``message_id`` is not
            numeric.
        HTTPException 404: No message with that UID exists.
    """
    # message_id is interpolated into a raw IMAP search expression, so only
    # plain decimal UIDs are accepted.
    if not message_id.isdigit():
        raise HTTPException(status_code=400, detail="Invalid message id")

    email_service = EmailService(await _require_email_config(current_user, db))

    # Fetch specific email
    emails = await email_service.fetch_emails(search_criteria=f'UID {message_id}')

    if not emails:
        raise HTTPException(status_code=404, detail="Email not found")

    summary = await email_service.summarize_email(emails[0])
    return summary
class CardDAVService:
    """CardDAV contacts synchronization service.

    Talks to the user's ``contacts`` address book under the server's
    ``/remote.php/dav/addressbooks/users/<username>/`` collection.
    """

    # NOTE(review): the request body was lost when this document was
    # rendered (only an empty string literal remained). This is a
    # reconstruction asking for each card's ETag and vCard data - confirm
    # against the original source.
    _PROPFIND_BODY = (
        '<?xml version="1.0" encoding="utf-8"?>'
        '<d:propfind xmlns:d="DAV:" xmlns:card="urn:ietf:params:xml:ns:carddav">'
        '<d:prop><d:getetag/><card:address-data/></d:prop>'
        '</d:propfind>'
    )

    def __init__(self, config: dict):
        self.server_url = config['server_url']
        self.username = config['username']
        self.password = config['password']
        self.addressbook_url = f"{self.server_url}/remote.php/dav/addressbooks/users/{self.username}/contacts/"

    def _card_url(self, uid: str) -> str:
        """Return the URL of the ``<uid>.vcf`` resource (uid percent-encoded)."""
        from urllib.parse import quote

        return f"{self.addressbook_url}{quote(uid, safe='')}.vcf"

    async def get_contacts(self) -> List[Dict[str, Any]]:
        """Fetch all contacts from the CardDAV server.

        Raises ``httpx.HTTPStatusError`` on a 4xx/5xx response. vCards that
        cannot be parsed are skipped.
        """
        async with httpx.AsyncClient() as client:
            response = await client.request(
                method="PROPFIND",
                url=self.addressbook_url,
                auth=(self.username, self.password),
                headers={
                    "Depth": "1",
                    "Content-Type": "application/xml"
                },
                content=self._PROPFIND_BODY
            )
            # Without this an auth failure was parsed as if it were a listing.
            response.raise_for_status()

        contacts = []
        for vcard_data in self._parse_propfind_response(response.text):
            contact = self._parse_vcard(vcard_data)
            if contact:
                contacts.append(contact)

        return contacts

    def _parse_propfind_response(self, xml_text: str) -> List[str]:
        """Extract the vCard payloads from a multistatus response.

        Returns the text of every CardDAV ``address-data`` element, or an
        empty list when the document is not well-formed XML.
        """
        import xml.etree.ElementTree as ET

        try:
            root = ET.fromstring(xml_text)
        except ET.ParseError:
            return []

        address_data_tag = "{urn:ietf:params:xml:ns:carddav}address-data"
        return [
            element.text
            for element in root.iter(address_data_tag)
            if element.text and element.text.strip()
        ]

    async def create_contact(self, contact: Dict[str, Any]) -> str:
        """Create a new contact and return its UID.

        Raises ``httpx.HTTPStatusError`` on a 4xx/5xx response.
        """
        import uuid

        # Resolve the UID once and use it for both the vCard and the file
        # name. Previously each got its own random UUID, and an explicit
        # ``uid=None`` produced a resource named "None.vcf".
        uid = contact.get('uid') or str(uuid.uuid4())
        vcard = self._create_vcard({**contact, 'uid': uid})

        async with httpx.AsyncClient() as client:
            response = await client.put(
                self._card_url(uid),
                auth=(self.username, self.password),
                headers={"Content-Type": "text/vcard; charset=utf-8"},
                content=vcard.serialize()
            )
            response.raise_for_status()

        return uid

    async def update_contact(self, uid: str, contact: Dict[str, Any]) -> bool:
        """Overwrite the contact stored under ``uid``.

        Returns ``True`` for any 2xx response, ``False`` otherwise.
        """
        # Keep the vCard UID aligned with the resource being overwritten.
        vcard = self._create_vcard({**contact, 'uid': uid})

        async with httpx.AsyncClient() as client:
            response = await client.put(
                self._card_url(uid),
                auth=(self.username, self.password),
                headers={"Content-Type": "text/vcard; charset=utf-8"},
                content=vcard.serialize()
            )

        return response.is_success

    async def delete_contact(self, uid: str) -> bool:
        """Delete a contact. Returns ``True`` for any 2xx response."""
        async with httpx.AsyncClient() as client:
            response = await client.delete(
                self._card_url(uid),
                auth=(self.username, self.password)
            )

        return response.is_success

    def _parse_vcard(self, vcard_text: str) -> Optional[Dict[str, Any]]:
        """Parse vCard text into a contact dictionary.

        Returns ``None`` when the text cannot be parsed.
        """
        try:
            vcard = vobject.readOne(vcard_text)

            contact = {
                'uid': str(vcard.uid.value) if hasattr(vcard, 'uid') else None,
                'full_name': str(vcard.fn.value) if hasattr(vcard, 'fn') else None,
                'emails': [],
                'phones': [],
                'organization': None,
                'title': None
            }

            if hasattr(vcard, 'email'):
                contact['emails'] = [str(email.value) for email in vcard.email_list]

            if hasattr(vcard, 'tel'):
                contact['phones'] = [str(tel.value) for tel in vcard.tel_list]

            if hasattr(vcard, 'org'):
                contact['organization'] = str(vcard.org.value[0])

            if hasattr(vcard, 'title'):
                contact['title'] = str(vcard.title.value)

            return contact
        except Exception:
            return None

    def _create_vcard(self, contact: Dict[str, Any]) -> vobject.vCard:
        """Create a vCard from a contact dictionary.

        A random UID is generated when the contact has none.
        """
        import uuid

        vcard = vobject.vCard()

        vcard.add('fn').value = contact.get('full_name', '')
        vcard.add('uid').value = contact.get('uid') or str(uuid.uuid4())

        for email in contact.get('emails') or []:
            vcard.add('email').value = email

        for phone in contact.get('phones') or []:
            vcard.add('tel').value = phone

        if contact.get('organization'):
            vcard.add('org').value = [contact['organization']]

        if contact.get('title'):
            vcard.add('title').value = contact['title']

        return vcard


# API Endpoints
@router.get("/api/carddav/contacts")
async def list_contacts(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """List all contacts from CardDAV"""
    config = await get_user_carddav_config(current_user, db)
    service = CardDAVService(config)
    contacts = await service.get_contacts()
    return {"contacts": contacts}


@router.post("/api/carddav/contacts")
async def create_contact(
    contact: ContactCreate,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Create a new contact"""
    config = await get_user_carddav_config(current_user, db)
    service = CardDAVService(config)
    uid = await service.create_contact(contact.dict())
    return {"uid": uid, "message": "Contact created"}
class GoogleCalendarService:
    """Google Calendar synchronization service.

    The Google API client is synchronous, so every network call is run in a
    worker thread instead of blocking the event loop.
    NOTE(review): the underlying client object is presumably not safe for
    concurrent use; avoid sharing one instance between concurrent requests.
    """

    SCOPES = ['https://www.googleapis.com/auth/calendar']

    def __init__(self, credentials: "Credentials"):
        self.service = build('calendar', 'v3', credentials=credentials)

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Run a blocking callable in a worker thread and await its result."""
        import asyncio

        return await asyncio.to_thread(func, *args, **kwargs)

    @staticmethod
    def _rfc3339(moment: datetime) -> str:
        """Format ``moment`` as a UTC timestamp ending in ``Z``.

        Naive datetimes are taken to be UTC already; aware datetimes are
        converted to UTC first. Appending ``Z`` to an aware value's
        ``isoformat()`` would yield an invalid ``+00:00Z`` suffix.
        """
        from datetime import timezone

        if moment.tzinfo is not None:
            moment = moment.astimezone(timezone.utc).replace(tzinfo=None)
        return moment.isoformat() + 'Z'

    @classmethod
    async def get_authorization_url(cls, redirect_uri: str) -> tuple[str, str]:
        """Return ``(authorization_url, state)`` for the OAuth consent flow."""
        flow = Flow.from_client_secrets_file(
            settings.GOOGLE_CLIENT_SECRETS_FILE,
            scopes=cls.SCOPES,
            redirect_uri=redirect_uri
        )

        authorization_url, state = flow.authorization_url(
            access_type='offline',
            include_granted_scopes='true'
        )

        return authorization_url, state

    @classmethod
    async def exchange_code(cls, code: str, redirect_uri: str) -> "Credentials":
        """Exchange an authorization code for credentials."""
        flow = Flow.from_client_secrets_file(
            settings.GOOGLE_CLIENT_SECRETS_FILE,
            scopes=cls.SCOPES,
            redirect_uri=redirect_uri
        )

        # fetch_token performs a blocking HTTP request.
        await cls._run_blocking(flow.fetch_token, code=code)
        return flow.credentials

    async def list_calendars(self) -> List[Dict[str, Any]]:
        """List all calendars, following ``nextPageToken`` until exhausted."""
        calendars = []
        page_token = None

        while True:
            request = self.service.calendarList().list(pageToken=page_token)
            calendar_list = await self._run_blocking(request.execute)

            for calendar in calendar_list.get('items', []):
                calendars.append({
                    'id': calendar['id'],
                    'summary': calendar['summary'],
                    'description': calendar.get('description'),
                    'primary': calendar.get('primary', False),
                    'accessRole': calendar.get('accessRole')
                })

            page_token = calendar_list.get('nextPageToken')
            if not page_token:
                break

        return calendars

    async def get_events(
        self,
        calendar_id: str = 'primary',
        time_min: Optional[datetime] = None,
        time_max: Optional[datetime] = None,
        max_results: int = 100
    ) -> List[Dict[str, Any]]:
        """Get events from a calendar, ordered by start time.

        ``time_min`` defaults to the current UTC time and ``time_max`` to 30
        days after ``time_min``. Recurring events are expanded into single
        instances. Only the first page (up to ``max_results``) is returned.
        """
        if not time_min:
            time_min = datetime.utcnow()
        if not time_max:
            time_max = time_min + timedelta(days=30)

        request = self.service.events().list(
            calendarId=calendar_id,
            timeMin=self._rfc3339(time_min),
            timeMax=self._rfc3339(time_max),
            maxResults=max_results,
            singleEvents=True,
            orderBy='startTime'
        )
        events_result = await self._run_blocking(request.execute)

        events = []
        for event in events_result.get('items', []):
            events.append({
                'id': event['id'],
                'summary': event.get('summary'),
                'description': event.get('description'),
                # All-day events carry 'date' instead of 'dateTime'.
                'start': event['start'].get('dateTime', event['start'].get('date')),
                'end': event['end'].get('dateTime', event['end'].get('date')),
                'location': event.get('location'),
                'attendees': event.get('attendees', []),
                'status': event.get('status')
            })

        return events

    async def create_event(
        self,
        calendar_id: str,
        event: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Create a calendar event.

        ``event`` must contain ``summary`` and datetime values under
        ``start`` and ``end``; ``description``, ``location``, ``timezone``
        (default ``UTC``), ``attendees`` (email addresses) and
        ``notify_attendees`` are optional. Returns the new event's ``id``
        and ``htmlLink``.
        """
        event_body = {
            'summary': event['summary'],
            'description': event.get('description'),
            'location': event.get('location'),
            'start': {
                'dateTime': event['start'].isoformat(),
                'timeZone': event.get('timezone', 'UTC')
            },
            'end': {
                'dateTime': event['end'].isoformat(),
                'timeZone': event.get('timezone', 'UTC')
            }
        }

        if event.get('attendees'):
            event_body['attendees'] = [
                {'email': email} for email in event['attendees']
            ]

        request = self.service.events().insert(
            calendarId=calendar_id,
            body=event_body,
            sendUpdates='all' if event.get('notify_attendees') else 'none'
        )
        created_event = await self._run_blocking(request.execute)

        return {
            'id': created_event['id'],
            'htmlLink': created_event.get('htmlLink')
        }

    async def sync_to_nextcloud(
        self,
        calendar_id: str,
        nextcloud_calendar_service
    ) -> Dict[str, int]:
        """Sync Google Calendar events to Nextcloud CalDAV.

        Every event returned by ``get_events`` is updated when Nextcloud
        already has an event with the same Google id, otherwise created.
        Returns counts under ``created``, ``updated`` and ``skipped``;
        nothing is currently skipped, so that count stays 0.
        """
        events = await self.get_events(calendar_id)

        stats = {'created': 0, 'updated': 0, 'skipped': 0}

        for event in events:
            # Check if event already exists in Nextcloud
            existing = await nextcloud_calendar_service.find_event_by_google_id(
                event['id']
            )

            if existing:
                await nextcloud_calendar_service.update_event(
                    existing['uid'], event
                )
                stats['updated'] += 1
            else:
                await nextcloud_calendar_service.create_event(event)
                stats['created'] += 1

        return stats
).execute() return { 'id': created_event['id'], 'htmlLink': created_event.get('htmlLink') } async def sync_to_nextcloud( self, calendar_id: str, nextcloud_calendar_service ) -> Dict[str, int]: """Sync Google Calendar events to Nextcloud CalDAV""" events = await self.get_events(calendar_id) stats = {'created': 0, 'updated': 0, 'skipped': 0} for event in events: # Check if event already exists in Nextcloud existing = await nextcloud_calendar_service.find_event_by_google_id( event['id'] ) if existing: # Update if modified await nextcloud_calendar_service.update_event( existing['uid'], event ) stats['updated'] += 1 else: # Create new event await nextcloud_calendar_service.create_event(event) stats['created'] += 1 return stats # API Endpoints @router.get("/api/google-calendar/auth") async def google_calendar_auth( current_user: User = Depends(get_current_user) ): """Initiate Google Calendar OAuth flow""" redirect_uri = f"{settings.BASE_URL}/api/google-calendar/callback" auth_url, state = await GoogleCalendarService.get_authorization_url(redirect_uri) return {"authorization_url": auth_url, "state": state} @router.get("/api/google-calendar/callback") async def google_calendar_callback( code: str, state: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """Handle Google Calendar OAuth callback""" redirect_uri = f"{settings.BASE_URL}/api/google-calendar/callback" credentials = await GoogleCalendarService.exchange_code(code, redirect_uri) # Store credentials securely await store_user_google_credentials(current_user, credentials, db) return {"message": "Google Calendar connected successfully"} @router.get("/api/google-calendar/events") async def list_google_events( calendar_id: str = "primary", current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """List events from Google Calendar""" credentials = await get_user_google_credentials(current_user, db) service = GoogleCalendarService(credentials) events = await 
service.get_events(calendar_id) return {"events": events} @router.post("/api/google-calendar/sync") async def sync_google_to_nextcloud( calendar_id: str = "primary", current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """Sync Google Calendar to Nextcloud""" google_creds = await get_user_google_credentials(current_user, db) nextcloud_config = await get_user_nextcloud_config(current_user, db) google_service = GoogleCalendarService(google_creds) nextcloud_service = NextcloudCalendarService(nextcloud_config) stats = await google_service.sync_to_nextcloud(calendar_id, nextcloud_service) return {"message": "Sync completed", "stats": stats} ``` ### G.4 Nextcloud App Store Packaging **Purpose:** Package VoiceAssist apps for Nextcloud App Store distribution #### Implementation Guide ```bash # Nextcloud App Structure voiceassist-nextcloud/ ├── appinfo/ │ ├── info.xml # App metadata │ ├── routes.php # Route definitions │ └── app.php # App initialization ├── lib/ │ ├── Controller/ │ │ └── PageController.php │ ├── Service/ │ │ └── VoiceAssistService.php │ └── AppInfo/ │ └── Application.php ├── templates/ │ └── index.php # Main template ├── js/ │ └── voiceassist.js # React app bundle ├── css/ │ └── voiceassist.css └── img/ └── app.svg # App icon ``` **App Metadata (info.xml):** ```xml voiceassist VoiceAssist Medical AI Voice Assistant 1.0.0 agpl VoiceAssist Team VoiceAssist office tools https://github.com/voiceassist/nextcloud-app/issues https://github.com/voiceassist/nextcloud-app VoiceAssist voiceassist.page.index app.svg OCA\VoiceAssist\Settings\AdminSettings OCA\VoiceAssist\Settings\AdminSection ``` **Build and Package Script:** ```python # scripts/package_nextcloud_app.py import os import shutil import subprocess from pathlib import Path def package_nextcloud_app(): """Package VoiceAssist as Nextcloud app""" # Build React app print("Building React app...") subprocess.run( ["pnpm", "build"], cwd="clients/web-app", check=True ) # Create Nextcloud 
app directory app_dir = Path("dist/voiceassist-nextcloud") if app_dir.exists(): shutil.rmtree(app_dir) app_dir.mkdir(parents=True) # Copy app structure shutil.copytree("nextcloud-app/appinfo", app_dir / "appinfo") shutil.copytree("nextcloud-app/lib", app_dir / "lib") shutil.copytree("nextcloud-app/templates", app_dir / "templates") shutil.copytree("nextcloud-app/img", app_dir / "img") # Copy built assets (app_dir / "js").mkdir() (app_dir / "css").mkdir() for js_file in Path("clients/web-app/dist/assets").glob("*.js"): shutil.copy(js_file, app_dir / "js" / "voiceassist.js") break for css_file in Path("clients/web-app/dist/assets").glob("*.css"): shutil.copy(css_file, app_dir / "css" / "voiceassist.css") break # Create tarball for app store print("Creating tarball...") subprocess.run( ["tar", "-czf", "voiceassist.tar.gz", "voiceassist-nextcloud"], cwd="dist", check=True ) # Sign the app (requires Nextcloud certificate) if os.environ.get("NEXTCLOUD_CERT"): print("Signing app...") subprocess.run([ "openssl", "dgst", "-sha512", "-sign", os.environ["NEXTCLOUD_CERT"], "-out", "dist/voiceassist.tar.gz.signature", "dist/voiceassist.tar.gz" ], check=True) print(f"App packaged: dist/voiceassist.tar.gz") if __name__ == "__main__": package_nextcloud_app() ``` #### Testing for Nextcloud Integration ```python class TestNextcloudIntegration: @pytest.mark.integration async def test_carddav_contact_sync(self, carddav_service): """Test CardDAV contact synchronization""" # Create contact contact = { 'full_name': 'Test Doctor', 'emails': ['doctor@hospital.org'], 'phones': ['+1234567890'], 'organization': 'Test Hospital' } uid = await carddav_service.create_contact(contact) assert uid is not None # Fetch and verify contacts = await carddav_service.get_contacts() found = next((c for c in contacts if c['uid'] == uid), None) assert found is not None assert found['full_name'] == 'Test Doctor' # Cleanup await carddav_service.delete_contact(uid) @pytest.mark.integration async def 
test_google_calendar_sync(self, google_service, nextcloud_service): """Test Google to Nextcloud calendar sync""" stats = await google_service.sync_to_nextcloud('primary', nextcloud_service) assert stats['created'] >= 0 assert stats['updated'] >= 0 @pytest.mark.integration async def test_nextcloud_app_health(self, test_client): """Test Nextcloud app is accessible""" response = await test_client.get("/apps/voiceassist/") assert response.status_code == 200 ``` --- ## Feature Category H: Advanced RAG & Reasoning **Timeline:** 4-5 weeks **Priority:** HIGH ### H.1 Advanced RAG Techniques **Original Scope:** Basic semantic search **Enhanced Scope:** Hybrid search, re-ranking, query expansion, multi-hop reasoning ```python # services/api-gateway/app/services/advanced_rag.py from typing import List, Optional, Dict, Any from dataclasses import dataclass import asyncio @dataclass class SearchResult: doc_id: str content: str score: float metadata: Dict[str, Any] source: str # 'semantic', 'keyword', 'hybrid' class HybridSearchEngine: """Combines semantic and keyword search with re-ranking""" def __init__(self): self.embedding_service = EmbeddingService() self.vector_db = QdrantClient() self.bm25_index = BM25Index() self.reranker = CrossEncoderReranker() async def search( self, query: str, top_k: int = 10, alpha: float = 0.5, # Weight between semantic (1) and keyword (0) filters: Optional[Dict[str, Any]] = None, rerank: bool = True ) -> List[SearchResult]: """ Perform hybrid search combining semantic and keyword search. 
Args: query: Search query top_k: Number of results to return alpha: Balance between semantic (1.0) and keyword (0.0) search filters: Metadata filters (e.g., source_type, date_range) rerank: Whether to apply cross-encoder re-ranking """ # Expand query with medical synonyms expanded_query = await self._expand_query(query) # Run semantic and keyword search in parallel semantic_results, keyword_results = await asyncio.gather( self._semantic_search(expanded_query, top_k * 2, filters), self._keyword_search(expanded_query, top_k * 2, filters) ) # Fuse results using Reciprocal Rank Fusion fused_results = self._reciprocal_rank_fusion( semantic_results, keyword_results, alpha=alpha ) # Re-rank with cross-encoder if rerank and len(fused_results) > 0: fused_results = await self.reranker.rerank( query=query, documents=fused_results, top_k=top_k ) return fused_results[:top_k] async def _expand_query(self, query: str) -> str: """Expand query with medical synonyms and related terms""" # Use medical ontology for synonym expansion # e.g., "heart attack" -> "heart attack myocardial infarction MI" from app.services.medical_ontology import MedicalOntology ontology = MedicalOntology() expanded_terms = await ontology.get_synonyms(query) return f"{query} {' '.join(expanded_terms)}" async def _semantic_search( self, query: str, top_k: int, filters: Optional[Dict[str, Any]] ) -> List[SearchResult]: """Vector similarity search""" query_embedding = await self.embedding_service.generate_embedding(query) results = await self.vector_db.search( collection_name="medical_kb", query_vector=query_embedding, limit=top_k, query_filter=self._build_qdrant_filter(filters) ) return [ SearchResult( doc_id=r.id, content=r.payload['content'], score=r.score, metadata=r.payload.get('metadata', {}), source='semantic' ) for r in results ] async def _keyword_search( self, query: str, top_k: int, filters: Optional[Dict[str, Any]] ) -> List[SearchResult]: """BM25 keyword search""" results = await 
self.bm25_index.search( query=query, top_k=top_k, filters=filters ) return [ SearchResult( doc_id=r['id'], content=r['content'], score=r['score'], metadata=r.get('metadata', {}), source='keyword' ) for r in results ] def _reciprocal_rank_fusion( self, semantic_results: List[SearchResult], keyword_results: List[SearchResult], alpha: float = 0.5, k: int = 60 ) -> List[SearchResult]: """ Fuse results using Reciprocal Rank Fusion. RRF score = sum(1 / (k + rank_i)) """ scores = {} # Score semantic results for rank, result in enumerate(semantic_results): rrf_score = alpha * (1 / (k + rank + 1)) if result.doc_id not in scores: scores[result.doc_id] = {'result': result, 'score': 0} scores[result.doc_id]['score'] += rrf_score # Score keyword results for rank, result in enumerate(keyword_results): rrf_score = (1 - alpha) * (1 / (k + rank + 1)) if result.doc_id not in scores: scores[result.doc_id] = {'result': result, 'score': 0} scores[result.doc_id]['score'] += rrf_score # Sort by fused score sorted_results = sorted( scores.values(), key=lambda x: x['score'], reverse=True ) return [ SearchResult( doc_id=item['result'].doc_id, content=item['result'].content, score=item['score'], metadata=item['result'].metadata, source='hybrid' ) for item in sorted_results ] class CrossEncoderReranker: """Re-rank results using a cross-encoder model""" def __init__(self): from sentence_transformers import CrossEncoder self.model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2') async def rerank( self, query: str, documents: List[SearchResult], top_k: int ) -> List[SearchResult]: """Re-rank documents using cross-encoder""" if not documents: return [] # Prepare pairs for cross-encoder pairs = [(query, doc.content) for doc in documents] # Get cross-encoder scores loop = asyncio.get_event_loop() scores = await loop.run_in_executor( None, lambda: self.model.predict(pairs) ) # Combine with original results for doc, score in zip(documents, scores): doc.score = float(score) # Sort by reranked 
score reranked = sorted(documents, key=lambda x: x.score, reverse=True) return reranked[:top_k] class MultiHopReasoner: """Multi-hop reasoning for complex queries""" def __init__(self): self.search_engine = HybridSearchEngine() self.llm = LLMService() async def reason( self, query: str, max_hops: int = 3, context: Optional[str] = None ) -> Dict[str, Any]: """ Perform multi-hop reasoning. 1. Decompose complex query into sub-questions 2. Answer each sub-question with retrieval 3. Synthesize final answer """ # Step 1: Decompose query sub_questions = await self._decompose_query(query) reasoning_chain = [] accumulated_context = context or "" # Step 2: Answer sub-questions iteratively for i, sub_q in enumerate(sub_questions[:max_hops]): # Search with accumulated context search_results = await self.search_engine.search( query=sub_q, top_k=5 ) # Generate intermediate answer intermediate_answer = await self._generate_intermediate_answer( question=sub_q, context=accumulated_context, retrieved_docs=search_results ) reasoning_chain.append({ 'step': i + 1, 'question': sub_q, 'retrieved_docs': [r.doc_id for r in search_results], 'answer': intermediate_answer }) # Update context for next hop accumulated_context += f"\n\nQ: {sub_q}\nA: {intermediate_answer}" # Step 3: Synthesize final answer final_answer = await self._synthesize_answer( original_query=query, reasoning_chain=reasoning_chain ) return { 'answer': final_answer, 'reasoning_chain': reasoning_chain, 'confidence': self._calculate_confidence(reasoning_chain) } async def _decompose_query(self, query: str) -> List[str]: """Decompose complex query into sub-questions""" prompt = f"""Break down this complex medical question into simpler sub-questions that can be answered independently: Question: {query} Generate 2-4 sub-questions that together would help answer the main question. 
Format: One question per line, no numbering.""" response = await self.llm.generate(prompt, max_tokens=200) sub_questions = [q.strip() for q in response.split('\n') if q.strip()] return sub_questions async def _generate_intermediate_answer( self, question: str, context: str, retrieved_docs: List[SearchResult] ) -> str: """Generate answer for a sub-question""" doc_context = "\n\n".join([ f"Source: {doc.metadata.get('source', 'Unknown')}\n{doc.content}" for doc in retrieved_docs ]) prompt = f"""Based on the following context, answer the question concisely. Previous Context: {context} Retrieved Information: {doc_context} Question: {question} Answer (be concise and cite sources):""" return await self.llm.generate(prompt, max_tokens=300) async def _synthesize_answer( self, original_query: str, reasoning_chain: List[Dict] ) -> str: """Synthesize final answer from reasoning chain""" chain_text = "\n".join([ f"Step {step['step']}: {step['question']}\nAnswer: {step['answer']}" for step in reasoning_chain ]) prompt = f"""Based on the following reasoning chain, provide a comprehensive answer to the original question. 
Original Question: {original_query} Reasoning Chain: {chain_text} Synthesized Answer (comprehensive, well-structured, with citations):""" return await self.llm.generate(prompt, max_tokens=500) def _calculate_confidence(self, reasoning_chain: List[Dict]) -> float: """Calculate confidence score for the reasoning""" # Simple heuristic based on chain completeness if not reasoning_chain: return 0.0 return min(1.0, len(reasoning_chain) * 0.3) ``` --- ## Comprehensive Testing Strategy ### Testing Categories ```python # tests/conftest.py - Shared fixtures import pytest import asyncio from typing import Generator from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker @pytest.fixture(scope="session") def event_loop(): loop = asyncio.get_event_loop_policy().new_event_loop() yield loop loop.close() @pytest.fixture(scope="session") def test_db(): """Create test database""" engine = create_engine("postgresql://test:test@localhost:5432/voiceassist_test") Base.metadata.create_all(engine) yield engine Base.metadata.drop_all(engine) @pytest.fixture def db_session(test_db) -> Generator: """Create new database session for each test""" Session = sessionmaker(bind=test_db) session = Session() yield session session.rollback() session.close() @pytest.fixture async def test_client(db_session): """Async test client""" from httpx import AsyncClient from app.main import app app.dependency_overrides[get_db] = lambda: db_session async with AsyncClient(app=app, base_url="http://test") as client: yield client @pytest.fixture async def authenticated_user(test_client, db_session): """Create and authenticate test user""" # Create user user = User(email="test@example.com", hashed_password=hash_password("testpass")) db_session.add(user) db_session.commit() # Login response = await test_client.post("/api/auth/login", json={ "email": "test@example.com", "password": "testpass" }) token = response.json()["access_token"] class AuthenticatedUser: def __init__(self): self.user = user 
self.token = token self.headers = {"Authorization": f"Bearer {token}"} return AuthenticatedUser() ``` ### Test Categories Summary | Category | Description | Coverage Target | | ----------------- | ------------------------------- | --------------- | | Unit Tests | Individual function/class tests | 85% | | Integration Tests | API endpoint tests | 80% | | E2E Tests | Full workflow tests | Key paths | | Load Tests | Performance under load | P95 < targets | | Security Tests | Vulnerability scanning | 0 critical/high | | Chaos Tests | Failure mode testing | Recovery < 30s | --- ## Performance Benchmarks ### Target Metrics | Feature | Metric | Target | Critical Threshold | | ------------------- | ---------------- | ------- | ------------------ | | File Processing | 10MB PDF | < 30s | < 60s | | Voice Latency | First audio byte | < 300ms | < 500ms | | Hybrid Search | Query response | < 200ms | < 500ms | | Multi-hop Reasoning | 3-hop query | < 5s | < 10s | | Share Link Access | Time to render | < 500ms | < 1s | | Email Fetch | 50 messages | < 2s | < 5s | | MFA Verification | TOTP check | < 50ms | < 100ms | ### Load Test Scenarios ```yaml # k6 load test configuration scenarios: file_upload: executor: ramping-vus startVUs: 1 stages: - duration: 1m target: 10 - duration: 3m target: 50 - duration: 1m target: 0 thresholds: http_req_duration: - p(95)<30000 - p(99)<60000 voice_sessions: executor: constant-vus vus: 100 duration: 5m thresholds: ws_connecting: - p(95)<500 ws_session_duration: - p(95)<300000 hybrid_search: executor: constant-arrival-rate rate: 100 timeUnit: 1s duration: 5m thresholds: http_req_duration: - p(95)<200 - p(99)<500 ``` --- ## Security Considerations ### Security Checklist - [ ] **File Upload Security** - [ ] File type validation (whitelist) - [ ] File size limits (configurable) - [ ] Virus scanning (ClamAV) - [ ] Secure file storage (S3 with signed URLs) - [ ] No path traversal vulnerabilities - [ ] **Voice Session Security** - [ ] Encrypted WebSocket 
connections - [ ] Session token rotation - [ ] Rate limiting per user - [ ] Audio data not persisted (unless configured) - [ ] **Sharing Security** - [ ] Cryptographically secure share tokens - [ ] Password protection option - [ ] Expiration enforcement - [ ] Access logging for audit - [ ] Rate limiting on access attempts - [ ] **OIDC/MFA Security** - [ ] State parameter validation (CSRF prevention) - [ ] Nonce validation - [ ] Token encryption at rest - [ ] Backup code hashing - [ ] Brute force protection - [ ] **Email Security** - [ ] Credentials encrypted at rest - [ ] TLS for IMAP/SMTP - [ ] PHI detection in email content - [ ] Attachment scanning --- ## Implementation Timeline **Total Duration:** 17-21 weeks (4-5 months) ### Phase 1: Voice Pipeline Completion (Weeks 1-4) | Week | Focus | Deliverables | | ---- | -------------------- | ------------------------------------------------ | | 1 | Voice Infrastructure | WebSocket handler, VAD setup | | 2 | OpenAI Realtime | Realtime API integration, streaming | | 3 | Voice Features | Barge-in, echo cancellation, noise suppression | | 4 | Voice Auth | Voice authentication enrollment and verification | **Milestone:** Full voice pipeline operational ### Phase 2: Advanced Medical AI (Weeks 5-9) | Week | Focus | Deliverables | | ---- | ------------------- | --------------------------------------------- | | 5 | Medical Models | BioGPT/PubMedBERT integration | | 6 | Medical NER | Entity extraction, UMLS linking | | 7 | Domain LLM | Domain-specific prompts, classification | | 8-9 | Multi-hop Reasoning | Query decomposition, cross-document synthesis | **Milestone:** Medical AI capabilities enhanced ### Phase 3: External Medical Integrations (Weeks 10-16) | Week | Focus | Deliverables | | ----- | ------------------- | -------------------------------------- | | 10-11 | UpToDate | Search, drug interactions, calculators | | 12-13 | OpenEvidence | Evidence queries, systematic reviews | | 14-15 | PubMed | Literature search, citation 
management | | 16 | Medical Calculators | Wells, CHA2DS2-VASc, eGFR, etc. | **Milestone:** External integrations complete ### Phase 4: Nextcloud Integration (Weeks 17-20) | Week | Focus | Deliverables | | ---- | -------------- | ------------------------------ | | 17 | OIDC/SSO | Nextcloud SSO, MFA integration | | 18 | Email/Contacts | IMAP/SMTP, CardDAV sync | | 19 | Calendar | Google Calendar sync, CalDAV | | 20 | App Packaging | Nextcloud App Store packaging | **Milestone:** Nextcloud integration complete ### Phase 5: Advanced RAG & Polish (Weeks 21-24) | Week | Focus | Deliverables | | ---- | --------------- | --------------------------------------------- | | 21 | Hybrid Search | Semantic + keyword, re-ranking | | 22 | File Processing | PDF/OCR, entity extraction | | 23 | Testing | Integration tests, load tests, security audit | | 24 | Documentation | API docs, deployment guides | **Milestone:** Production-ready system ### Timeline Summary | Category | Duration | Priority | | -------------------------------- | --------------- | -------- | | Voice Pipeline Completion | 3-4 weeks | HIGH | | Advanced Medical AI | 4-5 weeks | HIGH | | External Medical Integrations | 6-8 weeks | HIGH | | Nextcloud Integration Completion | 4-5 weeks | HIGH | | Advanced RAG & Polish | 3-4 weeks | MEDIUM | | **Total** | **17-21 weeks** | - | --- ## Dependencies & Prerequisites ### Required Packages ```bash # Python - Core pip install python-jose[cryptography] # JWT pip install passlib[bcrypt] # Password hashing pip install authlib # OIDC pip install pyotp # TOTP pip install qrcode[pil] # QR codes # File Processing pip install pypdf # PDF text extraction pip install pdf2image # PDF to image pip install pytesseract # OCR pip install pillow # Image processing # Voice Pipeline pip install webrtcvad # Voice activity detection pip install websockets # WebSocket client pip install resemblyzer # Voice authentication pip install soundfile # Audio processing pip install librosa # Audio resampling 
# Medical AI pip install transformers # BioGPT, PubMedBERT pip install torch # Deep learning pip install scispacy # Medical NER pip install spacy # NLP foundation pip install en_core_sci_lg # SciSpacy model # RAG & Search pip install sentence-transformers # Cross-encoder re-ranking pip install rank-bm25 # BM25 search pip install qdrant-client # Vector DB # External Integrations pip install biopython # PubMed/Entrez pip install httpx # Async HTTP client pip install vobject # vCard/CardDAV pip install google-api-python-client # Google Calendar pip install google-auth-oauthlib # Google OAuth ``` ### Infrastructure - **Redis**: Session storage, caching - **PostgreSQL**: Primary database with pgvector - **Qdrant**: Vector storage for embeddings - **Tesseract**: OCR engine - **ClamAV**: Virus scanning - **GPU (Optional)**: For medical model inference ### External Services - **OpenAI**: GPT-4, Realtime API, Whisper, Embeddings - **OIDC Providers**: Nextcloud, Google, Microsoft - **UpToDate API**: Clinical decision support (requires license) - **OpenEvidence API**: Evidence-based medicine - **PubMed/NCBI**: Literature search (free with API key) - **Google Calendar API**: Calendar sync --- ## Risk Assessment & Mitigation | Risk | Impact | Probability | Mitigation | | ---------------------------- | ------ | ----------- | ------------------------------ | | OpenAI Realtime API latency | High | Medium | Fallback to Whisper + TTS | | File processing memory usage | Medium | High | Streaming, size limits | | OIDC provider downtime | Medium | Low | JWT fallback auth | | Voice quality issues | High | Medium | Quality monitoring, fallbacks | | MFA adoption resistance | Low | Medium | Clear onboarding, backup codes | --- ## Success Criteria ### Functional - [ ] All file types processed correctly - [ ] Voice sessions maintain < 500ms latency - [ ] Share links work with all permission levels - [ ] OIDC works with all configured providers - [ ] MFA reduces unauthorized access to 0 ### 
Performance - [ ] P95 latencies within targets - [ ] 100 concurrent voice sessions - [ ] 1000 concurrent search queries - [ ] < 1% error rate ### Security - [ ] 0 critical vulnerabilities - [ ] 0 high vulnerabilities - [ ] All PHI properly protected - [ ] Audit logs complete --- ## Appendix: API Reference _Detailed OpenAPI specifications will be generated from the implemented endpoints._ --- **Document Version:** 1.0 **Created:** 2025-11-26 **Last Updated:** 2025-11-26 **Author:** Claude (AI Assistant) **Status:** Ready for Review 6:["slug","PART2_DEFERRED_BACKEND_FEATURES_PLAN","c"] 0:["X7oMT3VrOffzp0qvbeOas",[[["",{"children":["docs",{"children":[["slug","PART2_DEFERRED_BACKEND_FEATURES_PLAN","c"],{"children":["__PAGE__?{\"slug\":[\"PART2_DEFERRED_BACKEND_FEATURES_PLAN\"]}",{}]}]}]},"$undefined","$undefined",true],["",{"children":["docs",{"children":[["slug","PART2_DEFERRED_BACKEND_FEATURES_PLAN","c"],{"children":["__PAGE__",{},[["$L1",["$","div",null,{"children":[["$","div",null,{"className":"mb-6 flex items-center justify-between gap-4","children":[["$","div",null,{"children":[["$","p",null,{"className":"text-sm text-gray-500 dark:text-gray-400","children":"Docs / Raw"}],["$","h1",null,{"className":"text-3xl font-bold text-gray-900 dark:text-white","children":"Part2 Deferred Backend Features Plan"}],["$","p",null,{"className":"text-sm text-gray-600 dark:text-gray-400","children":["Sourced from"," ",["$","code",null,{"className":"font-mono text-xs","children":["docs/","PART2_DEFERRED_BACKEND_FEATURES_PLAN.md"]}]]}]]}],["$","a",null,{"href":"https://github.com/mohammednazmy/VoiceAssist/edit/main/docs/PART2_DEFERRED_BACKEND_FEATURES_PLAN.md","target":"_blank","rel":"noreferrer","className":"inline-flex items-center gap-2 rounded-md border border-gray-200 dark:border-gray-700 px-3 py-1.5 text-sm text-gray-700 dark:text-gray-200 hover:border-primary-500 dark:hover:border-primary-400 hover:text-primary-700 dark:hover:text-primary-300","children":"Edit on 
GitHub"}]]}],["$","div",null,{"className":"rounded-lg border border-gray-200 dark:border-gray-800 bg-white dark:bg-gray-900 p-6","children":["$","$L2",null,{"content":"$3"}]}],["$","div",null,{"className":"mt-6 flex flex-wrap gap-2 text-sm","children":[["$","$L4",null,{"href":"/reference/all-docs","className":"inline-flex items-center gap-1 rounded-md bg-gray-100 px-3 py-1 text-gray-700 hover:bg-gray-200 dark:bg-gray-800 dark:text-gray-200 dark:hover:bg-gray-700","children":"← All documentation"}],["$","$L4",null,{"href":"/","className":"inline-flex items-center gap-1 rounded-md bg-gray-100 px-3 py-1 text-gray-700 hover:bg-gray-200 dark:bg-gray-800 dark:text-gray-200 dark:hover:bg-gray-700","children":"Home"}]]}]]}],null],null],null]},[null,["$","$L5",null,{"parallelRouterKey":"children","segmentPath":["children","docs","children","$6","children"],"error":"$undefined","errorStyles":"$undefined","errorScripts":"$undefined","template":["$","$L7",null,{}],"templateStyles":"$undefined","templateScripts":"$undefined","notFound":"$undefined","notFoundStyles":"$undefined"}]],null]},[null,["$","$L5",null,{"parallelRouterKey":"children","segmentPath":["children","docs","children"],"error":"$undefined","errorStyles":"$undefined","errorScripts":"$undefined","template":["$","$L7",null,{}],"templateStyles":"$undefined","templateScripts":"$undefined","notFound":"$undefined","notFoundStyles":"$undefined"}]],null]},[[[["$","link","0",{"rel":"stylesheet","href":"/_next/static/css/7f586cdbbaa33ff7.css","precedence":"next","crossOrigin":"$undefined"}]],["$","html",null,{"lang":"en","className":"h-full","children":["$","body",null,{"className":"__className_f367f3 h-full bg-white dark:bg-gray-900","children":[["$","a",null,{"href":"#main-content","className":"skip-to-content","children":"Skip to main content"}],["$","$L8",null,{"children":[["$","$L9",null,{}],["$","$La",null,{}],["$","main",null,{"id":"main-content","className":"lg:pl-64","role":"main","aria-label":"Documentation 
content","children":["$","$Lb",null,{"children":["$","$L5",null,{"parallelRouterKey":"children","segmentPath":["children"],"error":"$undefined","errorStyles":"$undefined","errorScripts":"$undefined","template":["$","$L7",null,{}],"templateStyles":"$undefined","templateScripts":"$undefined","notFound":[["$","title",null,{"children":"404: This page could not be found."}],["$","div",null,{"style":{"fontFamily":"system-ui,\"Segoe UI\",Roboto,Helvetica,Arial,sans-serif,\"Apple Color Emoji\",\"Segoe UI Emoji\"","height":"100vh","textAlign":"center","display":"flex","flexDirection":"column","alignItems":"center","justifyContent":"center"},"children":["$","div",null,{"children":[["$","style",null,{"dangerouslySetInnerHTML":{"__html":"body{color:#000;background:#fff;margin:0}.next-error-h1{border-right:1px solid rgba(0,0,0,.3)}@media (prefers-color-scheme:dark){body{color:#fff;background:#000}.next-error-h1{border-right:1px solid rgba(255,255,255,.3)}}"}}],["$","h1",null,{"className":"next-error-h1","style":{"display":"inline-block","margin":"0 20px 0 0","padding":"0 23px 0 0","fontSize":24,"fontWeight":500,"verticalAlign":"top","lineHeight":"49px"},"children":"404"}],["$","div",null,{"style":{"display":"inline-block"},"children":["$","h2",null,{"style":{"fontSize":14,"fontWeight":400,"lineHeight":"49px","margin":0},"children":"This page could not be found."}]}]]}]}]],"notFoundStyles":[]}]}]}]]}]]}]}]],null],null],["$Lc",null]]]] c:[["$","meta","0",{"name":"viewport","content":"width=device-width, initial-scale=1"}],["$","meta","1",{"charSet":"utf-8"}],["$","title","2",{"children":"Part2 Deferred Backend Features Plan | Docs | VoiceAssist Docs"}],["$","meta","3",{"name":"description","content":"**Date:** 2025-11-26"}],["$","meta","4",{"name":"keywords","content":"VoiceAssist,documentation,medical AI,voice assistant,healthcare,HIPAA,API"}],["$","meta","5",{"name":"robots","content":"index, follow"}],["$","meta","6",{"name":"googlebot","content":"index, 
follow"}],["$","link","7",{"rel":"canonical","href":"https://assistdocs.asimo.io"}],["$","meta","8",{"property":"og:title","content":"VoiceAssist Documentation"}],["$","meta","9",{"property":"og:description","content":"Comprehensive documentation for VoiceAssist - Enterprise Medical AI Assistant"}],["$","meta","10",{"property":"og:url","content":"https://assistdocs.asimo.io"}],["$","meta","11",{"property":"og:site_name","content":"VoiceAssist Docs"}],["$","meta","12",{"property":"og:type","content":"website"}],["$","meta","13",{"name":"twitter:card","content":"summary"}],["$","meta","14",{"name":"twitter:title","content":"VoiceAssist Documentation"}],["$","meta","15",{"name":"twitter:description","content":"Comprehensive documentation for VoiceAssist - Enterprise Medical AI Assistant"}],["$","meta","16",{"name":"next-size-adjust"}]] 1:null