VoiceAssist V2 - Unified Architecture Documentation
Last Updated: 2025-12-02 (All 16 Phases Complete)
Status: Canonical Reference
Purpose: Comprehensive system architecture covering all components, data flows, and integration points
Table of Contents
- Executive Summary
- System Overview
- Architecture Principles
- Current Implementation Status
- Component Architecture
- Data Architecture
- Integration Architecture
- Security Architecture
- Deployment Architecture
- Observability Architecture
- Data Flow Examples
- Technology Stack
- Architecture Evolution
- Design Decisions and Trade-offs
Executive Summary
VoiceAssist V2 is an enterprise-grade, HIPAA-compliant medical AI assistant designed to support clinical decision-making through voice and text interfaces. The system has completed all 16 phases (0-15) with progressive architecture:
- Phases 0-10: Monorepo-first backend with Docker Compose orchestration
- Phases 11-14: Security hardening, HA/DR, testing, production deployment
- Phase 15: Final review and handoff
Current Capabilities (all phases complete):
- ✅ JWT-based authentication with token revocation
- ✅ Role-based access control (RBAC) for admin operations
- ✅ RAG-powered medical knowledge base with semantic search
- ✅ Real-time WebSocket communication for streaming responses
- ✅ Nextcloud integration (CalDAV, WebDAV, file auto-indexing)
- ✅ Multi-level caching (L1 in-memory + L2 Redis)
- ✅ Comprehensive observability (Prometheus metrics, structured logging, SLOs)
- ✅ Admin panel with system monitoring dashboard
- ✅ Async background job processing for document indexing
Design Philosophy: Start simple (monorepo), maintain clear boundaries (logical services), scale when needed (microservices extraction).
System Overview
High-Level Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Users (Web/Mobile) │
│ Browser / Mobile Apps / Web UI │
└────────────────┬────────────────────┬────────────────────────────┘
│ │
┌──────┴──────┐ ┌──────┴──────┐
│ │ │ │
v │ v │
┌───────────────────┐ │ ┌──────────────────────────────────────┐
│ Nextcloud Stack │ │ │ VoiceAssist Backend Stack │
│ (Separate) │ │ │ (This Repository) │
│ │ │ │ │
│ - Identity/SSO │◄──┼──│ API Gateway (FastAPI) │
│ - File Storage │ │ │ Port: 8000 │
│ - Calendar │ │ │ │
│ - Email │ │ │ Logical Services (Phases 0-7): │
│ - User Directory │ │ │ - Auth Service (JWT + RBAC) │
│ │ │ │ - Realtime Service (WebSocket) │
│ Local Dev: │ │ │ - RAG Service (QueryOrchestrator) │
│ Port 8080 │ │ │ - Admin Service (Dashboard + Mgmt) │
│ │ │ │ - KB Indexer (Document Ingestion) │
│ Production: │ │ │ - Integration Service (CalDAV/File) │
│ cloud.asimo.io │ │ │ - Cache Service (L1+L2) │
└───────────────────┘ │ │ - Audit Service (Compliance) │
│ │ │
│ │ Background Workers (ARQ): │
│ │ - Document Indexing Jobs │
│ │ - File Auto-Indexing │
│ └──────────────────────────────────────┘
│
│ HTTPS / OIDC / WebDAV / CalDAV APIs
│
v
┌──────────────────────────────────────────────────────────────────┐
│ Data Layer (Docker Compose) │
│ │
│ ┌──────────────────┐ ┌──────────────┐ ┌─────────────────┐ │
│ │ PostgreSQL │ │ Redis │ │ Qdrant │ │
│ │ (pgvector) │ │ (6 DBs) │ │ (Vectors) │ │
│ │ │ │ │ │ │ │
│ │ Tables: │ │ DB 0: Cache │ │ Collection: │ │
│ │ - users │ │ DB 1: Queue │ │ - medical_kb │ │
│ │ - sessions │ │ DB 2: L2 │ │ │ │
│ │ - messages │ │ DB 3: Token │ │ Embedding: │ │
│ │ - documents │ │ DB 15: Test │ │ - 1536 dims │ │
│ │ - audit_logs │ │ │ │ - Cosine sim │ │
│ └──────────────────┘ └──────────────┘ └─────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
│
┌───────────────────────────┴───────────────────────────────────┐
│ Observability Stack (Docker Compose) │
│ ┌────────────┬────────────┬────────────┬───────────────┐ │
│ │ Prometheus │ Grafana │ (Jaeger) │ Loki (Logs) │ │
│ │ │ │ (Future) │ (Future) │ │
│ │ Metrics: │ Dashboards:│ │ │ │
│ │ - SLOs │ - Health │ │ │ │
│ │ - Cache │ - SLOs │ │ │ │
│ │ - RAG │ - Security │ │ │ │
│ │ - RBAC │ │ │ │ │
│ └────────────┴────────────┴────────────┴───────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Key Architectural Separation
Nextcloud is a separate stack, not part of VoiceAssist deployment.
Local Development:
MacBook Pro
├── ~/Nextcloud-Dev/ # Separate Nextcloud Stack
│ ├── docker-compose.yml # Nextcloud + DB
│ └── Running at: http://localhost:8080
│
└── ~/VoiceAssist/ # VoiceAssist Stack
├── docker-compose.yml # All VoiceAssist services
└── Running at: http://localhost:8000
└── Connects via: NEXTCLOUD_BASE_URL=http://localhost:8080
Integration Pattern:
- VoiceAssist services are clients of Nextcloud
- Communication via HTTP/HTTPS APIs (OIDC, WebDAV, CalDAV, CardDAV)
- No shared Docker Compose project, no shared databases
- Environment variables configure the connection
Architecture Principles
1. Progressive Complexity
- Start Simple: Begin with monorepo for rapid development
- Maintain Boundaries: Enforce logical service boundaries even in monorepo
- Scale When Needed: Extract to microservices only when scaling requires it
Decision Matrix:
| Factor | Monorepo (Current) | Microservices (Future) |
|---|---|---|
| Team Size | < 5 developers | > 5 developers |
| Concurrent Users | < 50 users | > 50 users |
| Deployment | Single server | Multi-node K8s cluster |
| Development Speed | Faster (single codebase) | Slower (coordination overhead) |
| Operational Complexity | Low (Docker Compose) | High (K8s, service mesh) |
2. Security by Design
- Zero-trust model: Never trust, always verify
- PHI protection: Never log PHI, automatic redaction
- Least privilege: RBAC with granular permissions
- Encryption everywhere: TLS in transit, encryption at rest
- Audit everything: Immutable audit logs for all sensitive operations
3. Observability First
- Metrics: Prometheus for performance and SLO tracking
- Logs: Structured JSON with correlation IDs
- Tracing: Request context propagation (future: OpenTelemetry)
- Dashboards: Grafana for real-time system health
- Alerts: Multi-window, multi-burn-rate SLO alerting
4. API-First Design
- Standard envelope: Consistent response format across all endpoints
- Error codes: Typed error codes for client error handling
- Versioning: API version in URL path (`/api/v1/...`)
- Documentation: OpenAPI/Swagger auto-generated from code
5. Performance Optimization
- Multi-level caching: L1 (LRU in-memory) + L2 (Redis distributed)
- Connection pooling: Efficient database and API client connections
- Async processing: Background jobs for long-running tasks
- Query optimization: Indexed database queries, vector search tuning
Current Implementation Status
Phase Completion Summary
All 16 project phases (0-15) are complete. See Implementation Status for detailed component status.
| Phase | Status | Key Deliverables |
|---|---|---|
| Phase 0 | ✅ Complete | Project structure, Docker Compose, base infrastructure |
| Phase 1 | ✅ Complete | PostgreSQL, Redis, Qdrant, health endpoints, Alembic migrations |
| Phase 2 | ✅ Complete | JWT auth, password validation, token revocation, Nextcloud integration |
| Phase 3 | ✅ Complete | API Gateway solidified, core endpoints, service boundaries |
| Phase 4 | ✅ Complete | WebSocket realtime communication, QueryOrchestrator integration |
| Phase 5 | ✅ Complete | RAG pipeline, semantic search, document ingestion, OpenAI embeddings |
| Phase 6 | ✅ Complete | CalDAV calendar, WebDAV file indexing, email skeleton |
| Phase 7 | ✅ Complete | RBAC enforcement, admin panel dashboard, smoke tests |
| Phase 8 | ✅ Complete | Distributed tracing, observability infrastructure |
| Phase 9 | ✅ Complete | Infrastructure as code, CI/CD pipelines |
| Phase 10 | ✅ Complete | Load testing, performance optimization |
| Phase 11 | ✅ Complete | Security hardening, HIPAA compliance |
| Phase 12 | ✅ Complete | High availability, disaster recovery |
| Phase 13 | ✅ Complete | Final testing, documentation |
| Phase 14 | ✅ Complete | Production deployment |
| Phase 15 | ✅ Complete | Final review and handoff |
Completed Features
Authentication & Authorization:
- ✅ User registration with password strength validation
- ✅ JWT access tokens (15-min) + refresh tokens (7-day)
- ✅ Token revocation via Redis (dual-level: individual + all-user)
- ✅ Role-based access control (admin vs user)
- ✅ Admin-only endpoints protected with `get_current_admin_user` dependency
- ✅ Comprehensive audit logging (SHA-256 integrity verification)
Medical AI & Knowledge Base:
- ✅ Document upload (PDF, TXT support)
- ✅ Text extraction and intelligent chunking
- ✅ OpenAI embeddings (text-embedding-3-small, 1536 dimensions)
- ✅ Qdrant vector storage with cosine similarity
- ✅ RAG pipeline with context retrieval and citation tracking
- ✅ QueryOrchestrator with LLM integration
- ✅ Streaming responses via WebSocket
Nextcloud Integration:
- ✅ CalDAV calendar operations (list, create, update, delete events)
- ✅ WebDAV file discovery and auto-indexing
- ✅ Automatic knowledge base population from Nextcloud files
- ✅ Duplicate prevention for re-indexing
Observability & Operations:
- ✅ Prometheus metrics (cache, RAG, RBAC, HTTP, DB)
- ✅ Multi-level caching with hit/miss tracking
- ✅ SLO definitions (availability, latency, cache performance)
- ✅ SLO recording rules and alerting (Prometheus)
- ✅ Grafana dashboards (health, SLOs, security audit)
- ✅ Admin panel dashboard with system summary
Infrastructure:
- ✅ Docker Compose orchestration
- ✅ PostgreSQL with pgvector extension
- ✅ Redis with multiple databases (cache, queue, L2, token revocation)
- ✅ Qdrant vector database
- ✅ ARQ async job queue for background processing
- ✅ Alembic database migrations
Future Enhancements (Optional)
The following features are candidates for future enhancement beyond the current implementation:
- ⏳ OIDC authentication integration (Nextcloud SSO)
- ⏳ Per-user credential management
- ⏳ Complete email integration (threading, search, attachments)
- ⏳ CardDAV contacts integration
- ⏳ BioGPT/PubMedBERT specialized medical models
- ⏳ Multi-hop reasoning and complex retrieval strategies
- ⏳ External integrations (UpToDate, OpenEvidence, PubMed live APIs)
- ⏳ Microservices extraction (when scaling requires)
Component Architecture
Monorepo Structure
VoiceAssist/
├── services/
│ └── api-gateway/ # Main FastAPI application
│ ├── app/
│ │ ├── main.py # Application entry point
│ │ ├── api/ # API routes (FastAPI routers)
│ │ │ ├── auth.py # Authentication endpoints
│ │ │ ├── users.py # User management
│ │ │ ├── realtime.py # WebSocket endpoint
│ │ │ ├── admin_kb.py # Admin KB management
│ │ │ ├── admin_panel.py # Admin dashboard
│ │ │ ├── integrations.py # Nextcloud integrations
│ │ │ └── metrics.py # Prometheus metrics
│ │ ├── services/ # Business logic layer
│ │ │ ├── rag_service.py # QueryOrchestrator (RAG pipeline)
│ │ │ ├── llm_client.py # LLM interface
│ │ │ ├── kb_indexer.py # Document ingestion
│ │ │ ├── search_aggregator.py # Semantic search
│ │ │ ├── cache_service.py # Multi-level caching
│ │ │ ├── audit_service.py # Audit logging
│ │ │ ├── caldav_service.py # Calendar integration
│ │ │ ├── nextcloud_file_indexer.py # File auto-indexing
│ │ │ ├── email_service.py # Email skeleton
│ │ │ └── token_revocation.py # Token blacklist
│ │ ├── models/ # SQLAlchemy ORM models
│ │ │ ├── user.py
│ │ │ ├── session.py
│ │ │ ├── message.py
│ │ │ └── audit_log.py
│ │ ├── core/ # Core infrastructure
│ │ │ ├── config.py # Settings (Pydantic)
│ │ │ ├── database.py # DB session management
│ │ │ ├── security.py # JWT, password hashing
│ │ │ ├── dependencies.py # FastAPI dependencies
│ │ │ ├── api_envelope.py # Standard response format
│ │ │ ├── metrics.py # Prometheus metrics definitions
│ │ │ ├── request_id.py # Request correlation
│ │ │ └── password_validator.py # Password strength
│ │ └── worker/ # Background job processing
│ │ ├── tasks.py # ARQ tasks (document indexing)
│ │ └── worker.py # ARQ worker entrypoint
│ ├── tests/
│ │ ├── unit/
│ │ ├── integration/
│ │ └── e2e/ # End-to-end tests (Phase 7)
│ ├── alembic/ # Database migrations
│ ├── requirements.txt
│ └── Dockerfile
├── infrastructure/
│ └── observability/
│ ├── prometheus/
│ │ ├── prometheus.yml
│ │ └── rules/
│ │ ├── slo_recording_rules.yml
│ │ └── slo_alerts.yml
│ └── grafana/
│ └── dashboards/
│ ├── health-monitoring.json
│ ├── slo-overview.json
│ └── security-audit.json
├── docs/ # Documentation
│ ├── UNIFIED_ARCHITECTURE.md # This document
│ ├── SERVICE_CATALOG.md
│ ├── DATA_MODEL.md
│ ├── operations/
│ │ └── SLO_DEFINITIONS.md
│ └── testing/
│ └── E2E_TESTING_GUIDE.md
├── docker-compose.yml # Service orchestration
├── .env # Environment configuration
└── PHASE_STATUS.md # Development status
Logical Service Boundaries
Even in monorepo, services maintain strict boundaries:
| Service | Module Location | Responsibility | Dependencies |
|---|---|---|---|
| Auth Service | app/api/auth.py + app/core/security.py | User registration, login, JWT tokens, RBAC | PostgreSQL, Redis (token revocation) |
| Realtime Service | app/api/realtime.py | WebSocket endpoint, streaming responses | QueryOrchestrator, LLMClient |
| Voice Pipeline Service | app/services/voice_pipeline_service.py | Thinker-Talker voice orchestration | ThinkerService, TalkerService |
| Thinker Service | app/services/thinker_service.py | LLM processing with tool/RAG support | LLMClient, RAGService |
| Talker Service | app/services/talker_service.py | TTS audio generation (ElevenLabs) | ElevenLabsService |
| RAG Service | app/services/rag_service.py | Query orchestration, RAG pipeline | SearchAggregator, LLMClient, Qdrant |
| KB Indexer | app/services/kb_indexer.py | Document ingestion, chunking, embedding | OpenAI API, Qdrant, PostgreSQL |
| Search Aggregator | app/services/search_aggregator.py | Semantic search, citation extraction | Qdrant, CacheService |
| Cache Service | app/services/cache_service.py | Multi-level caching (L1 + L2) | Redis |
| Admin Service | app/api/admin_kb.py + app/api/admin_panel.py | System management, dashboard | All services (monitoring) |
| Integration Service | app/api/integrations.py + app/services/caldav_service.py | Nextcloud integrations | Nextcloud APIs (CalDAV, WebDAV) |
| Audit Service | app/services/audit_service.py | Compliance logging, integrity verification | PostgreSQL |
| Worker Service | app/worker/ | Async background jobs | Redis (ARQ), KBIndexer |
Voice Architecture: The Thinker-Talker pipeline is the primary voice implementation. See Voice Mode Pipeline for details.
Service Communication Patterns
Synchronous (Direct Function Calls in Monorepo):
- API routes → Service layer
- Service → Service (internal imports)
- Service → Database (SQLAlchemy)
- Service → External APIs (HTTP clients)
Asynchronous (Background Jobs via ARQ):
- Document indexing jobs
- File auto-indexing from Nextcloud
- Future: Email sending, scheduled tasks
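The queue wiring is thin. Below is a minimal ARQ sketch of the enqueue/consume pattern described above, assuming Redis DB 1 for the queue (per the Redis database table later in this document); the task body, module path, and `enqueue_indexing` helper are illustrative, not the repository's actual code.

```python
from arq import create_pool
from arq.connections import RedisSettings

# DB 1 is the ARQ queue (see the Redis database table below); host is assumed.
REDIS = RedisSettings(host="localhost", port=6379, database=1)

async def index_document(ctx, document_id: str) -> str:
    # Worker side: the real task would invoke KBIndexer here.
    print(f"indexing {document_id}")
    return document_id

class WorkerSettings:
    # Entry point for `arq app.worker.worker.WorkerSettings` (path illustrative).
    functions = [index_document]
    redis_settings = REDIS

async def enqueue_indexing(document_id: str) -> None:
    # API side: fire-and-forget; the worker picks the job up from the queue.
    pool = await create_pool(REDIS)
    await pool.enqueue_job("index_document", document_id)
```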
Future (Microservices - Phases 11-14):
- HTTP/REST between services
- gRPC for high-performance internal communication
- Message queue (RabbitMQ/Kafka) for async events
Data Architecture
Database Schema
PostgreSQL Tables (Alembic managed):
```sql
-- User Management
CREATE TABLE users (
    id UUID PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    hashed_password VARCHAR(255) NOT NULL,
    is_active BOOLEAN DEFAULT TRUE,
    is_admin BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);

-- Session Management
CREATE TABLE sessions (
    id UUID PRIMARY KEY,
    user_id UUID REFERENCES users(id),
    created_at TIMESTAMP NOT NULL,
    last_activity TIMESTAMP NOT NULL
);

-- Conversation Messages
CREATE TABLE messages (
    id UUID PRIMARY KEY,
    session_id UUID REFERENCES sessions(id),
    user_id UUID REFERENCES users(id),
    role VARCHAR(50) NOT NULL,  -- user, assistant, system
    content TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL
);

-- Audit Logs (HIPAA Compliance)
CREATE TABLE audit_logs (
    id UUID PRIMARY KEY,
    user_id UUID REFERENCES users(id),
    action VARCHAR(100) NOT NULL,
    resource_type VARCHAR(100),
    resource_id VARCHAR(255),
    ip_address VARCHAR(45),
    user_agent TEXT,
    request_id VARCHAR(255),
    service_name VARCHAR(100),
    endpoint VARCHAR(255),
    status_code INTEGER,
    success BOOLEAN NOT NULL,
    error_message TEXT,
    metadata JSONB,
    integrity_hash VARCHAR(64) NOT NULL,  -- SHA-256
    created_at TIMESTAMP NOT NULL
);

CREATE INDEX idx_audit_logs_user_id ON audit_logs(user_id);
CREATE INDEX idx_audit_logs_action ON audit_logs(action);
CREATE INDEX idx_audit_logs_created_at ON audit_logs(created_at);
CREATE INDEX idx_audit_logs_resource ON audit_logs(resource_type, resource_id);
```
Redis Database Organization
Redis Databases (0-15):
| DB | Purpose | TTL | Keys |
|---|---|---|---|
| 0 | General caching | Varies (15min-24h) | cache:*, user:* |
| 1 | ARQ job queue | N/A | arq:* |
| 2 | L2 cache (multi-level) | Varies | cache:l2:* |
| 3 | Token revocation | Token expiry | token:revoked:*, user:revoked:* |
| 15 | Test database | N/A | (cleared after tests) |
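A minimal sketch of how per-purpose clients might be constructed with redis-py's asyncio API; the URL, host, and key names are assumptions consistent with the table above.

```python
from redis import asyncio as aioredis

# One client per logical database (host/port assumed).
cache = aioredis.from_url("redis://localhost:6379/0")       # general cache
l2_cache = aioredis.from_url("redis://localhost:6379/2")    # L2 distributed cache
revocation = aioredis.from_url("redis://localhost:6379/3")  # token blacklist

async def revoke_token(jti: str, ttl_seconds: int) -> None:
    # The key expires together with the token itself, so the blacklist self-prunes.
    await revocation.set(f"token:revoked:{jti}", "1", ex=ttl_seconds)
```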
Qdrant Vector Database
Collection: medical_knowledge
{ "collection_name": "medical_knowledge", "vectors": { "size": 1536, # OpenAI text-embedding-3-small "distance": "Cosine" }, "payload_schema": { "document_id": "keyword", "chunk_index": "integer", "source_type": "keyword", # textbook, journal, guideline, note "title": "text", "content": "text", "metadata": "json" } }
Data Flow Architecture
Document Ingestion Flow:
File Upload → KBIndexer →
1. Text Extraction (PyPDF2/pdfplumber)
2. Chunking (500 chars, 50 overlap)
3. Embedding Generation (OpenAI API)
4. Vector Storage (Qdrant)
5. Metadata Storage (PostgreSQL - future)
6. Cache Invalidation
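A minimal sketch of steps 2-4 of this flow, assuming the `openai` and `qdrant-client` Python packages. The chunk size (500 chars, 50 overlap), embedding model, and collection name come from this document; the point IDs and function wiring are illustrative.

```python
import uuid

from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

openai_client = OpenAI()                             # reads OPENAI_API_KEY
qdrant = QdrantClient(url="http://localhost:6333")   # URL assumed

def chunk_text(text: str, size: int = 500, overlap: int = 50) -> list[str]:
    # Fixed-size character windows with overlap, per step 2 above.
    step = size - overlap
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]

def index_document(document_id: str, title: str, text: str) -> int:
    chunks = chunk_text(text)
    for i, chunk in enumerate(chunks):
        vector = openai_client.embeddings.create(    # step 3: 1536-dim vector
            model="text-embedding-3-small", input=chunk,
        ).data[0].embedding
        qdrant.upsert(                               # step 4: vector storage
            collection_name="medical_knowledge",
            points=[PointStruct(
                id=str(uuid.uuid4()),                # Qdrant expects int/UUID IDs
                vector=vector,
                payload={"document_id": document_id, "chunk_index": i,
                         "title": title, "content": chunk},
            )],
        )
    return len(chunks)
```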
RAG Query Flow:
User Query → QueryOrchestrator →
1. Check L1 Cache (embedding)
2. Check L2 Cache (embedding)
3. Generate Embedding (OpenAI API)
4. Store in Cache (L2 + L1)
5. Vector Search (Qdrant)
6. Format Context
7. LLM Generation (OpenAI GPT-4)
8. Citation Extraction
9. Response Streaming (WebSocket)
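Steps 1-4 of this flow form a cache-aside lookup with L2-to-L1 promotion. A minimal sketch, assuming an async Redis client bound to the L2 database and the OpenAI Python client; the class, key scheme, and the 24h TTL (which appears in Example 2 later in this document) are illustrative.

```python
import hashlib
import json
from collections import OrderedDict

class EmbeddingCache:
    """Sketch of steps 1-4: L1 (in-process LRU) -> L2 (Redis) -> OpenAI."""

    def __init__(self, redis, openai_client, l1_size: int = 1024):
        self.l1: OrderedDict[str, list[float]] = OrderedDict()
        self.l1_size = l1_size
        self.redis = redis              # async client bound to the L2 database
        self.openai = openai_client

    async def get_embedding(self, query: str) -> list[float]:
        key = "cache:l2:emb:" + hashlib.sha256(query.encode()).hexdigest()
        if key in self.l1:                          # 1. L1 hit
            self.l1.move_to_end(key)
            return self.l1[key]
        raw = await self.redis.get(key)             # 2. L2 lookup
        if raw is not None:
            vector = json.loads(raw)
            self._put_l1(key, vector)               # promote L2 -> L1
            return vector
        vector = self.openai.embeddings.create(     # 3. miss -> OpenAI
            model="text-embedding-3-small", input=query,
        ).data[0].embedding
        await self.redis.set(key, json.dumps(vector), ex=24 * 3600)  # 4. 24h TTL
        self._put_l1(key, vector)
        return vector

    def _put_l1(self, key: str, vector: list[float]) -> None:
        self.l1[key] = vector
        self.l1.move_to_end(key)
        if len(self.l1) > self.l1_size:
            self.l1.popitem(last=False)             # evict least recently used
```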
Authentication Flow:
Login Request → Auth API →
1. Validate Credentials (bcrypt)
2. Generate JWT Tokens (access + refresh)
3. Store Session (PostgreSQL)
4. Audit Log (audit_logs table)
5. Return Tokens
Integration Architecture
Nextcloud Integration Pattern
Architecture Decision: Nextcloud is a separate deployment, VoiceAssist is a client.
Integration Points:
1. CalDAV (Calendar)
   - Protocol: CalDAV (RFC 4791)
   - Library: `caldav` Python library
   - Operations: List calendars, create/update/delete events
   - Location: `app/services/caldav_service.py`
2. WebDAV (Files)
   - Protocol: WebDAV (RFC 4918)
   - Library: `webdavclient3`
   - Operations: Discover files, download for indexing
   - Location: `app/services/nextcloud_file_indexer.py`
3. OIDC (Authentication - Future)
   - Protocol: OpenID Connect
   - Flow: Authorization code flow
   - Provider: Nextcloud OIDC app
   - Status: Deferred to Phase 8+
Environment Configuration:
```bash
# Nextcloud Connection
NEXTCLOUD_BASE_URL=http://localhost:8080   # or https://cloud.asimo.io
NEXTCLOUD_ADMIN_USER=admin
NEXTCLOUD_ADMIN_PASSWORD=secure_password

# CalDAV
NEXTCLOUD_CALDAV_URL=${NEXTCLOUD_BASE_URL}/remote.php/dav/calendars

# WebDAV
NEXTCLOUD_WEBDAV_URL=${NEXTCLOUD_BASE_URL}/remote.php/dav/files

# OIDC (Future)
NEXTCLOUD_OIDC_ISSUER=${NEXTCLOUD_BASE_URL}/apps/oidc
NEXTCLOUD_CLIENT_ID=voiceassist
NEXTCLOUD_CLIENT_SECRET=<from_nextcloud>
```
External API Integrations
OpenAI API:
- Embeddings: `text-embedding-3-small` (1536 dimensions)
- LLM: `gpt-4-turbo-preview` (configurable)
- Usage: Document embedding, RAG response generation
- Rate limiting: Handled by OpenAI client
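A minimal sketch of a streamed generation call with the OpenAI Python client, using the model named above; the prompt structure is an illustrative assumption, not the repository's actual prompt.

```python
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def stream_answer(question: str, context: str):
    # Streamed chat completion; yields text deltas as they arrive.
    stream = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[
            {"role": "system",
             "content": f"Answer using only this context:\n{context}"},
            {"role": "user", "content": question},
        ],
        stream=True,
    )
    for event in stream:
        delta = event.choices[0].delta.content
        if delta:
            yield delta
```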
Future Integrations (Phases 8+):
- PubMed E-utilities API (medical literature search)
- UpToDate API (evidence-based clinical references)
- OpenEvidence API (guideline summaries)
- Medical calculator libraries
Security Architecture
Authentication & Authorization
JWT Token Strategy:
- Access Token: 15-minute expiry, HS256 algorithm
- Refresh Token: 7-day expiry, HS256 algorithm
- Token Revocation: Redis-based blacklist (individual + all-user-tokens)
- Claims: `sub` (user_id), `email`, `role`, `exp`, `iat`, `type`
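A minimal sketch of issuing and validating such a token with python-jose (the JWT library listed in the Technology Stack); the secret is a placeholder that would come from settings in the real service.

```python
from datetime import datetime, timedelta, timezone

from jose import JWTError, jwt

SECRET_KEY = "change-me"   # assumption: loaded from settings in the real service
ALGORITHM = "HS256"

def create_access_token(user_id: str, email: str, role: str) -> str:
    now = datetime.now(timezone.utc)
    claims = {
        "sub": user_id, "email": email, "role": role,
        "iat": now, "exp": now + timedelta(minutes=15), "type": "access",
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

def decode_token(token: str) -> dict:
    # Raises JWTError (including on expiry) for any invalid token; the Redis
    # revocation blacklist is checked separately after decoding.
    return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
```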
Password Security:
- Hashing: bcrypt via passlib
- Validation: Multi-criteria (8+ chars, upper, lower, digit, special)
- Strength Scoring: 0-100 scale with Weak/Medium/Strong classification
- Common Password Rejection: Blocks password, 123456, qwerty, etc.
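A minimal sketch of the hashing side with passlib; setting the bcrypt cost factor of 12 explicitly (the value noted under Data Protection below) is an assumption, since passlib also applies a default when omitted.

```python
from passlib.context import CryptContext

# bcrypt with cost factor 12 (see Data Protection below).
pwd_context = CryptContext(schemes=["bcrypt"], bcrypt__rounds=12)

def hash_password(plain: str) -> str:
    return pwd_context.hash(plain)

def verify_password(plain: str, hashed: str) -> bool:
    # Constant-time comparison is handled by passlib.
    return pwd_context.verify(plain, hashed)
```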
RBAC (Role-Based Access Control):
- Roles: `admin`, `user` (more roles in future phases)
- Admin Enforcement: `get_current_admin_user` dependency
- Protected Endpoints:
  - `/api/admin/kb/*` - Knowledge base management
  - `/api/admin/panel/*` - System dashboard
  - `/api/integrations/*` - Nextcloud integrations
Audit Logging
Compliance Features:
- Immutable Trail: SHA-256 integrity hash on each log entry
- Comprehensive Metadata: User, action, resource, timestamp, IP, user agent
- Request Correlation: Request ID for distributed tracing
- Tamper Detection: Integrity verification queries
- HIPAA Alignment: Meets audit trail requirements
Logged Events:
- User registration, login, logout
- Token refresh, token revocation
- Password changes, failed authentication
- Admin operations (KB management, system config)
- Document access and modifications
Data Protection
Encryption:
- In Transit: HTTPS/TLS 1.2+ (production)
- At Rest: Database-level encryption (future: PostgreSQL transparent encryption)
- Tokens: JWT with signed claims
- Passwords: bcrypt hashing (cost factor: 12)
PHI Protection (Future):
- PHI detection service (Phase 8+)
- Automatic log redaction
- Local vs cloud AI routing based on PHI presence
- Separate encryption keys for PHI data
Network Security
Docker Compose Network Isolation:
```yaml
networks:
  voiceassist_network:
    driver: bridge
    internal: false  # API gateway needs external access
  voiceassist_internal:
    driver: bridge
    internal: true   # Database layer isolated
```
Future (Kubernetes - Phases 11-14):
- Network policies for pod-to-pod restrictions
- Service mesh (Linkerd) for mTLS
- Ingress controller with WAF (Web Application Firewall)
Deployment Architecture
Development Environment (Docker Compose)
Current Stack:
```yaml
# docker-compose.yml
services:
  # Application Services
  voiceassist-server:
    build: ./services/api-gateway
    ports: ["8000:8000"]
    depends_on: [postgres, redis, qdrant]

  voiceassist-worker:
    build: ./services/api-gateway
    command: ["python", "-m", "app.worker.worker"]
    depends_on: [redis]

  # Data Layer
  postgres:
    image: pgvector/pgvector:pg16
    ports: ["5432:5432"]
    volumes: [postgres_data:/var/lib/postgresql/data]

  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
    volumes: [redis_data:/data]

  qdrant:
    image: qdrant/qdrant:latest
    ports: ["6333:6333"]
    volumes: [qdrant_data:/qdrant/storage]

  # Observability (Phase 7+)
  prometheus:
    image: prom/prometheus:latest
    ports: ["9090:9090"]
    volumes:
      - ./infrastructure/observability/prometheus:/etc/prometheus

  grafana:
    image: grafana/grafana:latest
    ports: ["3000:3000"]
    volumes:
      - ./infrastructure/observability/grafana:/etc/grafana
```
Resource Allocation:
- PostgreSQL: 2 CPU, 4GB RAM
- Redis: 1 CPU, 1GB RAM
- Qdrant: 2 CPU, 4GB RAM
- API Gateway: 2 CPU, 4GB RAM
- Worker: 1 CPU, 2GB RAM
Production Deployment (Future - Kubernetes)
Planned Architecture (Phases 11-14):
Kubernetes Cluster
├── Ingress (voiceassist.asimo.io)
│ └── SSL Termination (Let's Encrypt)
├── Service Mesh (Linkerd)
│ └── mTLS between all services
├── Microservices (2-10 replicas each)
│ ├── API Gateway (Kong/Nginx)
│ ├── Auth Service
│ ├── Realtime Service
│ ├── RAG Service
│ ├── Admin Service
│ └── Integration Service
├── Data Layer
│ ├── PostgreSQL (Primary + 2 Read Replicas)
│ ├── Redis Cluster (3 masters, 3 replicas)
│ └── Qdrant (3 replicas)
└── Observability
├── Prometheus (HA pair)
├── Grafana
├── Jaeger (distributed tracing)
└── Loki (log aggregation)
Observability Architecture
Metrics Collection (Prometheus)
Instrumentation:
- HTTP Metrics: Request count, latency (p50, p95, p99), status codes
- Cache Metrics: Hit/miss rates by layer (L1, L2), size, evictions
- RAG Metrics: Query latency, embedding generation time, search results
- RBAC Metrics: Protected endpoint access, admin operations
- Database Metrics: Connection pool utilization, query latency
- External API Metrics: OpenAI call latency, rate limits
Metrics Endpoint:
- Location: `GET /metrics`
- Format: Prometheus exposition format
- Protection: Optional authentication (configurable)
Service Level Objectives (SLOs)
Defined SLOs (Phase 7):
| SLO | Target | Error Budget | Measurement Window |
|---|---|---|---|
| API Availability | 99.9% | 43.2 min/month | 30 days |
| API Latency (P95) | < 500ms | - | 5 minutes |
| RAG Query Success | 99% | 1% failures | 24 hours |
| Cache Hit Rate | > 40% | - | 1 hour |
| Database P95 Latency | < 100ms | - | 5 minutes |
Prometheus Recording Rules:
```yaml
# API Availability (30-day)
- record: slo:api_availability:ratio_rate30d
  expr: |
    sum(rate(voiceassist_http_requests_total{status_code=~"2..|3.."}[30d]))
    /
    sum(rate(voiceassist_http_requests_total[30d]))

# Error Budget Remaining
- record: slo:error_budget_remaining:percent
  expr: |
    100 * (1 - ((1 - slo:api_availability:ratio_rate30d) / 0.001))
```
Alerting:
- Multi-window, multi-burn-rate approach (Google SRE guidelines)
- Critical alerts: SLO violations (< 99.9% availability)
- Warning alerts: Error budget burn rate > 14.4x
- Info alerts: low-urgency, non-paging notifications for awareness
Logging Strategy
Structured Logging:
logger.info("user_login_success", extra={ "user_id": user.id, "email": user.email, "ip_address": request.client.host, "request_id": request.state.request_id, "timestamp": datetime.utcnow().isoformat() })
Log Levels:
- DEBUG: Development only (not in production)
- INFO: Normal operations, audit events
- WARNING: Potential issues, deprecated API usage
- ERROR: Errors requiring attention
- CRITICAL: Service failures
Log Aggregation (Future - Loki):
- Centralized log storage
- Full-text search
- Log correlation by request ID
- PHI redaction applied automatically
Dashboards (Grafana)
Implemented Dashboards (Phase 7):
1. Health Monitoring Dashboard (`health-monitoring.json`)
   - System overview (CPU, memory, disk)
   - Service health status
   - Database connection pool
   - Redis memory usage
   - Qdrant storage
2. SLO Overview Dashboard (`slo-overview.json`)
   - API availability (30d)
   - Error budget remaining
   - Error budget burn rate
   - API latency (P50, P95, P99)
   - Cache hit rates
3. Security Audit Dashboard (`security-audit.json`)
   - Recent authentication events
   - Failed login attempts
   - Token revocations
   - Admin operations
   - Audit log integrity status
Data Flow Examples
Example 1: User Registration and Login
1. User Registration
├─> POST /api/auth/register {email, password}
├─> Password Validator: Check strength
├─> User Model: Create with bcrypt hash
├─> PostgreSQL: Insert into users table
├─> Audit Service: Log registration event
└─> Response: {user_id, email}
2. User Login
├─> POST /api/auth/login {email, password}
├─> User Model: Query by email
├─> Security Service: Verify password (bcrypt)
├─> Token Service: Generate JWT tokens (access + refresh)
├─> Session Model: Create session record
├─> Audit Service: Log login event
└─> Response: {access_token, refresh_token, user}
3. Authenticated Request
├─> GET /api/auth/me
├─> Header: Authorization: Bearer <access_token>
├─> Dependency: get_current_user
├─> Token Service: Decode and validate JWT
├─> Token Revocation: Check Redis blacklist
├─> User Model: Query user details
└─> Response: {user}
Example 2: RAG Query with Caching
1. User Query via WebSocket
├─> WS /api/realtime/ws
├─> Client: {"type": "message", "content": "What is diabetic ketoacidosis?"}
├─> Realtime Service: Parse and validate
└─> Forward to QueryOrchestrator
2. RAG Pipeline
├─> QueryOrchestrator: handle_query()
├─> SearchAggregator: generate_query_embedding()
│ ├─> CacheService: Check L1 cache (LRU)
│ ├─> CacheService: Check L2 cache (Redis)
│ ├─> Cache Miss → OpenAI API: Create embedding
│ └─> CacheService: Store in L2 + L1 (24h TTL)
├─> SearchAggregator: search() in Qdrant
│ ├─> Qdrant: Cosine similarity search (top_k=5)
│ └─> Return: List[SearchResult]
├─> SearchAggregator: format_context_for_rag()
├─> LLMClient: generate() with context
│ └─> OpenAI API: GPT-4 generation
└─> SearchAggregator: extract_citations()
3. Streaming Response
├─> Realtime Service: Stream response chunks
│ ├─> Send: {"type": "message_start", "message_id": "..."}
│ ├─> Send: {"type": "message_chunk", "content": "Diabetic..."}
│ ├─> Send: {"type": "message_chunk", "content": " ketoacidosis..."}
│ └─> Send: {"type": "message_complete", "citations": [...]}
└─> Client: Receives streaming response
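A minimal server-side sketch of this message framing with FastAPI's WebSocket support. `answer_chunks` is a hypothetical stand-in for the QueryOrchestrator stream; authentication, error handling, and real message IDs are omitted.

```python
from fastapi import FastAPI, WebSocket

app = FastAPI()

async def answer_chunks(question: str):
    # Hypothetical stand-in for the RAG/LLM stream; yields text fragments.
    for part in ("Diabetic ", "ketoacidosis ", "is ..."):
        yield part

@app.websocket("/api/realtime/ws")
async def realtime_ws(websocket: WebSocket):
    await websocket.accept()
    while True:
        msg = await websocket.receive_json()
        if msg.get("type") != "message":
            continue  # e.g. ping/pong keepalive frames
        await websocket.send_json({"type": "message_start", "message_id": "m-1"})
        async for chunk in answer_chunks(msg["content"]):
            await websocket.send_json({"type": "message_chunk", "content": chunk})
        await websocket.send_json({"type": "message_complete", "citations": []})
```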
Example 3: Document Upload and Indexing
1. Admin Upload
├─> POST /api/admin/kb/documents
├─> Dependency: get_current_admin_user (RBAC check)
├─> File: multipart/form-data (PDF or TXT)
└─> Forward to KBIndexer
2. Document Processing
├─> KBIndexer: index_pdf_document() or index_document()
├─> Text Extraction: PyPDF2 or pdfplumber
├─> Chunking: 500 chars, 50 overlap
├─> For each chunk:
│ ├─> OpenAI API: Create embedding (1536 dims)
│ ├─> Qdrant: Store vector with metadata
│ │ └─> Payload: {document_id, chunk_index, title, content, source_type}
│ └─> Metrics: Track chunks_indexed
└─> Return: IndexingResult {document_id, chunks_indexed, success}
3. Response to Admin
├─> Success Envelope: {success: true, data: {...}}
├─> Cache Invalidation: Clear L1 + L2 caches
├─> Audit Log: Document upload event
└─> Prometheus Metrics: Increment kb_documents_indexed_total
Example 4: Calendar Event Creation via Nextcloud
1. Create Event Request
├─> POST /api/integrations/calendar/events
├─> Dependency: get_current_user (authentication)
├─> Body: {summary, start, end, description, location}
└─> Forward to CalDAVService
2. CalDAV Integration
├─> CalDAVService: create_event()
├─> Connect to Nextcloud CalDAV
│ └─> URL: {NEXTCLOUD_BASE_URL}/remote.php/dav/calendars/{user}/default
├─> Create iCalendar event (vobject)
│ └─> VEVENT with SUMMARY, DTSTART, DTEND, DESCRIPTION, LOCATION
├─> Save to Nextcloud calendar
└─> Return: Event UID
3. Response
├─> Success Envelope: {success: true, data: {event_uid: "..."}}
├─> Future: Send notification to user
└─> Audit Log: Calendar event created
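A minimal sketch of the CalDAV step using the `caldav` library named in the Integration Architecture section; picking the first calendar and the credential handling are illustrative assumptions.

```python
from datetime import datetime

import caldav

def create_event(base_url: str, user: str, password: str,
                 summary: str, start: datetime, end: datetime) -> str:
    # Nextcloud exposes CalDAV under /remote.php/dav (URL shape from the flow above).
    client = caldav.DAVClient(url=f"{base_url}/remote.php/dav",
                              username=user, password=password)
    calendar = client.principal().calendars()[0]   # assumption: first calendar
    event = calendar.save_event(dtstart=start, dtend=end, summary=summary)
    return str(event.url)                          # URL/UID of the saved event
```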
Technology Stack
Backend
| Component | Technology | Version | Purpose |
|---|---|---|---|
| Language | Python | 3.11+ | Primary backend language |
| Framework | FastAPI | 0.104+ | Async web framework |
| ORM | SQLAlchemy | 2.0+ | Database ORM |
| Migrations | Alembic | 1.12+ | Database schema versioning |
| Validation | Pydantic | 2.4+ | Data validation and settings |
| Authentication | python-jose | 3.3+ | JWT token handling |
| Password Hashing | passlib | 1.7+ | bcrypt hashing |
| HTTP Client | httpx | 0.25+ | Async HTTP requests |
| Job Queue | ARQ | 0.25+ | Async background jobs |
Databases & Storage
| Component | Technology | Version | Purpose |
|---|---|---|---|
| RDBMS | PostgreSQL | 16 | Primary relational database |
| Vector Extension | pgvector | 0.5+ | Vector storage in PostgreSQL |
| Cache/Queue | Redis | 7+ | Caching, sessions, job queue |
| Vector DB | Qdrant | 1.7+ | Semantic search |
AI & ML
| Component | Technology | Purpose |
|---|---|---|
| Embeddings | OpenAI text-embedding-3-small | 1536-dim embeddings |
| LLM | OpenAI GPT-4 Turbo | Response generation |
| Future | BioGPT, PubMedBERT | Medical-specific models |
Integrations
| Component | Technology | Purpose |
|---|---|---|
| Calendar | caldav (Python library) | CalDAV protocol support |
| Files | webdavclient3 | WebDAV protocol support |
| Email | imaplib, smtplib | IMAP/SMTP (future) |
| PDF Processing | PyPDF2, pdfplumber | Text extraction |
Observability
| Component | Technology | Version | Purpose |
|---|---|---|---|
| Metrics | Prometheus | 2.47+ | Metrics collection |
| Metrics Client | prometheus-client | 0.19+ | Python instrumentation |
| Dashboards | Grafana | 10.2+ | Visualization |
| Future: Tracing | Jaeger | - | Distributed tracing |
| Future: Logging | Loki | - | Log aggregation |
Infrastructure
| Component | Technology | Version | Purpose |
|---|---|---|---|
| Containerization | Docker | 24+ | Container runtime |
| Orchestration | Docker Compose | 2.23+ | Multi-container orchestration |
| Future: K8s | Kubernetes | 1.28+ | Production orchestration |
| Future: Service Mesh | Linkerd | 2.14+ | mTLS, observability |
Architecture Evolution
Phase-by-Phase Evolution
Phase 0-1: Foundation
- Docker Compose setup
- PostgreSQL, Redis, Qdrant
- Health endpoints
- Database migrations
Phase 2-3: Security & Core Services
- JWT authentication
- Password validation and hashing
- Token revocation
- Nextcloud integration skeleton
- API Gateway solidified
- Core endpoint structure
Phase 4: Realtime Communication
- WebSocket endpoint
- QueryOrchestrator integration
- Message streaming protocol
- Ping/pong keepalive
Phase 5: Medical AI
- Document ingestion (PDF, TXT)
- OpenAI embeddings
- Qdrant vector storage
- RAG pipeline
- Semantic search
- Citation tracking
Phase 6: Nextcloud Integration
- CalDAV calendar operations
- WebDAV file discovery
- Automatic file indexing
- Email service skeleton
Phase 7: Admin & RBAC
- Role-based access control
- Admin-only endpoints
- Admin dashboard API
- Smoke tests for RBAC
Future Phases (8-14):
- OIDC authentication
- Complete email integration
- Frontend apps (Web Client, Admin Panel UI)
- Voice processing (Thinker-Talker pipeline; legacy Realtime API fallback)
- Specialized medical models
- Microservices extraction (if needed)
- Kubernetes deployment
- Service mesh (Linkerd)
- Advanced observability (Jaeger, Loki)
Migration to Microservices (When Needed)
Trigger Conditions:
- > 50 concurrent users
- Team size > 5 developers
- Independent scaling requirements
- Different deployment cycles
- Regulatory requirements
Extraction Strategy:
1. Phase 11: Prepare
   - Ensure clean module boundaries
   - Extract shared code to library
   - Define API contracts
   - Independent service tests
2. Phase 12: Extract Services
   - Start with independent services (Search, PHI Detection)
   - Extract core services (Auth, RAG, Admin)
   - Extract shared services last (Integrations)
3. Phase 13: Deploy to Kubernetes
   - Create Dockerfiles per service
   - Create K8s manifests (Deployments, Services, ConfigMaps, Secrets)
   - Set up service mesh (Linkerd)
   - Deploy to dev cluster, then production
Design Decisions and Trade-offs
1. Monorepo vs Microservices (Phases 0-10)
Decision: Start with monorepo, maintain logical service boundaries
Rationale:
- Faster development iteration
- Simpler debugging (single codebase)
- Lower operational complexity
- Easier testing (no distributed systems challenges)
- Suitable for < 50 concurrent users
Trade-offs:
- Pros: Speed, simplicity, shared dependencies
- Cons: Single deployment unit, harder to scale independently
- Mitigation: Clear module boundaries enable future extraction
2. JWT vs Session-Based Authentication
Decision: JWT with short-lived access tokens + refresh tokens
Rationale:
- Stateless authentication (scales horizontally)
- No server-side session storage required
- Works well with SPAs and mobile apps
- Industry standard for API authentication
Trade-offs:
- Pros: Scalable, stateless, widely supported
- Cons: Cannot revoke tokens without additional infrastructure
- Mitigation: Redis-based token revocation blacklist
3. Multi-Level Caching (L1 + L2)
Decision: In-memory LRU cache (L1) + Redis distributed cache (L2)
Rationale:
- L1 provides ultra-low latency for hot data
- L2 provides distributed caching across instances
- Automatic promotion from L2 to L1 on cache hits
Trade-offs:
- Pros: Fast, distributed, high hit rate
- Cons: More complex invalidation, cache consistency
- Mitigation: TTLs on all cached data, explicit invalidation APIs
4. OpenAI Embeddings vs Self-Hosted Models
Decision: Use OpenAI text-embedding-3-small for MVP
Rationale:
- High quality embeddings (1536 dimensions)
- No infrastructure overhead
- Fast API responses
- Easy integration
Trade-offs:
- Pros: Quality, speed, simplicity
- Cons: External dependency, cost per API call, data privacy
- Mitigation: Future migration to BioGPT/PubMedBERT for medical-specific embeddings
5. ARQ vs Celery for Background Jobs
Decision: ARQ (Async Redis Queue)
Rationale:
- Simpler than Celery (no separate broker required)
- Native async/await support
- Lightweight, fast
- Redis-backed (already using Redis)
Trade-offs:
- Pros: Simple, async-native, fast
- Cons: Less mature than Celery, fewer features
- Mitigation: Sufficient for current needs, can migrate to Celery if needed
6. Docker Compose vs Kubernetes (Phases 0-10)
Decision: Docker Compose for development and initial production
Rationale:
- Simple local development
- Easy to understand and debug
- Suitable for single-server deployment
- Lower operational complexity
Trade-offs:
- Pros: Simplicity, speed, low overhead
- Cons: Limited scaling, no auto-healing, single point of failure
- Mitigation: Migrate to Kubernetes when scaling requirements justify complexity
7. Nextcloud Separation vs Integrated Deployment
Decision: Nextcloud as separate stack, VoiceAssist as client
Rationale:
- Nextcloud is complex, mature, independently managed
- Allows using existing Nextcloud installations
- Clear separation of concerns
- Independent update cycles
Trade-offs:
- Pros: Flexibility, clear boundaries, reuse existing infrastructure
- Cons: More complex configuration, network dependency
- Mitigation: Well-defined API contracts, robust error handling
8. Synchronous vs Asynchronous Service Communication
Decision: Synchronous (direct function calls) in monorepo, async (message queue) for long-running jobs
Rationale:
- Synchronous is simpler and faster for request-response patterns
- Async is better for fire-and-forget and long-running tasks
- Most operations in VoiceAssist are request-response
Trade-offs:
- Pros: Simple, fast, easy to debug
- Cons: Tighter coupling, harder to scale independently
- Mitigation: Clear service boundaries enable future async migration
Related Documentation
Core Architecture:
- SERVICE_CATALOG.md - Detailed service descriptions
- BACKEND_ARCHITECTURE.md - Backend structure evolution
- ARCHITECTURE_V2.md - Original V2 architecture (reference)
- DATA_MODEL.md - Canonical data entities
Design Documents:
- ORCHESTRATION_DESIGN.md - RAG orchestrator design
- SEMANTIC_SEARCH_DESIGN.md - Search implementation
- NEXTCLOUD_INTEGRATION.md - Integration architecture
Operations:
- docs/operations/SLO_DEFINITIONS.md - Service level objectives
- docs/testing/E2E_TESTING_GUIDE.md - Testing strategy
- OBSERVABILITY.md - Monitoring and logging
Development:
- Implementation Status - Component status
- DEVELOPMENT_PHASES_V2.md - Phase-by-phase plan
- LOCAL_DEVELOPMENT.md - Local setup guide
- Archive: CURRENT_PHASE - Historical phase info
Security & Compliance:
- SECURITY_COMPLIANCE.md - HIPAA compliance details
- INTEGRATION_IMPROVEMENTS_PHASE_0-8.md - Integration roadmap
Document Version: 1.0
Last Updated: 2025-12-02
Maintained By: VoiceAssist Development Team
Review Cycle: Updated after each major phase completion
VoiceAssist V2 - Backend Architecture
Last Updated: 2025-11-27 (All 15 Phases Complete)
Status: Canonical Reference
Purpose: Clarify backend structure evolution from monorepo to microservices
Overview
VoiceAssist V2 backend follows a progressive architecture strategy:
- Phases 0-10: Monorepo structure with clear module boundaries (Docker Compose)
- Phases 11-14: Optional split into microservices (Kubernetes)
This document explains both approaches and when to use each.
Table of Contents
- Development Evolution
- Monorepo Structure (Phases 0-10)
- Microservices Structure (Phases 11-14)
- When to Split
- Service Boundaries
- Migration Path
Repository Layout for Backend
IMPORTANT: The canonical backend is services/api-gateway/. The server/ directory is a deprecated legacy stub and should NOT be used.
The production backend code lives in:
- `services/api-gateway/app/` – The production API Gateway (FastAPI)
  - `app/api/` – 20+ API modules (auth, conversations, admin, voice, etc.)
  - `app/core/` – Configuration, security, database, logging
  - `app/models/` – SQLAlchemy ORM models
  - `app/schemas/` – Pydantic request/response schemas
  - `app/services/` – 40+ business logic services
  - `app/middleware/` – Request middleware (rate limiting)
- `server/` – DEPRECATED: legacy stub kept only for historical reference. Do not use for new development.

All new backend development should occur in `services/api-gateway/`.
Development Evolution
Phase-Based Approach
Phases 0-10: Monorepo + Docker Compose
├─ Single FastAPI application
├─ Clear module boundaries
├─ Faster development iteration
└─ Production-ready for < 50 concurrent users
Phases 11-14: Microservices + Kubernetes (Optional)
├─ Extract modules to separate services
├─ Independent scaling
├─ Suitable for > 50 concurrent users
└─ K8s orchestration
Why Start with Monorepo?
Advantages:
- Faster Development: Single codebase, shared models, easier refactoring
- Simpler Debugging: All code in one place, unified logging
- Lower Complexity: No distributed tracing, service mesh, or K8s initially
- Easier Testing: Integration tests within single app
- Shared Dependencies: Common libraries, models, utilities
When It's Sufficient:
- Development and testing phases
- Deployment to single server
- < 50 concurrent users
- Team size < 5 developers
Production Structure (All 15 Phases Complete)
Directory Layout
services/api-gateway/
├── app/
│ ├── main.py # FastAPI application entry point
│ ├── api/ # API routes (20+ modules)
│ │ ├── __init__.py
│ │ ├── auth.py # Authentication endpoints
│ │ ├── users.py # User management
│ │ ├── conversations.py # Chat/conversation management
│ │ ├── admin_panel.py # Admin dashboard
│ │ ├── admin_kb.py # Knowledge base admin
│ │ ├── admin_cache.py # Cache management
│ │ ├── admin_feature_flags.py # Feature flags
│ │ ├── voice.py # Voice endpoints
│ │ ├── realtime.py # WebSocket handling
│ │ ├── medical_ai.py # Medical AI endpoints
│ │ ├── health.py # Health checks
│ │ └── ... # Additional modules
│ │
│ ├── services/ # Business logic (40+ services)
│ │ ├── __init__.py
│ │ ├── rag_service.py # RAG pipeline orchestration
│ │ ├── phi_detector.py # PHI detection logic
│ │ ├── voice_service.py # Voice transcription/TTS
│ │ ├── kb_indexer.py # Knowledge base indexing
│ │ ├── ai_router.py # Local vs cloud AI routing
│ │ ├── search_service.py # Vector search
│ │ ├── external_apis/ # External API integrations
│ │ │ ├── uptodate.py
│ │ │ ├── pubmed.py
│ │ │ └── nextcloud.py
│ │ └── audit_logger.py # Audit logging service
│ │
│ ├── models/ # SQLAlchemy ORM models
│ │ ├── __init__.py
│ │ ├── base.py # Base model class
│ │ ├── user.py # User model
│ │ ├── session.py # Session/Conversation model
│ │ ├── message.py # ChatMessage model
│ │ ├── document.py # KnowledgeDocument model
│ │ ├── chunk.py # KBChunk model
│ │ ├── settings.py # UserSettings, SystemSettings models
│ │ └── audit.py # AuditLogEntry model
│ │
│ ├── schemas/ # Pydantic schemas (from DATA_MODEL.md)
│ │ ├── __init__.py
│ │ ├── user.py
│ │ ├── session.py
│ │ ├── message.py
│ │ ├── document.py
│ │ ├── citation.py
│ │ └── settings.py
│ │
│ ├── core/ # Core configuration and utilities
│ │ ├── __init__.py
│ │ ├── config.py # Settings (Pydantic Settings)
│ │ ├── database.py # Database session management
│ │ ├── vector_db.py # Qdrant client
│ │ ├── redis_client.py # Redis client
│ │ ├── security.py # JWT, password hashing
│ │ ├── dependencies.py # FastAPI dependencies
│ │ └── middleware.py # Custom middleware
│ │
│ ├── utils/ # Utility functions
│ │ ├── __init__.py
│ │ ├── chunking.py # Text chunking utilities
│ │ ├── pdf_parser.py # PDF parsing
│ │ ├── embeddings.py # Embedding generation
│ │ └── validators.py # Custom validators
│ │
│ └── tasks/ # Background tasks (Celery)
│ ├── __init__.py
│ ├── indexing.py # Document indexing tasks
│ └── cleanup.py # Maintenance tasks
│
├── tests/ # Test suite
│ ├── unit/ # Unit tests
│ ├── integration/ # Integration tests
│ └── e2e/ # End-to-end tests
│
├── alembic/ # Database migrations
│ ├── versions/
│ └── env.py
│
├── requirements.txt # Python dependencies
├── Dockerfile # Docker image definition
├── docker-compose.yml # Local development setup
├── .env.example # Environment variables template
└── README.md # Backend documentation
FastAPI Application Structure
app/main.py:
```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.core.config import settings
from app.core.middleware import setup_middleware
from app.api import auth, chat, search, admin, voice, documents, users

# Create FastAPI app
app = FastAPI(
    title=settings.PROJECT_NAME,
    version=settings.VERSION,
    openapi_url=f"{settings.API_V1_STR}/openapi.json",
)

# Setup middleware
setup_middleware(app)

# Include routers
app.include_router(auth.router, prefix=f"{settings.API_V1_STR}/auth", tags=["auth"])
app.include_router(chat.router, prefix=f"{settings.API_V1_STR}/chat", tags=["chat"])
app.include_router(search.router, prefix=f"{settings.API_V1_STR}/search", tags=["search"])
app.include_router(admin.router, prefix=f"{settings.API_V1_STR}/admin", tags=["admin"])
app.include_router(voice.router, prefix=f"{settings.API_V1_STR}/voice", tags=["voice"])
app.include_router(documents.router, prefix=f"{settings.API_V1_STR}/documents", tags=["documents"])
app.include_router(users.router, prefix=f"{settings.API_V1_STR}/users", tags=["users"])


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy"}
```
Service Layer Pattern
Each "service" is a Python module with clear responsibilities:
app/services/rag_service.py:
```python
from typing import Dict, List, Optional

from app.services.search_service import SearchService
from app.services.ai_router import AIRouter
from app.services.phi_detector import PHIDetector
from app.schemas.message import ChatMessage
from app.schemas.citation import Citation


class RAGService:
    """Orchestrates RAG pipeline"""

    def __init__(self):
        self.search = SearchService()
        self.ai_router = AIRouter()
        self.phi_detector = PHIDetector()

    async def process_query(
        self,
        query: str,
        session_id: str,
        clinical_context: Optional[Dict] = None,
    ) -> Dict:
        """
        Process user query through RAG pipeline:
        1. Detect PHI
        2. Search knowledge base
        3. Route to appropriate AI model
        4. Generate response with citations
        """
        # 1. PHI Detection
        phi_result = await self.phi_detector.detect(query)

        # 2. Search KB (guard against a missing clinical context)
        search_results = await self.search.search(
            query=query,
            filters={"specialty": (clinical_context or {}).get("specialty")},
        )

        # 3. Route to AI model
        model = self.ai_router.select_model(phi_detected=phi_result.has_phi)

        # 4. Generate response
        response = await model.generate(
            query=query,
            context=search_results,
            clinical_context=clinical_context,
        )

        return {
            "content": response.text,
            "citations": response.citations,
            "model_used": model.name,
            "phi_detected": phi_result.has_phi,
        }
```
Module Boundaries
Even in monorepo, maintain strict boundaries:
| Module | Responsibility | Can Import From | Cannot Import From |
|---|---|---|---|
| `api/` | HTTP endpoints, request/response | `services/`, `schemas/`, `core/` | `models/` directly |
| `services/` | Business logic | `models/`, `schemas/`, `core/`, other `services/` | `api/` |
| `models/` | Database ORM | `core/` | `api/`, `services/` |
| `schemas/` | Pydantic models | Nothing (pure data) | Everything |
| `core/` | Config, database, security | Nothing (foundational) | `api/`, `services/`, `models/` |
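These rules can be enforced mechanically. Below is a sketch of a unit test that parses each module's imports and fails on forbidden edges; the paths and exact rule set are assumptions derived from the table above, not an existing test in the repository.

```python
# tests/unit/test_module_boundaries.py (sketch)
import ast
import pathlib

FORBIDDEN = {
    "api": ("app.models",),                        # no direct model access
    "services": ("app.api",),
    "models": ("app.api", "app.services"),
    "core": ("app.api", "app.services", "app.models"),
    "schemas": ("app.",),                          # pure data: no app imports
}

def imports_of(path: pathlib.Path) -> set[str]:
    # Collect every module name imported by the file.
    tree = ast.parse(path.read_text())
    found = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            found.update(alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module:
            found.add(node.module)
    return found

def test_module_boundaries():
    root = pathlib.Path("app")
    for package, banned in FORBIDDEN.items():
        for py_file in (root / package).rglob("*.py"):
            for name in imports_of(py_file):
                assert not name.startswith(banned), f"{py_file} imports {name}"
```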
Docker Compose Setup
docker-compose.yml:
version: "3.8" services: # Backend API (monorepo) backend: build: ./server ports: - "8000:8000" environment: - DATABASE_URL=postgresql://user:pass@postgres:5432/voiceassist - REDIS_URL=redis://redis:6379 - QDRANT_URL=http://qdrant:6333 depends_on: - postgres - redis - qdrant volumes: - ./server:/app - ./data/uploads:/app/data/uploads # PostgreSQL postgres: image: postgres:15 environment: - POSTGRES_USER=voiceassist - POSTGRES_PASSWORD=password - POSTGRES_DB=voiceassist volumes: - postgres_data:/var/lib/postgresql/data # Redis redis: image: redis:7 volumes: - redis_data:/data # Qdrant Vector DB qdrant: image: qdrant/qdrant ports: - "6333:6333" volumes: - qdrant_data:/qdrant/storage # Nextcloud (Phase 2+) nextcloud: image: nextcloud:29-apache ports: - "8080:80" environment: - POSTGRES_HOST=nextcloud-db - NEXTCLOUD_ADMIN_USER=${NEXTCLOUD_ADMIN_USER} - NEXTCLOUD_ADMIN_PASSWORD=${NEXTCLOUD_ADMIN_PASSWORD} depends_on: - nextcloud-db volumes: - nextcloud_data:/var/www/html # Nextcloud Database (Phase 2+) nextcloud-db: image: postgres:16-alpine environment: - POSTGRES_DB=nextcloud - POSTGRES_USER=nextcloud - POSTGRES_PASSWORD=${NEXTCLOUD_DB_PASSWORD} volumes: - nextcloud_db_data:/var/lib/postgresql/data volumes: postgres_data: redis_data: qdrant_data: nextcloud_data: nextcloud_db_data:
Microservices Structure (Phases 11-14)
When to Split
Trigger Conditions:
- Deployment to Kubernetes cluster
- Need for independent scaling (e.g., voice service needs more resources)
- Team growth (> 5 developers, need ownership boundaries)
- Different deployment cycles (e.g., ML model updates vs API changes)
- Regulatory requirements (e.g., PHI handling in separate service)
Service Decomposition
Extract modules from monorepo into separate services:
services/
├── api-gateway/ # Kong or Nginx (routing, rate limiting)
│ ├── kong.yml
│ └── Dockerfile
│
├── auth-service/ # Authentication (from app/api/auth.py + app/services/auth)
│ ├── app/
│ │ ├── main.py
│ │ ├── api/
│ │ └── services/
│ ├── Dockerfile
│ └── requirements.txt
│
├── chat-service/ # Chat/conversations (from app/api/chat.py + app/services/rag_service.py)
│ ├── app/
│ │ ├── main.py
│ │ ├── api/
│ │ └── services/
│ ├── Dockerfile
│ └── requirements.txt
│
├── knowledge-base-service/ # KB management (from app/api/documents.py + app/services/kb_indexer.py)
│ ├── app/
│ │ ├── main.py
│ │ ├── api/
│ │ └── services/
│ ├── Dockerfile
│ └── requirements.txt
│
├── voice-service/ # Voice/WebSocket (from app/api/voice.py + app/services/voice_service.py)
│ ├── app/
│ │ ├── main.py
│ │ ├── api/
│ │ └── services/
│ ├── Dockerfile
│ └── requirements.txt
│
├── search-service/ # Vector search (from app/services/search_service.py)
│ ├── app/
│ │ ├── main.py
│ │ ├── api/
│ │ └── services/
│ ├── Dockerfile
│ └── requirements.txt
│
├── admin-service/ # Admin panel API (from app/api/admin.py)
│ ├── app/
│ │ ├── main.py
│ │ ├── api/
│ │ └── services/
│ ├── Dockerfile
│ └── requirements.txt
│
└── shared/ # Shared libraries
├── models/ # Shared SQLAlchemy models
├── schemas/ # Shared Pydantic schemas (from DATA_MODEL.md)
└── utils/ # Shared utilities
Service Communication
Synchronous (HTTP/REST):
- API Gateway → Services: REST API calls
- Service → Service: HTTP with service discovery (K8s DNS)
Asynchronous (Message Queue):
- Document indexing: Publish to RabbitMQ/Redis queue
- Audit logging: Async events to audit service
Shared Data:
- PostgreSQL: Shared database (schema per service if needed)
- Redis: Shared cache
- Qdrant: Shared vector DB
Kubernetes Deployment
Example: Chat Service
k8s/chat-service.yaml:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: chat-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: chat-service
  template:
    metadata:
      labels:
        app: chat-service
    spec:
      containers:
        - name: chat-service
          image: voiceassist/chat-service:latest
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: url
            - name: REDIS_URL
              value: redis://redis-service:6379
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
---
apiVersion: v1
kind: Service
metadata:
  name: chat-service
spec:
  selector:
    app: chat-service
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
```
When to Split
Decision Matrix
| Factor | Monorepo | Microservices |
|---|---|---|
| Team Size | < 5 developers | > 5 developers |
| Concurrent Users | < 50 users | > 50 users |
| Deployment | Single server | Multi-node K8s cluster |
| Scaling Needs | Vertical scaling OK | Need horizontal scaling |
| Development Speed | Faster (single codebase) | Slower (coordination overhead) |
| Operational Complexity | Low (Docker Compose) | High (K8s, service mesh) |
| Cost | Lower (single server) | Higher (multiple servers) |
| Regulatory | OK for small clinics | Required for large hospitals |
Recommended Path
- Phases 0-10: Start with monorepo + Docker Compose
- Phase 10 End: Evaluate scaling needs
- If < 50 users: Stay with monorepo, deploy to single Ubuntu server
- If > 50 users: Proceed to Phases 11-14, split into microservices + K8s
Service Boundaries
Logical Services (Monorepo Modules)
These are the logical boundaries, whether in monorepo or microservices:
1. Authentication Service (`app/api/auth.py` + `app/core/security.py`)
   - User registration with email validation
   - User login/logout with JWT tokens
   - JWT token management:
     - Access tokens (15-minute expiry, HS256 algorithm)
     - Refresh tokens (7-day expiry)
     - Token verification and validation
   - Token revocation via Redis (`app/services/token_revocation.py`):
     - Dual-level revocation (individual tokens + all user tokens)
     - Fail-open design for Redis unavailability
     - Automatic TTL management
     - Immediate session invalidation on logout
   - Password hashing using bcrypt (via passlib)
   - Advanced password validation (`app/core/password_validator.py`):
     - Multi-criteria validation (uppercase, lowercase, digits, special chars)
     - Password strength scoring (0-100)
     - Common password rejection
     - Sequential and repeated character detection
   - Rate limiting on authentication endpoints:
     - Registration: 5 requests/hour per IP
     - Login: 10 requests/minute per IP
     - Token refresh: 20 requests/minute per IP
   - Authentication middleware (`get_current_user`, `get_current_admin_user`)
   - Protected endpoints with JWT dependency injection
   - Comprehensive audit logging for all authentication events (see Audit Service below)
2. Chat Service (`app/api/chat.py` + `app/services/rag_service.py`)
   - Conversation management
   - Message processing
   - RAG pipeline orchestration
   - Response generation
3. Knowledge Base Service (`app/api/documents.py` + `app/services/kb_indexer.py`)
   - Document upload
   - Document processing
   - Indexing jobs
   - KB management
4. Search Service (`app/services/search_service.py`)
   - Vector search
   - Semantic search
   - Hybrid search (vector + keyword)
   - Result reranking
5. Voice Service (`app/api/voice.py` + `app/services/voice_service.py`)
   - WebSocket connections
   - Audio transcription
   - Text-to-speech
   - Voice mode management
6. Admin Service (`app/api/admin.py`)
   - User management
   - System settings
   - Analytics dashboard
   - Audit log access
7. PHI Detection Service (`app/services/phi_detector.py`)
   - PHI detection
   - AI model routing
   - Local vs cloud decision
8. External APIs Service (`app/services/external_apis/`)
   - Nextcloud Integration (`app/services/nextcloud.py`):
     - OCS API client for user provisioning
     - User creation and management via REST API
     - Health check for Nextcloud connectivity
     - Authentication with admin credentials
   - WebDAV integration (future phase)
   - PubMed integration (future phase)
   - UpToDate integration (future phase)
   - External search aggregation (future phase)
9. Audit Service (`app/services/audit_service.py` + `app/models/audit_log.py`)
   - HIPAA-compliant audit logging:
     - Immutable audit trail with SHA-256 integrity verification
     - Comprehensive metadata capture (user, action, resource, timestamp)
     - Request context tracking (IP address, user agent, request ID)
     - Service context (service name, endpoint, status)
     - Success/failure tracking with error details
     - JSON metadata for additional context
   - Automated logging for authentication events:
     - User registration, login, logout
     - Token refresh, token revocation
     - Password changes, failed authentication attempts
   - Query capabilities:
     - Retrieve audit logs by user, action, time range
     - Integrity verification for tamper detection
     - Composite indexes for efficient queries
   - Database table: `audit_logs` (PostgreSQL with JSONB support)
Core Infrastructure
Request ID Middleware (app/core/request_id.py):
- Generates unique UUID v4 for each request
- Accepts client-provided request IDs via `X-Request-ID` header
- Returns request ID in response header for correlation
- Enables distributed tracing across services
- Stored in `request.state.request_id` for access in route handlers
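A minimal sketch of such a middleware in FastAPI; the real `app/core/request_id.py` may differ in details.

```python
import uuid

from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def request_id_middleware(request: Request, call_next):
    # Honor a client-supplied ID, otherwise mint a UUID v4.
    rid = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    request.state.request_id = rid
    response = await call_next(request)
    response.headers["X-Request-ID"] = rid  # echoed back for correlation
    return response
```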
API Envelope Standardization (`app/core/api_envelope.py`):
- Consistent response format for all endpoints:

```
{
  "success": true | false,
  "data": {...} | null,
  "error": {code, message, details, field} | null,
  "metadata": {version, request_id, pagination},
  "timestamp": "2024-11-20T12:00:00Z"
}
```

- Standard error codes (`ErrorCodes` class):
  - INVALID_CREDENTIALS, TOKEN_EXPIRED, TOKEN_REVOKED
  - WEAK_PASSWORD, VALIDATION_ERROR, NOT_FOUND
  - UNAUTHORIZED, FORBIDDEN, INTERNAL_ERROR
- Helper functions:
  - `success_response(data, request_id, version, pagination)`
  - `error_response(code, message, details, field, request_id)`
- Pagination support via `PaginationMetadata` model
- Benefits:
  - Simplified client-side error handling
  - Consistent API experience across all endpoints
  - Built-in request correlation for debugging
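A minimal sketch of the two helpers, matching the envelope shape and signatures listed above; the real module may build these from Pydantic models rather than plain dicts.

```python
from datetime import datetime, timezone
from typing import Any, Optional

def success_response(data: Any, request_id: str, version: str = "v1",
                     pagination: Optional[dict] = None) -> dict:
    return {
        "success": True,
        "data": data,
        "error": None,
        "metadata": {"version": version, "request_id": request_id,
                     "pagination": pagination},
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

def error_response(code: str, message: str, request_id: str,
                   details: Any = None, field: Optional[str] = None) -> dict:
    return {
        "success": False,
        "data": None,
        "error": {"code": code, "message": message,
                  "details": details, "field": field},
        "metadata": {"version": "v1", "request_id": request_id},
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
```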
API Contracts
Each service exposes REST API endpoints documented in OpenAPI/Swagger.
Example: Search Service API
POST /api/v1/search
Request:
{
"query": "treatment for hypertension",
"filters": {"specialty": "cardiology"},
"limit": 10
}
Response:
{
"results": [
{
"document_id": "uuid",
"title": "Harrison's Principles - Chapter 252",
"snippet": "...",
"relevance_score": 0.95
}
]
}
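A hedged example of calling this endpoint from Python; the host, port, and bearer-token header are assumptions based on examples elsewhere in this document, not a documented client:

```python
import requests

# Hypothetical call to the Search Service API shown above
resp = requests.post(
    "http://localhost:8000/api/v1/search",
    json={
        "query": "treatment for hypertension",
        "filters": {"specialty": "cardiology"},
        "limit": 10,
    },
    headers={"Authorization": "Bearer <access_token>"},  # assumed auth scheme
    timeout=30,
)
resp.raise_for_status()
for hit in resp.json()["results"]:
    print(hit["relevance_score"], hit["title"])
```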
Migration Path
Step-by-Step Migration (Monorepo → Microservices)
Phase 11: Prepare for Split
- Ensure Clean Boundaries: Verify modules don't have circular dependencies
- Extract Shared Code: Move shared models/schemas to a `shared/` library
- Create Service Interfaces: Define API contracts for each service
- Add Service Tests: Test each module independently
Phase 12: Split Services
- Start with Independent Services: Extract services with the fewest dependencies first
  - Search Service (only depends on Qdrant)
  - PHI Detection Service (self-contained)
- Extract Core Services: Move API-facing services next
  - Auth Service
  - Chat Service
  - Admin Service
- Extract Shared Services Last: Extract the services that other services depend on
  - Knowledge Base Service
  - External APIs Service
Phase 13: Deploy to Kubernetes
- Create Dockerfiles: One per service
- Create K8s Manifests: Deployments, Services, ConfigMaps, Secrets
- Set Up Service Mesh (optional): Istio or Linkerd for mTLS, observability
- Deploy to Dev Cluster: Test inter-service communication
- Deploy to Prod: Gradual rollout with monitoring
Shared Library Pattern
`shared/` package:

```python
# shared/models/user.py
from sqlalchemy import Column, String, Boolean

from shared.models.base import Base

class User(Base):
    __tablename__ = "users"

    id = Column(String, primary_key=True)
    email = Column(String, unique=True)
    # ... (same across all services)
```

Install the shared library in each service:

```bash
pip install -e /path/to/shared
```

Or publish it to a private PyPI:

```bash
pip install voiceassist-shared==1.0.0
```
References
- DATA_MODEL.md - Canonical data entities
- SERVICE_CATALOG.md - Complete service descriptions
- ARCHITECTURE_V2.md - System architecture overview
- DEVELOPMENT_PHASES_V2.md - Phase-by-phase plan
- COMPOSE_TO_K8S_MIGRATION.md - K8s migration guide
- server/README.md - Backend implementation guide
VoiceAssist Frontend Architecture
Last Updated: 2025-12-03 Status: Production Ready (Phases 0-3.5 Complete, Web App and Admin Panel stable) Detailed Spec: client-implementation/TECHNICAL_ARCHITECTURE.md
Overview
VoiceAssist uses a pnpm monorepo with Turborepo for build orchestration. All frontend applications share common packages for consistency, type safety, and code reuse.
Quick Facts
| Aspect | Technology |
|---|---|
| Package Manager | pnpm 8+ |
| Build System | Turborepo |
| UI Framework | React 18+ |
| Language | TypeScript (strict mode) |
| Bundler | Vite (apps), Rollup (packages) |
| State Management | Zustand |
| Styling | Tailwind CSS |
| Component Library | shadcn/ui + custom |
Architecture Diagram
┌─────────────────────────────────────────────────────────────┐
│ apps/ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ web-app │ │ admin-panel │ │ docs-site │ │
│ │ (Vite) │ │ (Vite) │ │ (Next.js 14) │ │
│ │ │ │ │ │ │ │
│ │ User-facing │ │ Admin ops │ │ Documentation │ │
│ │ medical AI │ │ dashboard │ │ & guides │ │
│ └──────┬───────┘ └──────┬───────┘ └────────┬─────────┘ │
│ │ │ │ │
│ └─────────────────┼────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ packages/ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ │ │
│ │ │ ui │ │ types │ │ utils │ │ api- │ │ │
│ │ │ │ │ │ │ │ │ client │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └────────┘ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ config │ │telemetry │ │ design- │ │ │
│ │ │ │ │ │ │ tokens │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Backend (FastAPI) │
│ services/api-gateway/ │
└─────────────────────────────────────────────────────────────┘
Applications
1. Web App (apps/web-app/)
Main user-facing medical AI assistant application.
Features:
- Unified Chat/Voice Interface (NEW) - Seamless text and voice modes in a single view
- Medical knowledge retrieval with citations
- Document upload and management
- Conversation history with branching
- PHI-safe data handling
Key Components:
| Component | Path | Purpose |
|---|---|---|
| UnifiedChatContainer | src/components/unified-chat/ | Three-panel layout with sidebar, main, context pane |
| CollapsibleSidebar | src/components/unified-chat/ | Conversation list with pinning and search |
| UnifiedInputArea | src/components/unified-chat/ | Text/voice mode toggle |
| CollapsibleContextPane | src/components/unified-chat/ | Citations, clinical context, branches |
Entry Point: src/main.tsx
Dev Port: 5173
Documentation: See UNIFIED_CHAT_VOICE_UI.md
2. Admin Panel (apps/admin-panel/)
System administration and monitoring dashboard.
Features:
- Real-time system metrics
- User management (RBAC)
- Knowledge base administration
- Feature flag management
- Audit log viewer
Entry Point: src/main.tsx
Dev Port: 5174
3. Docs Site (apps/docs-site/)
Documentation website built with Next.js 14.
Features:
- Markdown documentation rendering
- Navigation driven by the `navigation.ts` config
- Support for docs from multiple locations (`@root/` prefix)
- Search functionality (planned)
Entry Point: src/app/layout.tsx
Dev Port: 3000
Shared Packages
| Package | Purpose | Key Exports |
|---|---|---|
| `@voiceassist/ui` | React component library | Button, Input, Card, ChatMessage, etc. |
| `@voiceassist/types` | TypeScript type definitions | API types, User, Session, Message, etc. |
| `@voiceassist/utils` | Utility functions | PHI detection, formatters, validators |
| `@voiceassist/api-client` | HTTP client | Type-safe API calls, auto token injection |
| `@voiceassist/config` | Shared configurations | ESLint, Prettier, Tailwind presets |
| `@voiceassist/telemetry` | Observability | Error tracking, analytics helpers |
| `@voiceassist/design-tokens` | Design system | Colors, typography, spacing tokens |
Development Commands
```bash
# Install dependencies
pnpm install

# Start all apps in dev mode
pnpm dev

# Start specific app
pnpm --filter web-app dev
pnpm --filter admin-panel dev
pnpm --filter docs-site dev

# Build all packages
pnpm build

# Run tests
pnpm test

# Type checking
pnpm type-check

# Lint
pnpm lint

# Storybook (component library)
pnpm storybook
```
State Management
Zustand is used for client-side state management.
```typescript
// Store structure pattern
interface AppStore {
  // Auth state
  user: User | null;
  token: string | null;

  // UI state
  sidebarOpen: boolean;
  theme: "light" | "dark";

  // Actions
  login: (credentials: LoginCredentials) => Promise<void>;
  logout: () => void;
}
```
API Communication
REST API
Use @voiceassist/api-client for all backend calls:
```typescript
import { apiClient } from "@voiceassist/api-client";

// Typed API call with auto-token injection
const sessions = await apiClient.conversations.list();
const session = await apiClient.conversations.create({ title: "New Chat" });
```
WebSocket
Real-time communication for streaming responses:
```typescript
import { useWebSocket } from "@/hooks/useWebSocket";

const { connect, send, messages } = useWebSocket("/ws");

// Send message
send({ type: "chat", content: "Hello" });

// Receive streaming response
messages.forEach((msg) => {
  if (msg.type === "assistant_chunk") {
    appendToResponse(msg.content);
  }
});
```
Key Design Patterns
1. Feature-based Organization
src/
├── features/
│ ├── chat/
│ │ ├── components/
│ │ ├── hooks/
│ │ ├── services/
│ │ └── index.ts
│ ├── auth/
│ └── admin/
2. Type-safe API Layer
All API calls are typed end-to-end using shared types from @voiceassist/types.
3. PHI Protection
Client-side PHI detection using `@voiceassist/utils`:

```typescript
import { detectPHI, redactPHI } from "@voiceassist/utils";

if (detectPHI(userInput)) {
  // Warn user or apply redaction
  const safe = redactPHI(userInput);
}
```
Related Documentation
- Detailed Architecture: client-implementation/TECHNICAL_ARCHITECTURE.md
- Development Roadmap: client-implementation/CLIENT_DEV_ROADMAP.md
- Web App Specs: WEB_APP_SPECS.md
- Admin Panel Specs: ADMIN_PANEL_SPECS.md
- Component Library: Run `pnpm storybook` to view
Version History
| Version | Date | Changes |
|---|---|---|
| 1.1.0 | 2025-12-03 | Updated status to Production Ready (Phase 3.5 done) |
| 1.0.0 | 2025-11-27 | Initial architecture document |
VoiceAssist Real-time Architecture
Last Updated: 2025-11-27 Status: Production Ready
Related Documentation:
- WebSocket Protocol - Wire protocol specification
- Voice Mode Pipeline - Voice-specific implementation
- Implementation Status - Component status
Overview
VoiceAssist uses WebSocket connections for real-time bidirectional communication, enabling:
- Streaming chat responses - Token-by-token LLM output
- Voice interactions - Speech-to-text and text-to-speech
- Live updates - Typing indicators, connection status
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────────┐
│ Client │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────────┐ │
│ │ Chat UI │ │ Voice Input │ │ Connection Manager │ │
│ │ │ │ (Web Audio) │ │ - Reconnection │ │
│ │ - Messages │ │ - Mic capture │ │ - Heartbeat │ │
│ │ - Streaming │ │ - STT stream │ │ - Token refresh │ │
│ └────────┬────────┘ └────────┬────────┘ └────────────┬────────────┘ │
│ │ │ │ │
│ └────────────────────┼────────────────────────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ WebSocket │ │
│ │ Client │ │
│ └──────┬──────┘ │
└────────────────────────────────┼────────────────────────────────────────┘
│
WSS/WS │
│
┌────────────────────────────────┼────────────────────────────────────────┐
│ │ │
│ ┌──────▼──────┐ │
│ │ WebSocket │ │
│ │ Handler │ │
│ │ (FastAPI) │ │
│ └──────┬──────┘ │
│ │ │
│ ┌────────────────────┼────────────────────┐ │
│ │ │ │ │
│ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │
│ │ Chat │ │ Voice │ │ Connection │ │
│ │ Service │ │ Service │ │ Manager │ │
│ │ │ │ │ │ │ │
│ │ - RAG Query │ │ - STT │ │ - Sessions │ │
│ │ - LLM Call │ │ - TTS │ │ - Heartbeat │ │
│ │ - Streaming │ │ - VAD │ │ - Auth │ │
│ └──────┬──────┘ └──────┬──────┘ └─────────────┘ │
│ │ │ │
│ └────────────────────┼────────────────────────────────────────┤
│ │ │
│ ┌──────▼──────┐ │
│ │ OpenAI │ │
│ │ API │ │
│ │ │ │
│ │ - GPT-4 │ │
│ │ - Whisper │ │
│ │ - TTS │ │
│ └─────────────┘ │
│ │
│ Backend │
└─────────────────────────────────────────────────────────────────────────┘
Connection Lifecycle
1. Connection Establishment
Client Server
│ │
├──── WebSocket Connect ─────────────────►│
│ (with token & conversationId) │
│ │
│◄──── connection_established ────────────┤
│ { connectionId, serverTime } │
│ │
2. Message Exchange
Client Server
│ │
├──── message ───────────────────────────►│
│ { content: "Hello" } │
│ │
│◄──── thinking ──────────────────────────┤
│ │
│◄──── assistant_chunk ───────────────────┤
│ { content: "Hi" } │
│◄──── assistant_chunk ───────────────────┤
│ { content: " there" } │
│◄──── assistant_chunk ───────────────────┤
│ { content: "!" } │
│ │
│◄──── message_complete ──────────────────┤
│ { messageId, totalTokens } │
│ │
3. Heartbeat
Client Server
│ │
├──── ping ──────────────────────────────►│
│ │
│◄──── pong ──────────────────────────────┤
│ │
WebSocket Endpoints
| Endpoint | Purpose |
|---|---|
| `/api/realtime/ws` | Main chat WebSocket |
| `/api/voice/ws` | Voice-specific WebSocket (future) |
Query Parameters
| Parameter | Required | Description |
|---|---|---|
| `conversationId` | Yes | UUID of the conversation session |
| `token` | Yes | JWT access token |
Connection URL Example
```
// Development
ws://localhost:8000/api/realtime/ws?conversationId=uuid&token=jwt

// Production
wss://assist.asimo.io/api/realtime/ws?conversationId=uuid&token=jwt
```
Message Types
Client → Server
| Type | Description |
|---|---|
| `message` | Send user message |
| `ping` | Heartbeat ping |
| `stop` | Cancel current response |
| `voice_start` | Begin voice input (future) |
| `voice_chunk` | Audio data chunk (future) |
| `voice_end` | End voice input (future) |
Server → Client
| Type | Description |
|---|---|
| `connection_established` | Connection successful |
| `thinking` | AI is processing |
| `assistant_chunk` | Streaming response chunk |
| `message_complete` | Response finished |
| `error` | Error occurred |
| `pong` | Heartbeat response |
| `voice_transcript` | Speech-to-text result (future) |
| `voice_audio` | TTS audio chunk (future) |
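To make the message flow concrete, here is a minimal client walkthrough using the third-party `websockets` package; the URL follows the connection example above, but this is a sketch, not the shipped `useWebSocket` client:

```python
import asyncio
import json

import websockets  # third-party: pip install websockets

URL = "ws://localhost:8000/api/realtime/ws?conversationId=<uuid>&token=<jwt>"

async def chat_once(text: str) -> None:
    async with websockets.connect(URL) as ws:
        # Server greets with a connection_established event
        print(json.loads(await ws.recv()))

        # Send a user message, then consume chunks until completion
        await ws.send(json.dumps({"type": "message", "content": text}))
        while True:
            event = json.loads(await ws.recv())
            if event["type"] == "assistant_chunk":
                print(event["content"], end="", flush=True)
            elif event["type"] == "message_complete":
                break

asyncio.run(chat_once("Hello"))
```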
Streaming Response Flow
RAG + LLM Pipeline
User Message → WebSocket Handler
│
▼
┌───────────────┐
│ RAG Service │ ← Retrieves relevant context
│ │ from Qdrant vector store
└───────┬───────┘
│
▼
┌───────────────┐
│ LLM Client │ ← Calls OpenAI with streaming
│ │
└───────┬───────┘
│
┌─────────┼─────────┐
│ │ │
▼ ▼ ▼
chunk_1 chunk_2 chunk_n
│ │ │
└─────────┼─────────┘
│
▼
WebSocket Send
(per chunk)
Streaming Implementation
```python
# Backend (FastAPI WebSocket handler)
async def handle_message(websocket, message):
    # Send thinking indicator
    await websocket.send_json({"type": "thinking"})

    # Get RAG context
    context = await rag_service.retrieve(message.content)

    # Stream LLM response
    async for chunk in llm_client.stream_chat(message.content, context):
        await websocket.send_json({
            "type": "assistant_chunk",
            "content": chunk.content
        })

    # Send completion
    await websocket.send_json({
        "type": "message_complete",
        "messageId": str(uuid.uuid4()),
        "totalTokens": chunk.usage.total_tokens
    })
```
Voice Architecture (Future Enhancement)
Voice Input Flow
Microphone → Web Audio API → VAD (Voice Activity Detection)
│
▼
Audio Chunks (PCM)
│
▼
WebSocket Send
│
▼
Server VAD + STT
│
▼
Transcript Event
Voice Output Flow
LLM Response Text → TTS Service (OpenAI/ElevenLabs)
│
▼
Audio Stream (MP3/PCM)
│
▼
WebSocket Send (chunks)
│
▼
Web Audio API Playback
Error Handling
Reconnection Strategy
```typescript
class WebSocketClient {
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private baseDelay = 1000; // 1 second

  async reconnect() {
    // Exponential backoff, capped at 30 seconds
    const delay = Math.min(
      this.baseDelay * Math.pow(2, this.reconnectAttempts),
      30000,
    );

    await sleep(delay);
    this.reconnectAttempts++;

    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      await this.connect();
    } else {
      this.emit("connection_failed");
    }
  }
}
```
Error Types
| Error Code | Description | Client Action |
|---|---|---|
| `auth_failed` | Invalid/expired token | Refresh token and reconnect |
| `session_not_found` | Invalid conversation ID | Create new session |
| `rate_limited` | Too many requests | Back off and retry |
| `server_error` | Internal server error | Retry with backoff |
Performance Considerations
Client-side
- Buffer chunks - Don't update DOM on every chunk
- Throttle renders - Use requestAnimationFrame
- Heartbeat interval - 30 seconds recommended
Server-side
- Connection pooling - Reuse OpenAI connections
- Chunk size - Optimize for network vs. latency
- Memory management - Clean up closed connections
Security
- Authentication - JWT token required in query params
- Rate limiting - Per-user connection limits
- Message validation - Schema validation on all messages (see the sketch below)
- TLS - WSS required in production
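As referenced above, the message-validation rule might be enforced with a Pydantic schema; `ClientMessage` and `parse_client_message` are illustrative names, not the actual handler code:

```python
from typing import Literal, Optional

from pydantic import BaseModel, ValidationError

class ClientMessage(BaseModel):
    """Shape of client -> server events, per the message-type table above."""
    type: Literal["message", "ping", "stop", "voice_start", "voice_chunk", "voice_end"]
    content: Optional[str] = None

def parse_client_message(raw: dict) -> Optional[ClientMessage]:
    """Return a validated message, or None so the caller can emit an `error` event."""
    try:
        return ClientMessage(**raw)
    except ValidationError:
        return None
```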
Related Documentation
- Protocol Specification: WEBSOCKET_PROTOCOL.md
- Voice Pipeline: VOICE_MODE_PIPELINE.md
- Backend Handler: `services/api-gateway/app/api/realtime.py`
- Client Hook: `apps/web-app/src/hooks/useWebSocket.ts`
Version History
| Version | Date | Changes |
|---|---|---|
| 1.0.0 | 2025-11-27 | Initial architecture document |
Security & Compliance Guide
Overview
VoiceAssist V2 is designed as a HIPAA-compliant, zero-trust medical AI assistant that handles Protected Health Information (PHI). This document outlines security requirements, implementation strategies, and compliance procedures.
Table of Contents
- HIPAA Compliance
- Zero-Trust Architecture
- Encryption
- Authentication & Authorization
- PHI Detection & Redaction
- Audit Logging
- Network Security
- Data Retention & Disposal
- Incident Response
- Security Monitoring
- Compliance Checklists
HIPAA Compliance
HIPAA Security Rule Requirements
VoiceAssist implements the following HIPAA Security Rule requirements:
Administrative Safeguards
1. Security Management Process
- Risk Analysis: Annual security risk assessments
- Risk Management: Documented mitigation strategies
- Sanction Policy: Employee discipline for violations
- Information System Activity Review: Regular audit log reviews
2. Assigned Security Responsibility
- Designated Security Official (Admin role)
- Security incident response team
- Regular security training
3. Workforce Security
- Authorization/Supervision procedures
- Workforce clearance procedures
- Termination procedures (access revocation)
4. Information Access Management
- Access Authorization policies
- Access Establishment/Modification procedures
- Role-Based Access Control (RBAC)
5. Security Awareness and Training
- Security reminders (quarterly)
- Protection from malicious software
- Log-in monitoring
- Password management training
6. Security Incident Procedures
- Incident response plan
- Incident reporting procedures
- Incident documentation
7. Contingency Plan
- Data backup plan (automated daily backups)
- Disaster recovery plan
- Emergency mode operation plan
- Testing and revision procedures
8. Evaluation
- Annual security evaluations
- Periodic technical and non-technical evaluations
9. Business Associate Agreements
- OpenAI API (Business Associate Agreement required)
- UpToDate API (BAA required)
- OpenEvidence API (BAA required)
- Cloud hosting provider (BAA required if using cloud)
Physical Safeguards
1. Facility Access Controls
- Contingency operations (backup power, redundancy)
- Facility security plan (datacenter access controls)
- Access control and validation procedures
- Maintenance records
2. Workstation Use
- Workstation security policies
- Screen lock requirements (5 minutes idle)
- Encrypted workstations
3. Workstation Security
- Physical security of workstations
- Restricted access to terminals
4. Device and Media Controls
- Disposal procedures (secure wipe/destroy)
- Media re-use procedures
- Accountability tracking
- Data backup and storage
Technical Safeguards
1. Access Control
- Unique User Identification (via JWT tokens with email, Phase 2; Nextcloud OIDC in Phase 6+)
- Emergency Access Procedure (admin override)
- Automatic Logoff (access tokens expire after 15 minutes, refresh tokens after 7 days)
- Encryption and Decryption (AES-256)
- Rate limiting on authentication endpoints to prevent brute force attacks
2. Audit Controls
- Hardware, software, and procedural mechanisms to record and examine activity
3. Integrity
- Mechanism to authenticate ePHI is not improperly altered or destroyed
- Digital signatures for critical data
4. Person or Entity Authentication
- Verify that a person or entity seeking access is who they claim to be
- Multi-factor authentication available
5. Transmission Security
- Integrity controls (checksums, digital signatures)
- Encryption (TLS 1.3 for all network communications)
HIPAA Implementation in VoiceAssist
| HIPAA Requirement | VoiceAssist Implementation |
|---|---|
| Access Control | RBAC via JWT tokens (Phase 2), Nextcloud OIDC integration (Phase 6+) |
| Audit Logging | Comprehensive audit logs (all PHI access tracked) |
| Authentication | JWT with bcrypt password hashing (Phase 2), OIDC/OAuth2 + optional MFA (Phase 6+) |
| Encryption at Rest | AES-256 encryption for database and file storage |
| Encryption in Transit | TLS 1.3 for all communications |
| Data Backup | Automated daily backups with encryption |
| Emergency Access | Admin override with audit trail |
| Session Management | Access tokens (15-min), refresh tokens (7-day), rate limiting on auth endpoints |
| PHI Minimization | PHI detection service redacts unnecessary PHI |
| Audit Trail | Immutable audit logs stored separately |
Zero-Trust Architecture
Zero-Trust Principles
- Never Trust, Always Verify: Every request is authenticated and authorized
- Least Privilege Access: Users/services get minimum required permissions
- Assume Breach: Design assumes attacker has network access
- Verify Explicitly: Use all available data points for authorization decisions
- Microsegmentation: Network isolation between services
Implementation
1. Service-to-Service Authentication
Docker Compose (Phases 0-10):
```yaml
# Each service authenticates via API keys
services:
  api-gateway:
    environment:
      - SERVICE_API_KEY=${API_GATEWAY_KEY}

  medical-kb:
    environment:
      - SERVICE_API_KEY=${MEDICAL_KB_KEY}
      - REQUIRED_API_KEYS=${API_GATEWAY_KEY}
```
Kubernetes (Phases 11-14):
```yaml
# Service mesh (Linkerd) provides mTLS
---
apiVersion: v1
kind: Service
metadata:
  annotations:
    linkerd.io/inject: enabled
spec:
  # mTLS automatically enabled
```
2. Network Segmentation
Docker Compose:
```yaml
networks:
  public:     # API Gateway only
  internal:   # Microservices
  database:   # Database access only
    internal: true  # No external access
```
Kubernetes:
```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: api-gateway-policy
spec:
  podSelector:
    matchLabels:
      app: api-gateway
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - podSelector: {}
      ports:
        - protocol: TCP
          port: 8000
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: auth-service
      ports:
        - protocol: TCP
          port: 8002
```
3. Identity-Based Access
```python
# Every API request requires:
# 1. Valid JWT token from Nextcloud OIDC
# 2. Role-based permission check
# 3. Resource-level access validation

@router.get("/medical-record/{record_id}")
async def get_medical_record(
    record_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    # 1. User already authenticated (JWT valid)

    # 2. Check user role
    if current_user.role not in ["doctor", "nurse", "admin"]:
        raise HTTPException(status_code=403)

    # 3. Check resource-level access
    record = db.query(MedicalRecord).filter(
        MedicalRecord.id == record_id,
        MedicalRecord.authorized_users.contains(current_user.id)
    ).first()

    if not record:
        raise HTTPException(status_code=404)

    # 4. Log access
    audit_log.log_access(
        user_id=current_user.id,
        resource="medical_record",
        resource_id=record_id,
        action="read"
    )

    return record
```
4. Short-Lived Credentials
```python
# JWT tokens expire after 1 hour
JWT_EXPIRATION = 3600  # seconds

# Refresh tokens expire after 7 days
REFRESH_TOKEN_EXPIRATION = 604800  # seconds

# Service-to-service tokens rotate every 5 minutes
SERVICE_TOKEN_EXPIRATION = 300  # seconds
```
5. Continuous Verification
```python
# Every request goes through middleware that verifies:
# - Token validity
# - Token not revoked
# - User still has required permissions
# - Rate limiting
# - Anomaly detection

@app.middleware("http")
async def security_middleware(request: Request, call_next):
    # Verify token
    token = request.headers.get("Authorization", "").replace("Bearer ", "")
    if not verify_token(token):
        return JSONResponse(status_code=401, content={"error": "Invalid token"})

    # Check if token revoked
    if await redis.get(f"revoked:{token}"):
        return JSONResponse(status_code=401, content={"error": "Token revoked"})

    # Rate limiting
    user_id = get_user_from_token(token)
    if not await rate_limiter.check(user_id):
        return JSONResponse(status_code=429, content={"error": "Rate limit exceeded"})

    # Anomaly detection
    if await detect_anomaly(user_id, request):
        await alert_security_team(user_id, request)

    response = await call_next(request)
    return response
```
Encryption
Encryption at Rest
1. Database Encryption
PostgreSQL (Transparent Data Encryption):
```sql
-- Enable pgcrypto extension
CREATE EXTENSION pgcrypto;

-- Encrypt sensitive columns
CREATE TABLE medical_records (
    id UUID PRIMARY KEY,
    patient_id UUID NOT NULL,
    diagnosis TEXT NOT NULL,        -- Encrypted column
    notes TEXT,                     -- Encrypted column
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    encryption_key_id VARCHAR(255) NOT NULL
);

-- Encrypt data before insert
INSERT INTO medical_records (id, patient_id, diagnosis, notes, encryption_key_id)
VALUES (
    gen_random_uuid(),
    'patient-uuid',
    pgp_sym_encrypt('Patient has diabetes', 'encryption_key'),
    pgp_sym_encrypt('Notes about treatment', 'encryption_key'),
    'key-id-123'
);

-- Decrypt on read
SELECT
    id,
    patient_id,
    pgp_sym_decrypt(diagnosis::bytea, 'encryption_key') AS diagnosis,
    pgp_sym_decrypt(notes::bytea, 'encryption_key') AS notes
FROM medical_records;
```
Application-Level Encryption:
```python
from cryptography.fernet import Fernet
import os

class EncryptionService:
    def __init__(self):
        # Use environment variable for encryption key
        # In production, use a key management service (AWS KMS, Azure Key Vault, etc.)
        self.key = os.environ.get("ENCRYPTION_KEY").encode()
        self.cipher = Fernet(self.key)

    def encrypt(self, data: str) -> bytes:
        """Encrypt plaintext data"""
        return self.cipher.encrypt(data.encode())

    def decrypt(self, encrypted_data: bytes) -> str:
        """Decrypt encrypted data"""
        return self.cipher.decrypt(encrypted_data).decode()

# Usage in models
class MedicalRecord(Base):
    __tablename__ = "medical_records"

    id = Column(UUID, primary_key=True)
    patient_id = Column(UUID, nullable=False)
    _diagnosis = Column("diagnosis", LargeBinary)  # Encrypted
    _notes = Column("notes", LargeBinary)          # Encrypted

    @property
    def diagnosis(self) -> str:
        if self._diagnosis:
            return encryption_service.decrypt(self._diagnosis)
        return None

    @diagnosis.setter
    def diagnosis(self, value: str):
        if value:
            self._diagnosis = encryption_service.encrypt(value)
```
2. File Storage Encryption
```python
import boto3
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

class SecureFileStorage:
    def __init__(self):
        self.s3 = boto3.client('s3')
        self.bucket = os.environ.get("S3_BUCKET")

    def upload_file(self, file_data: bytes, file_name: str, user_id: str):
        # Generate unique encryption key for this file
        file_key = os.urandom(32)
        iv = os.urandom(16)

        # Encrypt file
        cipher = Cipher(
            algorithms.AES(file_key),
            modes.GCM(iv),
            backend=default_backend()
        )
        encryptor = cipher.encryptor()
        encrypted_data = encryptor.update(file_data) + encryptor.finalize()

        # Store encryption key in database (encrypted with master key)
        encryption_key_record = FileEncryptionKey(
            file_id=file_name,
            encrypted_key=master_encrypt(file_key),
            iv=iv,
            user_id=user_id
        )
        db.add(encryption_key_record)
        db.commit()

        # Upload to S3 with server-side encryption
        self.s3.put_object(
            Bucket=self.bucket,
            Key=file_name,
            Body=encrypted_data,
            ServerSideEncryption='AES256'
        )
```
3. Backup Encryption
```bash
#!/bin/bash
# backup-encrypted.sh

BACKUP_DIR="/opt/backups"
DATE=$(date +%Y%m%d_%H%M%S)
ENCRYPTION_KEY="$BACKUP_ENCRYPTION_KEY"  # From environment

# Backup PostgreSQL and encrypt
docker exec voiceassist-prod-postgres-1 pg_dump -U voiceassist voiceassist | \
    gzip | \
    openssl enc -aes-256-cbc -salt -pbkdf2 -k "$ENCRYPTION_KEY" \
    > "$BACKUP_DIR/voiceassist_db_$DATE.sql.gz.enc"

# Backup files and encrypt
tar czf - /data/voiceassist | \
    openssl enc -aes-256-cbc -salt -pbkdf2 -k "$ENCRYPTION_KEY" \
    > "$BACKUP_DIR/voiceassist_data_$DATE.tar.gz.enc"

echo "Encrypted backups created"
```
Encryption in Transit
1. TLS Configuration
Traefik TLS Configuration:
```yaml
# traefik.yml
entryPoints:
  websecure:
    address: ":443"
    http:
      tls:
        options: strict

tls:
  options:
    strict:
      minVersion: VersionTLS13
      cipherSuites:
        - TLS_AES_256_GCM_SHA384
        - TLS_CHACHA20_POLY1305_SHA256
      curvePreferences:
        - CurveP521
        - CurveP384
```
2. Internal Service Communication
Docker Compose (Phases 0-10):
```yaml
# Use internal networks + API key authentication
services:
  api-gateway:
    networks:
      - public
      - internal
    environment:
      - TLS_CERT=/certs/cert.pem
      - TLS_KEY=/certs/key.pem
```
Kubernetes (Phases 11-14):
```yaml
# Linkerd provides automatic mTLS
---
apiVersion: linkerd.io/v1alpha2
kind: ServiceProfile
metadata:
  name: medical-kb
spec:
  routes:
    - condition:
        method: GET
        pathRegex: /api/.*
      name: api-route
      isRetryable: false
      timeout: 30s
```
3. Client-to-Server (WebRTC Voice)
```javascript
// WebRTC with DTLS-SRTP encryption
const peerConnection = new RTCPeerConnection({
  iceServers: [{ urls: "stun:stun.l.google.com:19302" }],
  // Force DTLS-SRTP encryption
  bundlePolicy: "max-bundle",
  rtcpMuxPolicy: "require",
});

// Verify encryption is active
peerConnection.getStats().then((stats) => {
  stats.forEach((report) => {
    if (report.type === "transport") {
      console.log("DTLS State:", report.dtlsState); // Must be 'connected'
      console.log("SRTP Cipher:", report.srtpCipher); // e.g., 'AES_CM_128_HMAC_SHA1_80'
    }
  });
});
```
Authentication & Authorization
Authentication Flow (Phase 2: JWT-based)
Current Implementation (Phase 2):
1. User → Web App (email + password)
2. Web App → API Gateway POST /api/auth/login
3. API Gateway → Database (validate credentials)
4. API Gateway verifies password hash (bcrypt)
5. API Gateway → Web App (access token + refresh token)
6. Web App stores tokens securely
7. Web App → API Gateway (requests with Authorization: Bearer <access_token>)
8. API Gateway verifies JWT signature and expiry
9. API Gateway extracts user info from token payload
10. API Gateway → Web App (protected resource)
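A condensed sketch of steps 2 through 5, assuming FastAPI, passlib's bcrypt backend, and PyJWT; `get_user_by_email` and the secret-loading detail are illustrative, not the actual gateway code:

```python
from datetime import datetime, timedelta, timezone

import jwt  # PyJWT
from fastapi import APIRouter, HTTPException
from passlib.context import CryptContext

router = APIRouter()
pwd_context = CryptContext(schemes=["bcrypt"])
SECRET_KEY = "change-me"  # loaded from configuration in practice

def make_token(claims: dict, lifetime: timedelta) -> str:
    """Sign an HS256 JWT with the given lifetime."""
    payload = {**claims, "exp": datetime.now(timezone.utc) + lifetime}
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

@router.post("/api/auth/login")
async def login(credentials: dict):
    user = await get_user_by_email(credentials["email"])  # hypothetical DB helper
    # Verify the bcrypt hash; identical error for unknown user or bad password
    if not user or not pwd_context.verify(credentials["password"], user.password_hash):
        raise HTTPException(status_code=401, detail="INVALID_CREDENTIALS")
    claims = {"sub": str(user.id), "email": user.email, "role": user.role}
    return {
        "access_token": make_token(claims, timedelta(minutes=15)),
        "refresh_token": make_token({"sub": str(user.id)}, timedelta(days=7)),
    }
```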
JWT Token Details (Phase 2 Enhancements):
- Access Token: 15-minute expiry, HS256 algorithm; contains user ID, email, and role
- Refresh Token: 7-day expiry, used to obtain new access tokens
- Token Revocation (`app/services/token_revocation.py`, see the sketch after this list):
  - Redis-based blacklisting for immediate invalidation
  - Dual-level revocation (individual token + all user tokens)
  - Fail-open design (allows requests if Redis is unavailable)
  - Automatic TTL management matching token expiry
  - Used for logout, password changes, and security breaches
- Password Security:
  - Hashing: bcrypt via passlib (12 rounds)
  - Validation (`app/core/password_validator.py`):
    - Minimum 8 characters (configurable)
    - Requires uppercase, lowercase, digits, and special characters
    - Rejects common passwords (password, 123456, qwerty, etc.)
    - Detects sequential characters (abc, 123, etc.)
    - Detects repeated characters (aaa, 111, etc.)
    - Strength scoring (0-100): Weak (<40), Medium (40-70), Strong (≥70)
- Rate Limiting:
  - Registration: 5 requests/hour per IP
  - Login: 10 requests/minute per IP
  - Token refresh: 20 requests/minute per IP
- Request Tracking (`app/core/request_id.py`):
  - Unique UUID v4 for each request
  - Returned in the `X-Request-ID` response header
  - Correlated across audit logs for debugging
- API Response Format (`app/core/api_envelope.py`):
  - Standardized envelope with success/error/metadata/timestamp
  - Standard error codes (INVALID_CREDENTIALS, TOKEN_EXPIRED, TOKEN_REVOKED, etc.)
  - Request ID correlation in metadata
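As referenced in the Token Revocation item above, a hedged sketch of dual-level, fail-open revocation; the key names and the choice of Redis DB 3 (shown as the token store in the data-layer diagram) are assumptions, not the actual `token_revocation.py`:

```python
import redis.asyncio as aioredis
from redis.exceptions import RedisError

r = aioredis.Redis(db=3)  # assumed: dedicated Redis DB for revocation state

async def revoke_token(jti: str, ttl_seconds: int) -> None:
    # TTL matches the token's remaining lifetime, so keys expire on their own
    await r.setex(f"revoked:token:{jti}", ttl_seconds, "1")

async def revoke_all_user_tokens(user_id: str, ttl_seconds: int) -> None:
    # Second revocation level: invalidate every outstanding token for a user
    await r.setex(f"revoked:user:{user_id}", ttl_seconds, "1")

async def is_revoked(jti: str, user_id: str) -> bool:
    try:
        return bool(
            await r.exists(f"revoked:token:{jti}")
            or await r.exists(f"revoked:user:{user_id}")
        )
    except RedisError:
        # Fail-open: if Redis is unavailable, do not block requests
        return False
```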
Future Enhancement (Phase 6+):
Full OIDC integration with Nextcloud:
1. User → VoiceAssist Web App
2. Web App → Nextcloud OIDC (/auth/login)
3. Nextcloud → User (login form)
4. User → Nextcloud (credentials)
5. Nextcloud → Web App (authorization code)
6. Web App → Nextcloud (/token endpoint)
7. Nextcloud → Web App (ID token + access token)
8. Web App → API Gateway (access token)
9. API Gateway → Auth Service (verify token)
10. Auth Service → Nextcloud (validate token)
11. Nextcloud → Auth Service (user info)
12. Auth Service → API Gateway (JWT token with user info + roles)
13. API Gateway → Web App (JWT token)
14. Web App stores JWT in httpOnly cookie
Authorization Levels
| Role | Permissions |
|---|---|
| Admin | Full system access, user management, audit log access |
| Doctor | Read/write patient records, prescribe medications, view medical knowledge |
| Nurse | Read/write patient records, limited prescribing, view medical knowledge |
| Patient | Read own records only, limited voice assistant access |
| Researcher | Read de-identified data only, no PHI access |
| API Service | Service-specific permissions (e.g., file-indexer can read files) |
RBAC Implementation
```python
from enum import Enum
from typing import List

class Role(str, Enum):
    ADMIN = "admin"
    DOCTOR = "doctor"
    NURSE = "nurse"
    PATIENT = "patient"
    RESEARCHER = "researcher"

class Permission(str, Enum):
    READ_PATIENT_RECORD = "read:patient_record"
    WRITE_PATIENT_RECORD = "write:patient_record"
    DELETE_PATIENT_RECORD = "delete:patient_record"
    PRESCRIBE_MEDICATION = "prescribe:medication"
    VIEW_AUDIT_LOGS = "view:audit_logs"
    MANAGE_USERS = "manage:users"
    ACCESS_DEIDENTIFIED_DATA = "access:deidentified_data"

# Role-Permission mapping
ROLE_PERMISSIONS = {
    Role.ADMIN: [p for p in Permission],  # All permissions
    Role.DOCTOR: [
        Permission.READ_PATIENT_RECORD,
        Permission.WRITE_PATIENT_RECORD,
        Permission.PRESCRIBE_MEDICATION,
    ],
    Role.NURSE: [
        Permission.READ_PATIENT_RECORD,
        Permission.WRITE_PATIENT_RECORD,
    ],
    Role.PATIENT: [
        Permission.READ_PATIENT_RECORD,  # Own records only
    ],
    Role.RESEARCHER: [
        Permission.ACCESS_DEIDENTIFIED_DATA,
    ],
}

def require_permission(permission: Permission):
    """Decorator to enforce permission requirements"""
    def decorator(func):
        async def wrapper(*args, current_user: User, **kwargs):
            user_permissions = ROLE_PERMISSIONS.get(current_user.role, [])
            if permission not in user_permissions:
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission denied: requires {permission}"
                )
            return await func(*args, current_user=current_user, **kwargs)
        return wrapper
    return decorator

# Usage
@router.delete("/patient-record/{record_id}")
@require_permission(Permission.DELETE_PATIENT_RECORD)
async def delete_patient_record(
    record_id: str,
    current_user: User = Depends(get_current_user)
):
    # Only admins can reach here
    pass
```
PHI Detection & Redaction
PHI Detection Service
```python
import re
from typing import List, Dict

import spacy

class PHIDetector:
    """Detect and redact Protected Health Information"""

    def __init__(self):
        # Load NLP model for NER
        self.nlp = spacy.load("en_core_web_sm")

        # PHI patterns (18 HIPAA identifiers)
        self.patterns = {
            "name": r"\b[A-Z][a-z]+ [A-Z][a-z]+\b",
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
            "mrn": r"\bMRN:?\s*\d{6,10}\b",
            "date": r"\b\d{1,2}/\d{1,2}/\d{2,4}\b",
            "zipcode": r"\b\d{5}(-\d{4})?\b",
            "ip_address": r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
            "account_number": r"\b[A-Z]{2}\d{6,10}\b",
        }

    def detect(self, text: str) -> List[Dict]:
        """Detect all PHI in text"""
        phi_detected = []

        # Regex-based detection
        for phi_type, pattern in self.patterns.items():
            matches = re.finditer(pattern, text)
            for match in matches:
                phi_detected.append({
                    "type": phi_type,
                    "value": match.group(),
                    "start": match.start(),
                    "end": match.end()
                })

        # NLP-based detection (names, locations)
        doc = self.nlp(text)
        for ent in doc.ents:
            if ent.label_ in ["PERSON", "GPE", "LOC", "ORG", "DATE"]:
                phi_detected.append({
                    "type": ent.label_.lower(),
                    "value": ent.text,
                    "start": ent.start_char,
                    "end": ent.end_char
                })

        return phi_detected

    def redact(self, text: str, redaction_char="*") -> str:
        """Redact all detected PHI"""
        phi_list = self.detect(text)

        # Sort by position (reverse order to maintain indices)
        phi_list.sort(key=lambda x: x["start"], reverse=True)

        result = text
        for phi in phi_list:
            redacted = redaction_char * (phi["end"] - phi["start"])
            result = result[:phi["start"]] + redacted + result[phi["end"]:]

        return result

    def anonymize(self, text: str) -> str:
        """Replace PHI with placeholder tokens"""
        phi_list = self.detect(text)
        phi_list.sort(key=lambda x: x["start"], reverse=True)

        result = text
        for phi in phi_list:
            placeholder = f"[{phi['type'].upper()}]"
            result = result[:phi["start"]] + placeholder + result[phi["end"]:]

        return result

# Usage
phi_detector = PHIDetector()

# Example text
text = "Patient John Doe (SSN: 123-45-6789) visited on 01/15/2024. Contact: john.doe@email.com, 555-123-4567."

# Detect PHI
detected = phi_detector.detect(text)
# [{'type': 'name', 'value': 'John Doe', ...}, {'type': 'ssn', 'value': '123-45-6789', ...}, ...]

# Redact PHI
redacted = phi_detector.redact(text)
# "Patient ******** (SSN: ***-**-****) visited on **/**/****. Contact: *******************, ***-***-****."

# Anonymize PHI
anonymized = phi_detector.anonymize(text)
# "Patient [NAME] (SSN: [SSN]) visited on [DATE]. Contact: [EMAIL], [PHONE]."
```
PHI Logging Policy
```python
import logging

class PHISafeLogger:
    """Logger that automatically redacts PHI"""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.phi_detector = PHIDetector()

    def _redact_message(self, message: str) -> str:
        """Redact PHI from log message"""
        return self.phi_detector.redact(message)

    def info(self, message: str, **kwargs):
        self.logger.info(self._redact_message(message), **kwargs)

    def warning(self, message: str, **kwargs):
        self.logger.warning(self._redact_message(message), **kwargs)

    def error(self, message: str, **kwargs):
        self.logger.error(self._redact_message(message), **kwargs)

# Usage
logger = PHISafeLogger(__name__)
logger.info("Patient John Doe logged in")
# Logs: "Patient ******** logged in"
```
Tool PHI Security Rules
VoiceAssist's tools system (see TOOLS_AND_INTEGRATIONS.md) implements PHI-aware security controls to ensure compliance with HIPAA.
Tool PHI Classification
All tools are classified by their ability to handle PHI:
| Tool Name | Allows PHI | Execution Location | External API | Rationale |
|---|---|---|---|---|
| `get_calendar_events` | ✅ Yes | Local/Nextcloud | No | Calendar data may contain patient appointments |
| `create_calendar_event` | ✅ Yes | Local/Nextcloud | No | Event titles/descriptions may reference patients |
| `search_nextcloud_files` | ✅ Yes | Local/Nextcloud | No | File names and metadata may contain PHI |
| `retrieve_nextcloud_file` | ✅ Yes | Local/Nextcloud | No | File contents are clinical documents with PHI |
| `calculate_medical_score` | ✅ Yes | Local compute | No | Calculations use patient-specific data (age, labs, etc.) |
| `generate_differential_diagnosis` | ✅ Yes | Local LLM | No | DDx generated from patient symptoms and history |
| `search_openevidence` | ❌ No | External API | Yes | External service - PHI must be stripped before sending |
| `search_pubmed` | ❌ No | External API | Yes | External service - PHI must be stripped before sending |
| `search_medical_guidelines` | ❌ No | Local vector DB | No | General medical knowledge, no patient data |
| `web_search_medical` | ❌ No | External API | Yes | External service - PHI must be stripped before sending |
Key Principles:
- Local PHI Tools: Tools that access PHI (calendar, files, calculations, DDx) execute locally or via Nextcloud (same network)
- External Non-PHI Tools: Tools that call external APIs (OpenEvidence, PubMed, web search) must never receive PHI
- PHI Detection: All tool arguments are scanned for PHI before execution
- Violation Prevention: If PHI is detected in arguments to a non-PHI tool, execution is blocked with a `PHI_VIOLATION` error
PHI Detection in Tool Arguments
```python
# server/app/services/orchestration/tool_executor.py
from app.services.phi.detector import PHIDetector
from app.services.tools.registry import TOOL_REGISTRY

phi_detector = PHIDetector()

async def execute_tool(
    tool_name: str,
    args: dict,
    user: UserContext,
    trace_id: str,
) -> ToolResult:
    """
    Execute tool with PHI detection and enforcement.

    PHI Security Rules:
    1. Detect PHI in all tool arguments
    2. If PHI detected and tool.allows_phi = False, raise PHI_VIOLATION
    3. If PHI detected and tool.allows_phi = True, route to local execution
    4. Log all PHI detections to audit log
    """
    tool_def = TOOL_REGISTRY[tool_name]

    # Scan all arguments for PHI
    phi_result = await phi_detector.detect_in_dict(args)

    if phi_result.contains_phi:
        # Log PHI detection
        audit_logger.info(
            "PHI detected in tool arguments",
            extra={
                "tool_name": tool_name,
                "user_id": user.id,
                "trace_id": trace_id,
                "phi_types": phi_result.phi_types,  # e.g., ["name", "mrn", "date"]
                "allows_phi": tool_def.allows_phi,
            }
        )

        # Enforce PHI policy
        if not tool_def.allows_phi:
            # BLOCK: Tool cannot handle PHI
            raise ToolPHIViolationError(
                f"Tool '{tool_name}' cannot process PHI. "
                f"Detected: {', '.join(phi_result.phi_types)}. "
                f"Use a local tool or remove PHI from query."
            )

    # Execute tool (PHI check passed)
    return await tool_def.execute(args, user, trace_id)
```
PHI Routing for AI Models
When generating tool calls via OpenAI Realtime API or other LLMs:
```python
# server/app/services/orchestration/query_orchestrator.py

async def route_query_to_llm(
    query: str,
    user: UserContext,
    trace_id: str,
) -> LLMResponse:
    """
    Route query to appropriate LLM based on PHI content.

    PHI Routing Rules:
    - PHI detected → Local Llama 3.1 8B (on-prem)
    - No PHI → OpenAI GPT-4 (cloud)
    """
    # Detect PHI in user query
    phi_result = await phi_detector.detect(query)

    if phi_result.contains_phi:
        # Route to LOCAL LLM
        llm_provider = "llama_local"
        model = "llama-3.1-8b-instruct"
        endpoint = "http://llm-service:8000/v1/chat/completions"

        audit_logger.info(
            "PHI detected - routing to local LLM",
            extra={
                "query_length": len(query),
                "phi_types": phi_result.phi_types,
                "model": model,
                "user_id": user.id,
                "trace_id": trace_id,
            }
        )
    else:
        # Route to CLOUD LLM
        llm_provider = "openai"
        model = "gpt-4-turbo"
        endpoint = "https://api.openai.com/v1/chat/completions"

        audit_logger.info(
            "No PHI detected - routing to cloud LLM",
            extra={
                "query_length": len(query),
                "model": model,
                "user_id": user.id,
                "trace_id": trace_id,
            }
        )

    # Make LLM request with tool definitions
    response = await llm_client.chat_completion(
        endpoint=endpoint,
        model=model,
        messages=[{"role": "user", "content": query}],
        tools=get_available_tools(phi_detected=phi_result.contains_phi),
    )

    return response
```
Tool Definition PHI Flags
Tool definitions include an `allows_phi` flag:
```python
# server/app/tools/calendar_tool.py
from app.tools.base import ToolDefinition

calendar_tool = ToolDefinition(
    name="create_calendar_event",
    description="Create an event in the user's calendar",
    category="calendar",
    allows_phi=True,  # ← PHI flag
    requires_confirmation=True,
    timeout_seconds=30,
    execute=create_calendar_event_impl,
)
```
```python
# server/app/tools/medical_search_tool.py
openevidence_tool = ToolDefinition(
    name="search_openevidence",
    description="Search evidence-based medicine database",
    category="medical_search",
    allows_phi=False,  # ← PHI flag (external API)
    requires_confirmation=False,
    timeout_seconds=10,
    execute=search_openevidence_impl,
)
```
PHI Audit Trail
All tool invocations with PHI are logged to the audit log:
```python
# After tool execution
if phi_result.contains_phi:
    await audit_log_service.log_event(
        event_type="TOOL_CALL_PHI",
        user_id=user.id,
        resource_type="tool",
        resource_id=tool_name,
        action="execute",
        metadata={
            "tool_name": tool_name,
            "phi_detected": True,
            "phi_types": phi_result.phi_types,
            "tool_allows_phi": tool_def.allows_phi,
            "execution_status": status,
            "duration_ms": duration_ms,
            "trace_id": trace_id,
        }
    )
```
PHI Error Responses
When PHI is detected in arguments to a non-PHI tool:
{ "success": false, "error": { "code": "PHI_VIOLATION", "message": "Tool 'search_openevidence' cannot process PHI. Detected: name, mrn. Use a local tool or remove PHI from query.", "details": { "tool_name": "search_openevidence", "allows_phi": false, "phi_types_detected": ["name", "mrn"], "suggested_tools": ["search_medical_guidelines", "generate_differential_diagnosis"] } }, "trace_id": "550e8400-e29b-41d4-a716-446655440000", "timestamp": "2025-11-20T12:34:56.789Z" }
Frontend Handling:
- Display user-friendly error message
- Suggest alternative tools that allow PHI
- Allow user to rephrase query without PHI
Related Documentation:
- TOOLS_AND_INTEGRATIONS.md - Complete tools specification with PHI classification
- ORCHESTRATION_DESIGN.md - Tool execution flow with PHI checks
- DATA_MODEL.md - ToolCall entity with `phi_detected` field
- OBSERVABILITY.md - Tool PHI detection metrics
Audit Logging
For logging conventions and metrics, see OBSERVABILITY.md.
Audit Log Requirements
Every access to PHI must be logged with:
- Who: User ID, role, email
- What: Action performed (read, write, delete, authentication events)
- When: Timestamp (UTC with timezone support)
- Where: IP address, service, endpoint, request ID
- Why: Purpose/reason (stored in metadata)
- Result: Success/failure with error details
Phase 2 Implementation Status
✅ IMPLEMENTED - Comprehensive audit logging system deployed in Phase 2:
Key Features:
- Immutable audit trail with SHA-256 integrity verification
- Authentication event logging (registration, login, logout, token refresh/revocation)
- Comprehensive metadata capture including IP address, user agent, request ID
- JSONB metadata field for extensible additional context
- Composite indexes for efficient queries by user, action, timestamp
- Automated integrity verification to detect tampering
- Fail-safe logging ensuring audit logs are created even if errors occur
Database Schema: `audit_logs` table (PostgreSQL with JSONB)

Service Layer:

- `app/services/audit_service.py` - Audit logging service
- `app/models/audit_log.py` - Audit log ORM model
Usage in Authentication Flow:
- All authentication events automatically logged
- Token revocation events captured
- Failed login attempts tracked
- Request IDs correlated for debugging
Audit Log Implementation (Phase 2)
```python
import hashlib
import uuid
from datetime import datetime
from functools import wraps

from sqlalchemy import Column, String, DateTime, JSON, Text, Boolean

class AuditLog(Base):
    __tablename__ = "audit_logs"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    timestamp = Column(DateTime, nullable=False, default=datetime.utcnow)
    user_id = Column(UUID, nullable=False)
    user_role = Column(String(50), nullable=False)
    action = Column(String(100), nullable=False)         # read, write, delete, export, etc.
    resource_type = Column(String(100), nullable=False)  # patient_record, prescription, etc.
    resource_id = Column(String(255))
    ip_address = Column(String(45))
    user_agent = Column(Text)
    request_id = Column(String(100))
    service_name = Column(String(100))
    success = Column(Boolean, nullable=False)
    error_message = Column(Text)
    # "metadata" is reserved by SQLAlchemy's declarative base, so the column
    # is mapped under a different Python attribute name
    extra_metadata = Column("metadata", JSON)  # Additional context
    hash = Column(String(64), nullable=False)  # Integrity verification

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Calculate hash for integrity
        self.hash = self.calculate_hash()

    def calculate_hash(self) -> str:
        """Calculate hash to detect tampering"""
        data = f"{self.timestamp}{self.user_id}{self.action}{self.resource_type}{self.resource_id}"
        return hashlib.sha256(data.encode()).hexdigest()

    def verify_integrity(self) -> bool:
        """Verify audit log has not been tampered with"""
        expected_hash = self.calculate_hash()
        return self.hash == expected_hash

class AuditService:
    """Service for creating audit logs"""

    @staticmethod
    async def log_access(
        user_id: str,
        user_role: str,
        action: str,
        resource_type: str,
        resource_id: str = None,
        request: Request = None,
        success: bool = True,
        error_message: str = None,
        metadata: dict = None
    ):
        """Create audit log entry"""
        log_entry = AuditLog(
            user_id=user_id,
            user_role=user_role,
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
            ip_address=request.client.host if request else None,
            user_agent=request.headers.get("user-agent") if request else None,
            request_id=request.state.request_id if request else None,
            service_name="voiceassist",
            success=success,
            error_message=error_message,
            extra_metadata=metadata
        )

        db.add(log_entry)
        db.commit()

        # Also send to immutable log storage (e.g., WORM storage, blockchain)
        await send_to_immutable_storage(log_entry)

# Decorator for automatic audit logging
def audit_log(action: str, resource_type: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, current_user: User, **kwargs):
            success = True
            error_message = None
            try:
                result = await func(*args, current_user=current_user, **kwargs)
                return result
            except Exception as e:
                success = False
                error_message = str(e)
                raise
            finally:
                # Log regardless of success/failure
                resource_id = kwargs.get("record_id") or kwargs.get("patient_id")
                await AuditService.log_access(
                    user_id=current_user.id,
                    user_role=current_user.role,
                    action=action,
                    resource_type=resource_type,
                    resource_id=resource_id,
                    request=kwargs.get("request"),
                    success=success,
                    error_message=error_message
                )
        return wrapper
    return decorator

# Usage
@router.get("/patient-record/{record_id}")
@audit_log(action="read", resource_type="patient_record")
async def get_patient_record(
    record_id: str,
    current_user: User = Depends(get_current_user),
    request: Request = None
):
    # Audit log created automatically
    return db.query(PatientRecord).filter_by(id=record_id).first()
```
Audit Log Retention
```python
# Retain audit logs for 6 years (HIPAA requirement)
AUDIT_LOG_RETENTION_YEARS = 6

# Archive old logs to cold storage
async def archive_old_audit_logs():
    """Archive audit logs older than 1 year to cold storage"""
    cutoff_date = datetime.utcnow() - timedelta(days=365)

    # Export to JSON
    old_logs = db.query(AuditLog).filter(AuditLog.timestamp < cutoff_date).all()

    # Write to encrypted archive
    with open(f"/archive/audit_logs_{cutoff_date.year}.json.enc", "w") as f:
        encrypted_data = encrypt_data(json.dumps([log.to_dict() for log in old_logs]))
        f.write(encrypted_data)

    # Verify integrity
    for log in old_logs:
        if not log.verify_integrity():
            alert_security_team(f"Audit log integrity violation: {log.id}")

    # Delete from active database (after successful archive)
    db.query(AuditLog).filter(AuditLog.timestamp < cutoff_date).delete()
    db.commit()
```
Network Security
Firewall Rules
```bash
# UFW rules for production server
sudo ufw default deny incoming
sudo ufw default allow outgoing

# Allow SSH (change port if using non-standard)
sudo ufw allow 22/tcp

# Allow HTTP/HTTPS
sudo ufw allow 80/tcp
sudo ufw allow 443/tcp

# Deny all other ports
sudo ufw enable
```
Network Policies (Kubernetes)
```yaml
---
# Only API Gateway can receive external traffic
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: api-gateway-policy
  namespace: voiceassist
spec:
  podSelector:
    matchLabels:
      app: api-gateway
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - namespaceSelector: {}  # From any namespace
      ports:
        - protocol: TCP
          port: 8000
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: auth-service
      ports:
        - protocol: TCP
          port: 8002
---
# Database only accessible by specific services
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: postgres-policy
  namespace: voiceassist
spec:
  podSelector:
    matchLabels:
      app: postgres
  policyTypes:
    - Ingress
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: api-gateway
        - podSelector:
            matchLabels:
              app: auth-service
        - podSelector:
            matchLabels:
              app: medical-kb
      ports:
        - protocol: TCP
          port: 5432
```
Data Retention & Disposal
Retention Policy
| Data Type | Retention Period | Disposal Method |
|---|---|---|
| Medical Records | 6 years after last visit | Secure wipe + shred (physical) |
| Audit Logs | 6 years | Encrypted archive, then secure wipe |
| Voice Recordings | 30 days (unless saved) | Secure wipe |
| Temporary Files | 24 hours | Automatic secure deletion |
| Backups | 30 days (rolling) | Encrypt, then secure wipe |
| De-identified Data | Indefinite | N/A (no PHI) |
Secure Deletion
```python
import os

def secure_delete(file_path: str, passes: int = 7):
    """
    Securely delete file using DOD 5220.22-M standard (7-pass)
    """
    if not os.path.exists(file_path):
        return

    file_size = os.path.getsize(file_path)

    with open(file_path, "ba+") as f:
        for pass_num in range(passes):
            f.seek(0)
            if pass_num in [0, 2, 4]:
                # Write zeros
                f.write(b'\x00' * file_size)
            elif pass_num in [1, 3, 5]:
                # Write ones
                f.write(b'\xFF' * file_size)
            else:
                # Write random data
                f.write(os.urandom(file_size))
            f.flush()
            os.fsync(f.fileno())

    # Finally, delete the file
    os.remove(file_path)

    # Log deletion
    audit_log.log_deletion(file_path)

# Scheduled cleanup job
@celery.task
def cleanup_expired_files():
    """Clean up files older than retention period"""
    cutoff_date = datetime.utcnow() - timedelta(days=30)

    expired_files = db.query(TemporaryFile).filter(
        TemporaryFile.created_at < cutoff_date
    ).all()

    for file_record in expired_files:
        # Secure delete physical file
        secure_delete(file_record.file_path)

        # Delete database record
        db.delete(file_record)

    db.commit()
```
Incident Response
Incident Response Plan
1. Preparation
- Incident response team identified
- Contact list maintained
- Incident response playbooks documented
- Regular drills conducted (quarterly)
2. Detection & Analysis
- 24/7 monitoring via Prometheus/Grafana
- Automated alerts for suspicious activity
- Log analysis for anomalies
- User reports
3. Containment
- Short-term: Isolate affected systems, revoke compromised credentials
- Long-term: Apply patches, update firewall rules
4. Eradication
- Remove malware/backdoors
- Close vulnerabilities
- Reset all passwords
5. Recovery
- Restore from clean backups
- Verify system integrity
- Gradual service restoration
6. Post-Incident
- Incident report (within 60 days for HIPAA breach)
- Lessons learned meeting
- Update security controls
- Notify affected users (if PHI breach)
Security Incident Examples
Unauthorized Access Attempt:
```python
# Alert triggered when multiple failed login attempts
@app.middleware("http")
async def detect_brute_force(request: Request, call_next):
    user_ip = request.client.host

    # Check failed login count
    failed_count = await redis.get(f"failed_login:{user_ip}")

    if failed_count and int(failed_count) > 5:
        # Block IP
        await redis.setex(f"blocked:{user_ip}", 3600, "1")

        # Alert security team
        await alert_security_team(
            severity="high",
            message=f"Brute force attack detected from {user_ip}",
            metadata={"ip": user_ip, "failed_attempts": failed_count}
        )

        return JSONResponse(status_code=403, content={"error": "Blocked"})

    return await call_next(request)
```
Data Breach Response:
```python
async def handle_data_breach(affected_users: List[str], breach_type: str):
    """
    HIPAA Breach Notification Rule: Notify within 60 days
    """
    # 1. Document breach
    breach_report = BreachReport(
        incident_id=str(uuid.uuid4()),
        discovered_at=datetime.utcnow(),
        breach_type=breach_type,
        affected_user_count=len(affected_users),
        description="Unauthorized access to patient records",
        mitigation_steps="Access revoked, passwords reset, audit log reviewed",
        reported_to_authorities=False
    )
    db.add(breach_report)
    db.commit()

    # 2. Notify affected users (email)
    for user_id in affected_users:
        await send_breach_notification_email(user_id, breach_report)

    # 3. Notify HHS if >500 individuals affected
    if len(affected_users) > 500:
        await notify_hhs(breach_report)

    # 4. Post on website if >500 individuals in same state
    if breach_report.requires_media_notice():
        await post_media_notice(breach_report)

    # 5. Document in breach log
    audit_log.log_breach(breach_report)
```
Security Monitoring
Metrics to Monitor
```yaml
# Prometheus alerts
groups:
  - name: security_alerts
    rules:
      # Failed login attempts
      - alert: HighFailedLoginRate
        expr: rate(failed_login_total[5m]) > 10
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "High rate of failed login attempts"

      # Unauthorized access attempts
      - alert: UnauthorizedAccessAttempt
        expr: rate(http_requests_total{status="403"}[5m]) > 5
        for: 1m
        labels:
          severity: high
        annotations:
          summary: "Multiple unauthorized access attempts detected"

      # Unusual data export volume
      - alert: UnusualDataExport
        expr: rate(data_export_bytes_total[10m]) > 1000000000  # 1GB/10min
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Unusual volume of data exports detected"

      # PHI access outside business hours (08:00-18:00 UTC)
      - alert: PHIAccessAfterHours
        expr: sum(increase(phi_access_total[5m])) > 0 and on() (hour() < 8 or hour() > 18)
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "PHI accessed outside business hours"
```
Security Dashboard (Grafana)
{ "dashboard": { "title": "Security Monitoring", "panels": [ { "title": "Failed Login Attempts (Last 24h)", "targets": [ { "expr": "sum(increase(failed_login_total[24h]))" } ] }, { "title": "Unauthorized Access by IP", "targets": [ { "expr": "topk(10, sum by (ip) (http_requests_total{status=\"403\"}))" } ] }, { "title": "PHI Access by User", "targets": [ { "expr": "sum by (user_id) (phi_access_total)" } ] }, { "title": "Audit Log Integrity Checks", "targets": [ { "expr": "audit_log_integrity_violations_total" } ] } ] } }
Compliance Checklists
Pre-Production Checklist
- All sensitive data encrypted at rest (AES-256)
- All network traffic encrypted in transit (TLS 1.3)
- OIDC authentication configured with Nextcloud
- RBAC implemented and tested
- PHI detection service deployed and tested
- Audit logging enabled for all PHI access
- Backup encryption enabled
- Firewall rules configured (deny by default)
- Network policies configured (Kubernetes)
- Business Associate Agreements signed (OpenAI, UpToDate, etc.)
- Incident response plan documented
- Security monitoring dashboard configured
- Automatic session timeout (30 minutes)
- Password policy enforced (min 12 characters, complexity)
- MFA available (optional but recommended)
- Vulnerability scanning completed
- Penetration testing completed
- Security training completed for all users
- HIPAA compliance review completed
- Privacy policy published
Annual Security Review
- Review audit logs for unusual activity
- Test backup restoration
- Test incident response procedures
- Update risk assessment
- Review and update access controls
- Vulnerability assessment
- Penetration testing
- Review Business Associate Agreements
- Staff security training refresh
- Update security policies
- Review and test disaster recovery plan
- Verify audit log integrity
- Review encryption keys (rotation)