
VoiceAssist V2 - Unified Architecture Documentation

Last Updated: 2025-12-02 (All 16 Phases Complete)
Status: Canonical Reference
Purpose: Comprehensive system architecture covering all components, data flows, and integration points


Table of Contents

  1. Executive Summary
  2. System Overview
  3. Architecture Principles
  4. Current Implementation Status
  5. Component Architecture
  6. Data Architecture
  7. Integration Architecture
  8. Security Architecture
  9. Deployment Architecture
  10. Observability Architecture
  11. Data Flow Examples
  12. Technology Stack
  13. Architecture Evolution
  14. Design Decisions and Trade-offs

Executive Summary

VoiceAssist V2 is an enterprise-grade, HIPAA-compliant medical AI assistant designed to support clinical decision-making through voice and text interfaces. The system has completed all 16 phases (0-15) of its progressive architecture:

  • Phases 0-10: Monorepo-first backend with Docker Compose orchestration
  • Phases 11-14: Security hardening, HA/DR, testing, production deployment
  • Phase 15: Final review and handoff

Current Capabilities (all phases complete):

  • ✅ JWT-based authentication with token revocation
  • ✅ Role-based access control (RBAC) for admin operations
  • ✅ RAG-powered medical knowledge base with semantic search
  • ✅ Real-time WebSocket communication for streaming responses
  • ✅ Nextcloud integration (CalDAV, WebDAV, file auto-indexing)
  • ✅ Multi-level caching (L1 in-memory + L2 Redis)
  • ✅ Comprehensive observability (Prometheus metrics, structured logging, SLOs)
  • ✅ Admin panel with system monitoring dashboard
  • ✅ Async background job processing for document indexing

Design Philosophy: Start simple (monorepo), maintain clear boundaries (logical services), scale when needed (microservices extraction).


System Overview

High-Level Architecture

┌─────────────────────────────────────────────────────────────────┐
│                        Users (Web/Mobile)                        │
│                  Browser / Mobile Apps / Web UI                  │
└────────────────┬────────────────────┬────────────────────────────┘
                 │                    │
          ┌──────┴──────┐      ┌──────┴──────┐
          │             │      │             │
          v             │      v             │
┌───────────────────┐   │  ┌──────────────────────────────────────┐
│  Nextcloud Stack  │   │  │    VoiceAssist Backend Stack         │
│  (Separate)       │   │  │    (This Repository)                 │
│                   │   │  │                                      │
│  - Identity/SSO   │◄──┼──│  API Gateway (FastAPI)               │
│  - File Storage   │   │  │  Port: 8000                          │
│  - Calendar       │   │  │                                      │
│  - Email          │   │  │  Logical Services (Phases 0-7):      │
│  - User Directory │   │  │  - Auth Service (JWT + RBAC)         │
│                   │   │  │  - Realtime Service (WebSocket)      │
│  Local Dev:       │   │  │  - RAG Service (QueryOrchestrator)   │
│  Port 8080        │   │  │  - Admin Service (Dashboard + Mgmt)  │
│                   │   │  │  - KB Indexer (Document Ingestion)   │
│  Production:      │   │  │  - Integration Service (CalDAV/File) │
│  cloud.asimo.io   │   │  │  - Cache Service (L1+L2)             │
└───────────────────┘   │  │  - Audit Service (Compliance)        │
                        │  │                                      │
                        │  │  Background Workers (ARQ):           │
                        │  │  - Document Indexing Jobs            │
                        │  │  - File Auto-Indexing                │
                        │  └──────────────────────────────────────┘
                        │
                        │  HTTPS / OIDC / WebDAV / CalDAV APIs
                        │
                        v
┌──────────────────────────────────────────────────────────────────┐
│                      Data Layer (Docker Compose)                 │
│                                                                   │
│  ┌──────────────────┐  ┌──────────────┐  ┌─────────────────┐   │
│  │  PostgreSQL      │  │  Redis       │  │  Qdrant         │   │
│  │  (pgvector)      │  │  (6 DBs)     │  │  (Vectors)      │   │
│  │                  │  │              │  │                 │   │
│  │  Tables:         │  │  DB 0: Cache │  │  Collection:    │   │
│  │  - users         │  │  DB 1: Queue │  │  - medical_kb   │   │
│  │  - sessions      │  │  DB 2: L2    │  │                 │   │
│  │  - messages      │  │  DB 3: Token │  │  Embedding:     │   │
│  │  - documents     │  │  DB 15: Test │  │  - 1536 dims    │   │
│  │  - audit_logs    │  │              │  │  - Cosine sim   │   │
│  └──────────────────┘  └──────────────┘  └─────────────────┘   │
└──────────────────────────────────────────────────────────────────┘
                            │
┌───────────────────────────┴───────────────────────────────────┐
│                 Observability Stack (Docker Compose)           │
│  ┌────────────┬────────────┬────────────┬───────────────┐    │
│  │ Prometheus │  Grafana   │  (Jaeger)  │  Loki (Logs)  │    │
│  │            │            │  (Future)  │  (Future)     │    │
│  │  Metrics:  │ Dashboards:│            │               │    │
│  │  - SLOs    │ - Health   │            │               │    │
│  │  - Cache   │ - SLOs     │            │               │    │
│  │  - RAG     │ - Security │            │               │    │
│  │  - RBAC    │            │            │               │    │
│  └────────────┴────────────┴────────────┴───────────────┘    │
└─────────────────────────────────────────────────────────────────┘

Key Architectural Separation

Nextcloud is a separate stack, not part of VoiceAssist deployment.

Local Development:

MacBook Pro
├── ~/Nextcloud-Dev/                    # Separate Nextcloud Stack
│   ├── docker-compose.yml              # Nextcloud + DB
│   └── Running at: http://localhost:8080
│
└── ~/VoiceAssist/                      # VoiceAssist Stack
    ├── docker-compose.yml              # All VoiceAssist services
    └── Running at: http://localhost:8000
    └── Connects via: NEXTCLOUD_BASE_URL=http://localhost:8080

Integration Pattern:

  • VoiceAssist services are clients of Nextcloud
  • Communication via HTTP/HTTPS APIs (OIDC, WebDAV, CalDAV, CardDAV)
  • No shared Docker Compose project, no shared databases
  • Environment variables configure the connection

Architecture Principles

1. Progressive Complexity

  • Start Simple: Begin with a monorepo for rapid development
  • Maintain Boundaries: Enforce logical service boundaries even in the monorepo
  • Scale When Needed: Extract to microservices only when scaling requires it

Decision Matrix:

| Factor | Monorepo (Current) | Microservices (Future) |
|---|---|---|
| Team Size | < 5 developers | > 5 developers |
| Concurrent Users | < 50 users | > 50 users |
| Deployment | Single server | Multi-node K8s cluster |
| Development Speed | Faster (single codebase) | Slower (coordination overhead) |
| Operational Complexity | Low (Docker Compose) | High (K8s, service mesh) |

2. Security by Design

  • Zero-trust model: Never trust, always verify
  • PHI protection: Never log PHI, automatic redaction
  • Least privilege: RBAC with granular permissions
  • Encryption everywhere: TLS in transit, encryption at rest
  • Audit everything: Immutable audit logs for all sensitive operations

3. Observability First

  • Metrics: Prometheus for performance and SLO tracking
  • Logs: Structured JSON with correlation IDs
  • Tracing: Request context propagation (future: OpenTelemetry)
  • Dashboards: Grafana for real-time system health
  • Alerts: Multi-window, multi-burn-rate SLO alerting

4. API-First Design

  • Standard envelope: Consistent response format across all endpoints
  • Error codes: Typed error codes for client error handling
  • Versioning: API version in URL path (/api/v1/...)
  • Documentation: OpenAPI/Swagger auto-generated from code

5. Performance Optimization

  • Multi-level caching: L1 (LRU in-memory) + L2 (Redis distributed)
  • Connection pooling: Efficient database and API client connections
  • Async processing: Background jobs for long-running tasks
  • Query optimization: Indexed database queries, vector search tuning

Current Implementation Status

Phase Completion Summary

All 16 project phases (0-15) are complete. See Implementation Status for detailed component status.

| Phase | Status | Key Deliverables |
|---|---|---|
| Phase 0 | ✅ Complete | Project structure, Docker Compose, base infrastructure |
| Phase 1 | ✅ Complete | PostgreSQL, Redis, Qdrant, health endpoints, Alembic migrations |
| Phase 2 | ✅ Complete | JWT auth, password validation, token revocation, Nextcloud integration |
| Phase 3 | ✅ Complete | API Gateway solidified, core endpoints, service boundaries |
| Phase 4 | ✅ Complete | WebSocket realtime communication, QueryOrchestrator integration |
| Phase 5 | ✅ Complete | RAG pipeline, semantic search, document ingestion, OpenAI embeddings |
| Phase 6 | ✅ Complete | CalDAV calendar, WebDAV file indexing, email skeleton |
| Phase 7 | ✅ Complete | RBAC enforcement, admin panel dashboard, smoke tests |
| Phase 8 | ✅ Complete | Distributed tracing, observability infrastructure |
| Phase 9 | ✅ Complete | Infrastructure as code, CI/CD pipelines |
| Phase 10 | ✅ Complete | Load testing, performance optimization |
| Phase 11 | ✅ Complete | Security hardening, HIPAA compliance |
| Phase 12 | ✅ Complete | High availability, disaster recovery |
| Phase 13 | ✅ Complete | Final testing, documentation |
| Phase 14 | ✅ Complete | Production deployment |
| Phase 15 | ✅ Complete | Final review and handoff |

Completed Features

Authentication & Authorization:

  • ✅ User registration with password strength validation
  • ✅ JWT access tokens (15-min) + refresh tokens (7-day)
  • ✅ Token revocation via Redis (dual-level: individual + all-user)
  • ✅ Role-based access control (admin vs user)
  • ✅ Admin-only endpoints protected with get_current_admin_user dependency
  • ✅ Comprehensive audit logging (SHA-256 integrity verification)

Medical AI & Knowledge Base:

  • ✅ Document upload (PDF, TXT support)
  • ✅ Text extraction and intelligent chunking
  • ✅ OpenAI embeddings (text-embedding-3-small, 1536 dimensions)
  • ✅ Qdrant vector storage with cosine similarity
  • ✅ RAG pipeline with context retrieval and citation tracking
  • ✅ QueryOrchestrator with LLM integration
  • ✅ Streaming responses via WebSocket

Nextcloud Integration:

  • ✅ CalDAV calendar operations (list, create, update, delete events)
  • ✅ WebDAV file discovery and auto-indexing
  • ✅ Automatic knowledge base population from Nextcloud files
  • ✅ Duplicate prevention for re-indexing

Observability & Operations:

  • ✅ Prometheus metrics (cache, RAG, RBAC, HTTP, DB)
  • ✅ Multi-level caching with hit/miss tracking
  • ✅ SLO definitions (availability, latency, cache performance)
  • ✅ SLO recording rules and alerting (Prometheus)
  • ✅ Grafana dashboards (health, SLOs, security audit)
  • ✅ Admin panel dashboard with system summary

Infrastructure:

  • ✅ Docker Compose orchestration
  • ✅ PostgreSQL with pgvector extension
  • ✅ Redis with multiple databases (cache, queue, L2, token revocation)
  • ✅ Qdrant vector database
  • ✅ ARQ async job queue for background processing
  • ✅ Alembic database migrations

Future Enhancements (Optional)

The following features are candidates for future enhancement beyond the current implementation:

  • ⏳ OIDC authentication integration (Nextcloud SSO)
  • ⏳ Per-user credential management
  • ⏳ Complete email integration (threading, search, attachments)
  • ⏳ CardDAV contacts integration
  • ⏳ BioGPT/PubMedBERT specialized medical models
  • ⏳ Multi-hop reasoning and complex retrieval strategies
  • ⏳ External integrations (UpToDate, OpenEvidence, PubMed live APIs)
  • ⏳ Microservices extraction (when scaling requires)

Component Architecture

Monorepo Structure

VoiceAssist/
├── services/
│   └── api-gateway/              # Main FastAPI application
│       ├── app/
│       │   ├── main.py           # Application entry point
│       │   ├── api/              # API routes (FastAPI routers)
│       │   │   ├── auth.py       # Authentication endpoints
│       │   │   ├── users.py      # User management
│       │   │   ├── realtime.py   # WebSocket endpoint
│       │   │   ├── admin_kb.py   # Admin KB management
│       │   │   ├── admin_panel.py # Admin dashboard
│       │   │   ├── integrations.py # Nextcloud integrations
│       │   │   └── metrics.py    # Prometheus metrics
│       │   ├── services/         # Business logic layer
│       │   │   ├── rag_service.py         # QueryOrchestrator (RAG pipeline)
│       │   │   ├── llm_client.py          # LLM interface
│       │   │   ├── kb_indexer.py          # Document ingestion
│       │   │   ├── search_aggregator.py   # Semantic search
│       │   │   ├── cache_service.py       # Multi-level caching
│       │   │   ├── audit_service.py       # Audit logging
│       │   │   ├── caldav_service.py      # Calendar integration
│       │   │   ├── nextcloud_file_indexer.py # File auto-indexing
│       │   │   ├── email_service.py       # Email skeleton
│       │   │   └── token_revocation.py    # Token blacklist
│       │   ├── models/           # SQLAlchemy ORM models
│       │   │   ├── user.py
│       │   │   ├── session.py
│       │   │   ├── message.py
│       │   │   └── audit_log.py
│       │   ├── core/             # Core infrastructure
│       │   │   ├── config.py     # Settings (Pydantic)
│       │   │   ├── database.py   # DB session management
│       │   │   ├── security.py   # JWT, password hashing
│       │   │   ├── dependencies.py # FastAPI dependencies
│       │   │   ├── api_envelope.py # Standard response format
│       │   │   ├── metrics.py    # Prometheus metrics definitions
│       │   │   ├── request_id.py # Request correlation
│       │   │   └── password_validator.py # Password strength
│       │   └── worker/           # Background job processing
│       │       ├── tasks.py      # ARQ tasks (document indexing)
│       │       └── worker.py     # ARQ worker entrypoint
│       ├── tests/
│       │   ├── unit/
│       │   ├── integration/
│       │   └── e2e/              # End-to-end tests (Phase 7)
│       ├── alembic/              # Database migrations
│       ├── requirements.txt
│       └── Dockerfile
├── infrastructure/
│   └── observability/
│       ├── prometheus/
│       │   ├── prometheus.yml
│       │   └── rules/
│       │       ├── slo_recording_rules.yml
│       │       └── slo_alerts.yml
│       └── grafana/
│           └── dashboards/
│               ├── health-monitoring.json
│               ├── slo-overview.json
│               └── security-audit.json
├── docs/                         # Documentation
│   ├── UNIFIED_ARCHITECTURE.md   # This document
│   ├── SERVICE_CATALOG.md
│   ├── DATA_MODEL.md
│   ├── operations/
│   │   └── SLO_DEFINITIONS.md
│   └── testing/
│       └── E2E_TESTING_GUIDE.md
├── docker-compose.yml            # Service orchestration
├── .env                          # Environment configuration
└── PHASE_STATUS.md               # Development status

Logical Service Boundaries

Even in monorepo, services maintain strict boundaries:

| Service | Module Location | Responsibility | Dependencies |
|---|---|---|---|
| Auth Service | app/api/auth.py + app/core/security.py | User registration, login, JWT tokens, RBAC | PostgreSQL, Redis (token revocation) |
| Realtime Service | app/api/realtime.py | WebSocket endpoint, streaming responses | QueryOrchestrator, LLMClient |
| Voice Pipeline Service | app/services/voice_pipeline_service.py | Thinker-Talker voice orchestration | ThinkerService, TalkerService |
| Thinker Service | app/services/thinker_service.py | LLM processing with tool/RAG support | LLMClient, RAGService |
| Talker Service | app/services/talker_service.py | TTS audio generation (ElevenLabs) | ElevenLabsService |
| RAG Service | app/services/rag_service.py | Query orchestration, RAG pipeline | SearchAggregator, LLMClient, Qdrant |
| KB Indexer | app/services/kb_indexer.py | Document ingestion, chunking, embedding | OpenAI API, Qdrant, PostgreSQL |
| Search Aggregator | app/services/search_aggregator.py | Semantic search, citation extraction | Qdrant, CacheService |
| Cache Service | app/services/cache_service.py | Multi-level caching (L1 + L2) | Redis |
| Admin Service | app/api/admin_kb.py + app/api/admin_panel.py | System management, dashboard | All services (monitoring) |
| Integration Service | app/api/integrations.py + app/services/caldav_service.py | Nextcloud integrations | Nextcloud APIs (CalDAV, WebDAV) |
| Audit Service | app/services/audit_service.py | Compliance logging, integrity verification | PostgreSQL |
| Worker Service | app/worker/ | Async background jobs | Redis (ARQ), KBIndexer |

Voice Architecture: The Thinker-Talker pipeline is the primary voice implementation. See Voice Mode Pipeline for details.

Service Communication Patterns

Synchronous (Direct Function Calls in Monorepo):

  • API routes → Service layer
  • Service → Service (internal imports)
  • Service → Database (SQLAlchemy)
  • Service → External APIs (HTTP clients)

Asynchronous (Background Jobs via ARQ):

  • Document indexing jobs
  • File auto-indexing from Nextcloud
  • Future: Email sending, scheduled tasks

Future (if extracted to microservices):

  • HTTP/REST between services
  • gRPC for high-performance internal communication
  • Message queue (RabbitMQ/Kafka) for async events

Data Architecture

Database Schema

PostgreSQL Tables (Alembic managed):

```sql
-- User Management
CREATE TABLE users (
    id UUID PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    hashed_password VARCHAR(255) NOT NULL,
    is_active BOOLEAN DEFAULT TRUE,
    is_admin BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);

-- Session Management
CREATE TABLE sessions (
    id UUID PRIMARY KEY,
    user_id UUID REFERENCES users(id),
    created_at TIMESTAMP NOT NULL,
    last_activity TIMESTAMP NOT NULL
);

-- Conversation Messages
CREATE TABLE messages (
    id UUID PRIMARY KEY,
    session_id UUID REFERENCES sessions(id),
    user_id UUID REFERENCES users(id),
    role VARCHAR(50) NOT NULL,  -- user, assistant, system
    content TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL
);

-- Audit Logs (HIPAA Compliance)
CREATE TABLE audit_logs (
    id UUID PRIMARY KEY,
    user_id UUID REFERENCES users(id),
    action VARCHAR(100) NOT NULL,
    resource_type VARCHAR(100),
    resource_id VARCHAR(255),
    ip_address VARCHAR(45),
    user_agent TEXT,
    request_id VARCHAR(255),
    service_name VARCHAR(100),
    endpoint VARCHAR(255),
    status_code INTEGER,
    success BOOLEAN NOT NULL,
    error_message TEXT,
    metadata JSONB,
    integrity_hash VARCHAR(64) NOT NULL,  -- SHA-256
    created_at TIMESTAMP NOT NULL
);

CREATE INDEX idx_audit_logs_user_id ON audit_logs(user_id);
CREATE INDEX idx_audit_logs_action ON audit_logs(action);
CREATE INDEX idx_audit_logs_created_at ON audit_logs(created_at);
CREATE INDEX idx_audit_logs_resource ON audit_logs(resource_type, resource_id);
```

Redis Database Organization

Redis Databases (0-15):

| DB | Purpose | TTL | Keys |
|---|---|---|---|
| 0 | General caching | Varies (15min-24h) | cache:*, user:* |
| 1 | ARQ job queue | N/A | arq:* |
| 2 | L2 cache (multi-level) | Varies | cache:l2:* |
| 3 | Token revocation | Token expiry | token:revoked:*, user:revoked:* |
| 15 | Test database | N/A | (cleared after tests) |
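
A minimal sketch of how these logical databases are selected, assuming the redis-py asyncio client and local defaults (URLs are illustrative):

```python
# Minimal sketch: one client per logical Redis database (redis-py asyncio).
# Database numbers mirror the table above; adjust host/port as needed.
import redis.asyncio as redis

cache = redis.from_url("redis://localhost:6379/0")         # general caching
l2_cache = redis.from_url("redis://localhost:6379/2")      # L2 cache
revocations = redis.from_url("redis://localhost:6379/3")   # token revocation
```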

Qdrant Vector Database

Collection: medical_knowledge

{ "collection_name": "medical_knowledge", "vectors": { "size": 1536, # OpenAI text-embedding-3-small "distance": "Cosine" }, "payload_schema": { "document_id": "keyword", "chunk_index": "integer", "source_type": "keyword", # textbook, journal, guideline, note "title": "text", "content": "text", "metadata": "json" } }

Data Flow Architecture

Document Ingestion Flow:

File Upload → KBIndexer →
  1. Text Extraction (PyPDF2/pdfplumber)
  2. Chunking (500 chars, 50 overlap)
  3. Embedding Generation (OpenAI API)
  4. Vector Storage (Qdrant)
  5. Metadata Storage (PostgreSQL - future)
  6. Cache Invalidation
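
A minimal sketch of the chunking step (step 2), assuming plain fixed-size windows; the helper name is illustrative, not the actual KBIndexer API:

```python
# Minimal sketch: fixed-size chunking with overlap (500 chars, 50 overlap).
def chunk_text(text: str, size: int = 500, overlap: int = 50) -> list[str]:
    step = size - overlap  # each window starts 450 chars after the previous
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]
```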

RAG Query Flow:

User Query → QueryOrchestrator →
  1. Check L1 Cache (embedding)
  2. Check L2 Cache (embedding)
  3. Generate Embedding (OpenAI API)
  4. Store in Cache (L2 + L1)
  5. Vector Search (Qdrant)
  6. Format Context
  7. LLM Generation (OpenAI GPT-4)
  8. Citation Extraction
  9. Response Streaming (WebSocket)

Authentication Flow:

Login Request → Auth API →
  1. Validate Credentials (bcrypt)
  2. Generate JWT Tokens (access + refresh)
  3. Store Session (PostgreSQL)
  4. Audit Log (audit_logs table)
  5. Return Tokens

Integration Architecture

Nextcloud Integration Pattern

Architecture Decision: Nextcloud is a separate deployment, VoiceAssist is a client.

Integration Points:

  1. CalDAV (Calendar)

    • Protocol: CalDAV (RFC 4791)
    • Library: caldav Python library
    • Operations: List calendars, create/update/delete events
    • Location: app/services/caldav_service.py
  2. WebDAV (Files)

    • Protocol: WebDAV (RFC 4918)
    • Library: webdavclient3
    • Operations: Discover files, download for indexing
    • Location: app/services/nextcloud_file_indexer.py
  3. OIDC (Authentication - Future)

    • Protocol: OpenID Connect
    • Flow: Authorization code flow
    • Provider: Nextcloud OIDC app
    • Status: Deferred (optional future enhancement)
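
A minimal sketch of the CalDAV connection, using the caldav library named above; the URL and credentials mirror the environment configuration below:

```python
# Minimal sketch: connecting to Nextcloud CalDAV with the `caldav` library.
import caldav

client = caldav.DAVClient(
    url="http://localhost:8080/remote.php/dav",
    username="admin",
    password="secure_password",
)
principal = client.principal()
calendars = principal.calendars()  # list the user's calendars
```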

Environment Configuration:

```bash
# Nextcloud Connection
NEXTCLOUD_BASE_URL=http://localhost:8080    # or https://cloud.asimo.io
NEXTCLOUD_ADMIN_USER=admin
NEXTCLOUD_ADMIN_PASSWORD=secure_password

# CalDAV
NEXTCLOUD_CALDAV_URL=${NEXTCLOUD_BASE_URL}/remote.php/dav/calendars

# WebDAV
NEXTCLOUD_WEBDAV_URL=${NEXTCLOUD_BASE_URL}/remote.php/dav/files

# OIDC (Future)
NEXTCLOUD_OIDC_ISSUER=${NEXTCLOUD_BASE_URL}/apps/oidc
NEXTCLOUD_CLIENT_ID=voiceassist
NEXTCLOUD_CLIENT_SECRET=<from_nextcloud>
```
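
These variables can be loaded into typed settings. A minimal sketch, assuming the pydantic-settings package (Pydantic v2); field names mirror the variables above:

```python
# Minimal sketch: loading the Nextcloud connection settings with
# pydantic-settings. Env vars are matched case-insensitively by field name.
from pydantic_settings import BaseSettings

class NextcloudSettings(BaseSettings):
    nextcloud_base_url: str = "http://localhost:8080"
    nextcloud_admin_user: str = "admin"
    nextcloud_admin_password: str = ""

settings = NextcloudSettings()  # reads NEXTCLOUD_* environment variables
```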

External API Integrations

OpenAI API:

  • Embeddings: text-embedding-3-small (1536 dimensions)
  • LLM: gpt-4-turbo-preview (configurable)
  • Usage: Document embedding, RAG response generation
  • Rate limiting: Handled by OpenAI client

Future Integrations (optional enhancements):

  • PubMed E-utilities API (medical literature search)
  • UpToDate API (evidence-based clinical references)
  • OpenEvidence API (guideline summaries)
  • Medical calculator libraries

Security Architecture

Authentication & Authorization

JWT Token Strategy:

  • Access Token: 15-minute expiry, HS256 algorithm
  • Refresh Token: 7-day expiry, HS256 algorithm
  • Token Revocation: Redis-based blacklist (individual + all-user-tokens)
  • Claims: sub (user_id), email, role, exp, iat, type
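
A minimal sketch of issuing an access token with these claims, assuming python-jose (the JWT library listed in the technology stack); the secret is illustrative:

```python
# Minimal sketch: short-lived access token with the claims listed above.
from datetime import datetime, timedelta, timezone
from jose import jwt

SECRET_KEY = "change-me"  # illustrative; loaded from settings in the real service

def create_access_token(user_id: str, email: str, role: str) -> str:
    now = datetime.now(timezone.utc)
    claims = {
        "sub": user_id,
        "email": email,
        "role": role,
        "iat": now,
        "exp": now + timedelta(minutes=15),  # 15-minute expiry
        "type": "access",
    }
    return jwt.encode(claims, SECRET_KEY, algorithm="HS256")
```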

Password Security:

  • Hashing: bcrypt via passlib
  • Validation: Multi-criteria (8+ chars, upper, lower, digit, special)
  • Strength Scoring: 0-100 scale with Weak/Medium/Strong classification
  • Common Password Rejection: Blocks password, 123456, qwerty, etc.
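
A minimal sketch of the multi-criteria check; strength scoring, common-password rejection, and sequence detection are omitted for brevity:

```python
# Minimal sketch: 8+ chars with upper, lower, digit, and special character.
import re

def meets_policy(password: str) -> bool:
    return (
        len(password) >= 8
        and re.search(r"[A-Z]", password) is not None
        and re.search(r"[a-z]", password) is not None
        and re.search(r"\d", password) is not None
        and re.search(r"[^A-Za-z0-9]", password) is not None
    )
```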

RBAC (Role-Based Access Control):

  • Roles: admin, user (more roles in future phases)
  • Admin Enforcement: get_current_admin_user dependency
  • Protected Endpoints:
    • /api/admin/kb/* - Knowledge base management
    • /api/admin/panel/* - System dashboard
    • /api/integrations/* - Nextcloud integrations
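
A minimal sketch of the admin-enforcement pattern; the actual dependency lives in app/core/dependencies.py and may differ in detail:

```python
# Minimal sketch: FastAPI dependency that rejects non-admin users.
# get_current_user is assumed to resolve and validate the JWT bearer token.
from fastapi import Depends, HTTPException, status

from app.core.dependencies import get_current_user

async def get_current_admin_user(user=Depends(get_current_user)):
    if not user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin privileges required",
        )
    return user
```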

Audit Logging

Compliance Features:

  • Immutable Trail: SHA-256 integrity hash on each log entry
  • Comprehensive Metadata: User, action, resource, timestamp, IP, user agent
  • Request Correlation: Request ID for distributed tracing
  • Tamper Detection: Integrity verification queries
  • HIPAA Alignment: Meets audit trail requirements
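
A minimal sketch of the integrity-hash computation; the canonical field ordering is illustrative, the real service defines its own:

```python
# Minimal sketch: SHA-256 integrity hash over a canonical JSON serialization
# of the audit entry, so any later modification is detectable.
import hashlib
import json

def integrity_hash(entry: dict) -> str:
    canonical = json.dumps(entry, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```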

Logged Events:

  • User registration, login, logout
  • Token refresh, token revocation
  • Password changes, failed authentication
  • Admin operations (KB management, system config)
  • Document access and modifications

Data Protection

Encryption:

  • In Transit: HTTPS/TLS 1.2+ (production)
  • At Rest: Database-level encryption (future: PostgreSQL transparent encryption)
  • Tokens: JWT with signed claims
  • Passwords: bcrypt hashing (cost factor: 12)

PHI Protection (Future):

  • PHI detection service (planned)
  • Automatic log redaction
  • Local vs cloud AI routing based on PHI presence
  • Separate encryption keys for PHI data

Network Security

Docker Compose Network Isolation:

```yaml
networks:
  voiceassist_network:
    driver: bridge
    internal: false   # API gateway needs external access
  voiceassist_internal:
    driver: bridge
    internal: true    # Database layer isolated
```

Future (Kubernetes):

  • Network policies for pod-to-pod restrictions
  • Service mesh (Linkerd) for mTLS
  • Ingress controller with WAF (Web Application Firewall)

Deployment Architecture

Development Environment (Docker Compose)

Current Stack:

```yaml
# docker-compose.yml
services:
  # Application Services
  voiceassist-server:
    build: ./services/api-gateway
    ports: ["8000:8000"]
    depends_on: [postgres, redis, qdrant]

  voiceassist-worker:
    build: ./services/api-gateway
    command: ["python", "-m", "app.worker.worker"]
    depends_on: [redis]

  # Data Layer
  postgres:
    image: pgvector/pgvector:pg16
    ports: ["5432:5432"]
    volumes: [postgres_data:/var/lib/postgresql/data]

  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
    volumes: [redis_data:/data]

  qdrant:
    image: qdrant/qdrant:latest
    ports: ["6333:6333"]
    volumes: [qdrant_data:/qdrant/storage]

  # Observability (Phase 7+)
  prometheus:
    image: prom/prometheus:latest
    ports: ["9090:9090"]
    volumes:
      - ./infrastructure/observability/prometheus:/etc/prometheus

  grafana:
    image: grafana/grafana:latest
    ports: ["3000:3000"]
    volumes:
      - ./infrastructure/observability/grafana:/etc/grafana
```

Resource Allocation:

  • PostgreSQL: 2 CPU, 4GB RAM
  • Redis: 1 CPU, 1GB RAM
  • Qdrant: 2 CPU, 4GB RAM
  • API Gateway: 2 CPU, 4GB RAM
  • Worker: 1 CPU, 2GB RAM

Production Deployment (Future - Kubernetes)

Planned Architecture (if migrated to Kubernetes):

Kubernetes Cluster
├── Ingress (voiceassist.asimo.io)
│   └── SSL Termination (Let's Encrypt)
├── Service Mesh (Linkerd)
│   └── mTLS between all services
├── Microservices (2-10 replicas each)
│   ├── API Gateway (Kong/Nginx)
│   ├── Auth Service
│   ├── Realtime Service
│   ├── RAG Service
│   ├── Admin Service
│   └── Integration Service
├── Data Layer
│   ├── PostgreSQL (Primary + 2 Read Replicas)
│   ├── Redis Cluster (3 masters, 3 replicas)
│   └── Qdrant (3 replicas)
└── Observability
    ├── Prometheus (HA pair)
    ├── Grafana
    ├── Jaeger (distributed tracing)
    └── Loki (log aggregation)

Observability Architecture

Metrics Collection (Prometheus)

Instrumentation:

  • HTTP Metrics: Request count, latency (p50, p95, p99), status codes
  • Cache Metrics: Hit/miss rates by layer (L1, L2), size, evictions
  • RAG Metrics: Query latency, embedding generation time, search results
  • RBAC Metrics: Protected endpoint access, admin operations
  • Database Metrics: Connection pool utilization, query latency
  • External API Metrics: OpenAI call latency, rate limits

Metrics Endpoint:

  • Location: GET /metrics
  • Format: Prometheus exposition format
  • Protection: Optional authentication (configurable)

Service Level Objectives (SLOs)

Defined SLOs (Phase 7):

| SLO | Target | Error Budget | Measurement Window |
|---|---|---|---|
| API Availability | 99.9% | 43.2 min/month | 30 days |
| API Latency (P95) | < 500ms | - | 5 minutes |
| RAG Query Success | 99% | 1% failures | 24 hours |
| Cache Hit Rate | > 40% | - | 1 hour |
| Database P95 Latency | < 100ms | - | 5 minutes |

Prometheus Recording Rules:

```yaml
# API Availability (30-day)
- record: slo:api_availability:ratio_rate30d
  expr: |
    sum(rate(voiceassist_http_requests_total{status_code=~"2..|3.."}[30d]))
      /
    sum(rate(voiceassist_http_requests_total[30d]))

# Error Budget Remaining
- record: slo:error_budget_remaining:percent
  expr: |
    100 * (1 - ((1 - slo:api_availability:ratio_rate30d) / 0.001))
```

Alerting:

  • Multi-window, multi-burn-rate approach (Google SRE guidelines)
  • Critical alerts: SLO violations (< 99.9% availability)
  • Warning alerts: Error budget burn rate > 14.4x
  • Info alerts: Informational notifications

Logging Strategy

Structured Logging:

logger.info("user_login_success", extra={ "user_id": user.id, "email": user.email, "ip_address": request.client.host, "request_id": request.state.request_id, "timestamp": datetime.utcnow().isoformat() })

Log Levels:

  • DEBUG: Development only (not in production)
  • INFO: Normal operations, audit events
  • WARNING: Potential issues, deprecated API usage
  • ERROR: Errors requiring attention
  • CRITICAL: Service failures

Log Aggregation (Future - Loki):

  • Centralized log storage
  • Full-text search
  • Log correlation by request ID
  • PHI redaction applied automatically

Dashboards (Grafana)

Implemented Dashboards (Phase 7):

  1. Health Monitoring Dashboard (health-monitoring.json)

    • System overview (CPU, memory, disk)
    • Service health status
    • Database connection pool
    • Redis memory usage
    • Qdrant storage
  2. SLO Overview Dashboard (slo-overview.json)

    • API availability (30d)
    • Error budget remaining
    • Error budget burn rate
    • API latency (P50, P95, P99)
    • Cache hit rates
  3. Security Audit Dashboard (security-audit.json)

    • Recent authentication events
    • Failed login attempts
    • Token revocations
    • Admin operations
    • Audit log integrity status

Data Flow Examples

Example 1: User Registration and Login

1. User Registration
   ├─> POST /api/auth/register {email, password}
   ├─> Password Validator: Check strength
   ├─> User Model: Create with bcrypt hash
   ├─> PostgreSQL: Insert into users table
   ├─> Audit Service: Log registration event
   └─> Response: {user_id, email}

2. User Login
   ├─> POST /api/auth/login {email, password}
   ├─> User Model: Query by email
   ├─> Security Service: Verify password (bcrypt)
   ├─> Token Service: Generate JWT tokens (access + refresh)
   ├─> Session Model: Create session record
   ├─> Audit Service: Log login event
   └─> Response: {access_token, refresh_token, user}

3. Authenticated Request
   ├─> GET /api/auth/me
   ├─> Header: Authorization: Bearer <access_token>
   ├─> Dependency: get_current_user
   ├─> Token Service: Decode and validate JWT
   ├─> Token Revocation: Check Redis blacklist
   ├─> User Model: Query user details
   └─> Response: {user}

Example 2: RAG Query with Caching

1. User Query via WebSocket
   ├─> WS /api/realtime/ws
   ├─> Client: {"type": "message", "content": "What is diabetic ketoacidosis?"}
   ├─> Realtime Service: Parse and validate
   └─> Forward to QueryOrchestrator

2. RAG Pipeline
   ├─> QueryOrchestrator: handle_query()
   ├─> SearchAggregator: generate_query_embedding()
   │   ├─> CacheService: Check L1 cache (LRU)
   │   ├─> CacheService: Check L2 cache (Redis)
   │   ├─> Cache Miss → OpenAI API: Create embedding
   │   └─> CacheService: Store in L2 + L1 (24h TTL)
   ├─> SearchAggregator: search() in Qdrant
   │   ├─> Qdrant: Cosine similarity search (top_k=5)
   │   └─> Return: List[SearchResult]
   ├─> SearchAggregator: format_context_for_rag()
   ├─> LLMClient: generate() with context
   │   └─> OpenAI API: GPT-4 generation
   └─> SearchAggregator: extract_citations()

3. Streaming Response
   ├─> Realtime Service: Stream response chunks
   │   ├─> Send: {"type": "message_start", "message_id": "..."}
   │   ├─> Send: {"type": "message_chunk", "content": "Diabetic..."}
   │   ├─> Send: {"type": "message_chunk", "content": " ketoacidosis..."}
   │   └─> Send: {"type": "message_complete", "citations": [...]}
   └─> Client: Receives streaming response
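
A minimal sketch of this streaming protocol on a FastAPI WebSocket; fake_stream() is an illustrative stand-in for the QueryOrchestrator:

```python
# Minimal sketch of the message_start / message_chunk / message_complete
# protocol shown above, on a FastAPI WebSocket endpoint.
from fastapi import FastAPI, WebSocket

app = FastAPI()

async def fake_stream(content: str):
    for chunk in ("Diabetic", " ketoacidosis..."):  # stand-in chunks
        yield chunk

@app.websocket("/api/realtime/ws")
async def realtime_ws(websocket: WebSocket):
    await websocket.accept()
    msg = await websocket.receive_json()  # {"type": "message", "content": "..."}
    await websocket.send_json({"type": "message_start", "message_id": "demo"})
    async for chunk in fake_stream(msg["content"]):
        await websocket.send_json({"type": "message_chunk", "content": chunk})
    await websocket.send_json({"type": "message_complete", "citations": []})
```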

Example 3: Document Upload and Indexing

1. Admin Upload
   ├─> POST /api/admin/kb/documents
   ├─> Dependency: get_current_admin_user (RBAC check)
   ├─> File: multipart/form-data (PDF or TXT)
   └─> Forward to KBIndexer

2. Document Processing
   ├─> KBIndexer: index_pdf_document() or index_document()
   ├─> Text Extraction: PyPDF2 or pdfplumber
   ├─> Chunking: 500 chars, 50 overlap
   ├─> For each chunk:
   │   ├─> OpenAI API: Create embedding (1536 dims)
   │   ├─> Qdrant: Store vector with metadata
   │   │   └─> Payload: {document_id, chunk_index, title, content, source_type}
   │   └─> Metrics: Track chunks_indexed
   └─> Return: IndexingResult {document_id, chunks_indexed, success}

3. Response to Admin
   ├─> Success Envelope: {success: true, data: {...}}
   ├─> Cache Invalidation: Clear L1 + L2 caches
   ├─> Audit Log: Document upload event
   └─> Prometheus Metrics: Increment kb_documents_indexed_total

Example 4: Calendar Event Creation via Nextcloud

1. Create Event Request
   ├─> POST /api/integrations/calendar/events
   ├─> Dependency: get_current_user (authentication)
   ├─> Body: {summary, start, end, description, location}
   └─> Forward to CalDAVService

2. CalDAV Integration
   ├─> CalDAVService: create_event()
   ├─> Connect to Nextcloud CalDAV
   │   └─> URL: {NEXTCLOUD_BASE_URL}/remote.php/dav/calendars/{user}/default
   ├─> Create iCalendar event (vobject)
   │   └─> VEVENT with SUMMARY, DTSTART, DTEND, DESCRIPTION, LOCATION
   ├─> Save to Nextcloud calendar
   └─> Return: Event UID

3. Response
   ├─> Success Envelope: {success: true, data: {event_uid: "..."}}
   ├─> Future: Send notification to user
   └─> Audit Log: Calendar event created

Technology Stack

Backend

| Component | Technology | Version | Purpose |
|---|---|---|---|
| Language | Python | 3.11+ | Primary backend language |
| Framework | FastAPI | 0.104+ | Async web framework |
| ORM | SQLAlchemy | 2.0+ | Database ORM |
| Migrations | Alembic | 1.12+ | Database schema versioning |
| Validation | Pydantic | 2.4+ | Data validation and settings |
| Authentication | python-jose | 3.3+ | JWT token handling |
| Password Hashing | passlib | 1.7+ | bcrypt hashing |
| HTTP Client | httpx | 0.25+ | Async HTTP requests |
| Job Queue | ARQ | 0.25+ | Async background jobs |

Databases & Storage

| Component | Technology | Version | Purpose |
|---|---|---|---|
| RDBMS | PostgreSQL | 16 | Primary relational database |
| Vector Extension | pgvector | 0.5+ | Vector storage in PostgreSQL |
| Cache/Queue | Redis | 7+ | Caching, sessions, job queue |
| Vector DB | Qdrant | 1.7+ | Semantic search |

AI & ML

| Component | Technology | Purpose |
|---|---|---|
| Embeddings | OpenAI text-embedding-3-small | 1536-dim embeddings |
| LLM | OpenAI GPT-4 Turbo | Response generation |
| Future | BioGPT, PubMedBERT | Medical-specific models |

Integrations

| Component | Technology | Purpose |
|---|---|---|
| Calendar | caldav (Python library) | CalDAV protocol support |
| Files | webdavclient3 | WebDAV protocol support |
| Email | imaplib, smtplib | IMAP/SMTP (future) |
| PDF Processing | PyPDF2, pdfplumber | Text extraction |

Observability

| Component | Technology | Version | Purpose |
|---|---|---|---|
| Metrics | Prometheus | 2.47+ | Metrics collection |
| Metrics Client | prometheus-client | 0.19+ | Python instrumentation |
| Dashboards | Grafana | 10.2+ | Visualization |
| Future: Tracing | Jaeger | - | Distributed tracing |
| Future: Logging | Loki | - | Log aggregation |

Infrastructure

| Component | Technology | Version | Purpose |
|---|---|---|---|
| Containerization | Docker | 24+ | Container runtime |
| Orchestration | Docker Compose | 2.23+ | Multi-container orchestration |
| Future: K8s | Kubernetes | 1.28+ | Production orchestration |
| Future: Service Mesh | Linkerd | 2.14+ | mTLS, observability |

Architecture Evolution

Phase-by-Phase Evolution

Phase 0-1: Foundation

  • Docker Compose setup
  • PostgreSQL, Redis, Qdrant
  • Health endpoints
  • Database migrations

Phase 2-3: Security & Core Services

  • JWT authentication
  • Password validation and hashing
  • Token revocation
  • Nextcloud integration skeleton
  • API Gateway solidified
  • Core endpoint structure

Phase 4: Realtime Communication

  • WebSocket endpoint
  • QueryOrchestrator integration
  • Message streaming protocol
  • Ping/pong keepalive

Phase 5: Medical AI

  • Document ingestion (PDF, TXT)
  • OpenAI embeddings
  • Qdrant vector storage
  • RAG pipeline
  • Semantic search
  • Citation tracking

Phase 6: Nextcloud Integration

  • CalDAV calendar operations
  • WebDAV file discovery
  • Automatic file indexing
  • Email service skeleton

Phase 7: Admin & RBAC

  • Role-based access control
  • Admin-only endpoints
  • Admin dashboard API
  • Smoke tests for RBAC

Future Enhancements:

  • OIDC authentication
  • Complete email integration
  • Frontend apps (Web Client, Admin Panel UI)
  • Voice processing (Thinker-Talker pipeline; legacy Realtime API fallback)
  • Specialized medical models
  • Microservices extraction (if needed)
  • Kubernetes deployment
  • Service mesh (Linkerd)
  • Advanced observability (Jaeger, Loki)

Migration to Microservices (When Needed)

Trigger Conditions:

  • > 50 concurrent users
  • Team size > 5 developers
  • Independent scaling requirements
  • Different deployment cycles
  • Regulatory requirements

Extraction Strategy:

  1. Prepare

    • Ensure clean module boundaries
    • Extract shared code to library
    • Define API contracts
    • Independent service tests
  2. Extract Services

    • Start with independent services (Search, PHI Detection)
    • Extract core services (Auth, RAG, Admin)
    • Extract shared services last (Integrations)
  3. Deploy to Kubernetes

    • Create Dockerfiles per service
    • Create K8s manifests (Deployments, Services, ConfigMaps, Secrets)
    • Set up service mesh (Linkerd)
    • Deploy to dev cluster, then production

Design Decisions and Trade-offs

1. Monorepo vs Microservices (Phases 0-10)

Decision: Start with monorepo, maintain logical service boundaries

Rationale:

  • Faster development iteration
  • Simpler debugging (single codebase)
  • Lower operational complexity
  • Easier testing (no distributed systems challenges)
  • Suitable for < 50 concurrent users

Trade-offs:

  • Pros: Speed, simplicity, shared dependencies
  • Cons: Single deployment unit, harder to scale independently
  • Mitigation: Clear module boundaries enable future extraction

2. JWT vs Session-Based Authentication

Decision: JWT with short-lived access tokens + refresh tokens

Rationale:

  • Stateless authentication (scales horizontally)
  • No server-side session storage required
  • Works well with SPAs and mobile apps
  • Industry standard for API authentication

Trade-offs:

  • Pros: Scalable, stateless, widely supported
  • Cons: Cannot revoke tokens without additional infrastructure
  • Mitigation: Redis-based token revocation blacklist

3. Multi-Level Caching (L1 + L2)

Decision: In-memory LRU cache (L1) + Redis distributed cache (L2)

Rationale:

  • L1 provides ultra-low latency for hot data
  • L2 provides distributed caching across instances
  • Automatic promotion from L2 to L1 on cache hits

Trade-offs:

  • Pros: Fast, distributed, high hit rate
  • Cons: More complex invalidation, cache consistency
  • Mitigation: TTLs on all cached data, explicit invalidation APIs
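
A minimal sketch of the L1/L2 read path with promotion, assuming an in-process LRU built on OrderedDict and a redis.asyncio client; serialization and TTLs are omitted:

```python
# Minimal sketch: L1 (in-memory LRU) checked first, then L2 (Redis);
# L2 hits are promoted into L1, evicting the least-recently-used entry.
from collections import OrderedDict

class MultiLevelCache:
    def __init__(self, redis_client, l1_capacity: int = 1024):
        self.l1 = OrderedDict()
        self.l1_capacity = l1_capacity
        self.l2 = redis_client

    async def get(self, key: str):
        if key in self.l1:                 # L1 hit: refresh LRU order
            self.l1.move_to_end(key)
            return self.l1[key]
        value = await self.l2.get(key)     # L2 (Redis) lookup
        if value is not None:              # promote L2 hit into L1
            self.l1[key] = value
            if len(self.l1) > self.l1_capacity:
                self.l1.popitem(last=False)
        return value
```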

4. OpenAI Embeddings vs Self-Hosted Models

Decision: Use OpenAI text-embedding-3-small for MVP

Rationale:

  • High quality embeddings (1536 dimensions)
  • No infrastructure overhead
  • Fast API responses
  • Easy integration

Trade-offs:

  • Pros: Quality, speed, simplicity
  • Cons: External dependency, cost per API call, data privacy
  • Mitigation: Future migration to BioGPT/PubMedBERT for medical-specific embeddings

5. ARQ vs Celery for Background Jobs

Decision: ARQ (Async Redis Queue)

Rationale:

  • Simpler than Celery (no separate broker required)
  • Native async/await support
  • Lightweight, fast
  • Redis-backed (already using Redis)

Trade-offs:

  • Pros: Simple, async-native, fast
  • Cons: Less mature than Celery, fewer features
  • Mitigation: Sufficient for current needs, can migrate to Celery if needed
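
A minimal sketch of the ARQ pattern (async task plus worker settings); the task body is illustrative, the real jobs live in app/worker/tasks.py:

```python
# Minimal sketch: ARQ task, worker settings, and enqueueing a job.
from arq import create_pool
from arq.connections import RedisSettings

async def index_document(ctx, document_id: str) -> None:
    ...  # chunk, embed, and store the document (illustrative)

class WorkerSettings:
    functions = [index_document]
    redis_settings = RedisSettings(database=1)  # ARQ queue lives in Redis DB 1

async def enqueue(document_id: str) -> None:
    redis = await create_pool(RedisSettings(database=1))
    await redis.enqueue_job("index_document", document_id)
```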

6. Docker Compose vs Kubernetes (Phases 0-10)

Decision: Docker Compose for development and initial production

Rationale:

  • Simple local development
  • Easy to understand and debug
  • Suitable for single-server deployment
  • Lower operational complexity

Trade-offs:

  • Pros: Simplicity, speed, low overhead
  • Cons: Limited scaling, no auto-healing, single point of failure
  • Mitigation: Migrate to Kubernetes when scaling requirements justify complexity

7. Nextcloud Separation vs Integrated Deployment

Decision: Nextcloud as separate stack, VoiceAssist as client

Rationale:

  • Nextcloud is complex, mature, independently managed
  • Allows using existing Nextcloud installations
  • Clear separation of concerns
  • Independent update cycles

Trade-offs:

  • Pros: Flexibility, clear boundaries, reuse existing infrastructure
  • Cons: More complex configuration, network dependency
  • Mitigation: Well-defined API contracts, robust error handling

8. Synchronous vs Asynchronous Service Communication

Decision: Synchronous (direct function calls) in monorepo, async (message queue) for long-running jobs

Rationale:

  • Synchronous is simpler and faster for request-response patterns
  • Async is better for fire-and-forget and long-running tasks
  • Most operations in VoiceAssist are request-response

Trade-offs:

  • Pros: Simple, fast, easy to debug
  • Cons: Tighter coupling, harder to scale independently
  • Mitigation: Clear service boundaries enable future async migration


Document Version: 1.0
Last Updated: 2025-11-20
Maintained By: VoiceAssist Development Team
Review Cycle: Updated after each major phase completion


VoiceAssist V2 - Backend Architecture

Last Updated: 2025-11-27 (All 15 Phases Complete)
Status: Canonical Reference
Purpose: Clarify backend structure evolution from monorepo to microservices


Overview

VoiceAssist V2 backend follows a progressive architecture strategy:

  • Phases 0-10: Monorepo structure with clear module boundaries (Docker Compose)
  • Phases 11-14: Optional split into microservices (Kubernetes)

This document explains both approaches and when to use each.


Table of Contents

  1. Development Evolution
  2. Monorepo Structure (Phases 0-10)
  3. Microservices Structure (Phases 11-14)
  4. When to Split
  5. Service Boundaries
  6. Migration Path

Repository Layout for Backend

IMPORTANT: The canonical backend is services/api-gateway/. The server/ directory is a deprecated legacy stub and should NOT be used.

The production backend code lives in:

  • services/api-gateway/app/ – The production API Gateway (FastAPI)

    • app/api/ – 20+ API modules (auth, conversations, admin, voice, etc.)
    • app/core/ – Configuration, security, database, logging
    • app/models/ – SQLAlchemy ORM models
    • app/schemas/ – Pydantic request/response schemas
    • app/services/ – 40+ business logic services
    • app/middleware/ – Request middleware (rate limiting)
  • server/ – DEPRECATED: Legacy stub kept only for historical reference. Do not use for new development.

All new backend development should occur in services/api-gateway/.

Development Evolution

Phase-Based Approach

Phases 0-10: Monorepo + Docker Compose
    ├─ Single FastAPI application
    ├─ Clear module boundaries
    ├─ Faster development iteration
    └─ Production-ready for < 50 concurrent users

Phases 11-14: Microservices + Kubernetes (Optional)
    ├─ Extract modules to separate services
    ├─ Independent scaling
    ├─ Suitable for > 50 concurrent users
    └─ K8s orchestration

Why Start with Monorepo?

Advantages:

  • Faster Development: Single codebase, shared models, easier refactoring
  • Simpler Debugging: All code in one place, unified logging
  • Lower Complexity: No distributed tracing, service mesh, or K8s initially
  • Easier Testing: Integration tests within single app
  • Shared Dependencies: Common libraries, models, utilities

When It's Sufficient:

  • Development and testing phases
  • Deployment to single server
  • < 50 concurrent users
  • Team size < 5 developers

Production Structure (All 15 Phases Complete)

Directory Layout

services/api-gateway/
├── app/
│   ├── main.py                 # FastAPI application entry point
│   ├── api/                    # API routes (20+ modules)
│   │   ├── __init__.py
│   │   ├── auth.py             # Authentication endpoints
│   │   ├── users.py            # User management
│   │   ├── conversations.py    # Chat/conversation management
│   │   ├── admin_panel.py      # Admin dashboard
│   │   ├── admin_kb.py         # Knowledge base admin
│   │   ├── admin_cache.py      # Cache management
│   │   ├── admin_feature_flags.py # Feature flags
│   │   ├── voice.py            # Voice endpoints
│   │   ├── realtime.py         # WebSocket handling
│   │   ├── medical_ai.py       # Medical AI endpoints
│   │   ├── health.py           # Health checks
│   │   └── ...                 # Additional modules
│   │
│   ├── services/               # Business logic (40+ services)
│   │   ├── __init__.py
│   │   ├── rag_service.py      # RAG pipeline orchestration
│   │   ├── phi_detector.py     # PHI detection logic
│   │   ├── voice_service.py    # Voice transcription/TTS
│   │   ├── kb_indexer.py       # Knowledge base indexing
│   │   ├── ai_router.py        # Local vs cloud AI routing
│   │   ├── search_service.py   # Vector search
│   │   ├── external_apis/      # External API integrations
│   │   │   ├── uptodate.py
│   │   │   ├── pubmed.py
│   │   │   └── nextcloud.py
│   │   └── audit_logger.py     # Audit logging service
│   │
│   ├── models/                 # SQLAlchemy ORM models
│   │   ├── __init__.py
│   │   ├── base.py             # Base model class
│   │   ├── user.py             # User model
│   │   ├── session.py          # Session/Conversation model
│   │   ├── message.py          # ChatMessage model
│   │   ├── document.py         # KnowledgeDocument model
│   │   ├── chunk.py            # KBChunk model
│   │   ├── settings.py         # UserSettings, SystemSettings models
│   │   └── audit.py            # AuditLogEntry model
│   │
│   ├── schemas/                # Pydantic schemas (from DATA_MODEL.md)
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── session.py
│   │   ├── message.py
│   │   ├── document.py
│   │   ├── citation.py
│   │   └── settings.py
│   │
│   ├── core/                   # Core configuration and utilities
│   │   ├── __init__.py
│   │   ├── config.py           # Settings (Pydantic Settings)
│   │   ├── database.py         # Database session management
│   │   ├── vector_db.py        # Qdrant client
│   │   ├── redis_client.py     # Redis client
│   │   ├── security.py         # JWT, password hashing
│   │   ├── dependencies.py     # FastAPI dependencies
│   │   └── middleware.py       # Custom middleware
│   │
│   ├── utils/                  # Utility functions
│   │   ├── __init__.py
│   │   ├── chunking.py         # Text chunking utilities
│   │   ├── pdf_parser.py       # PDF parsing
│   │   ├── embeddings.py       # Embedding generation
│   │   └── validators.py       # Custom validators
│   │
│   └── tasks/                  # Background tasks (Celery)
│       ├── __init__.py
│       ├── indexing.py         # Document indexing tasks
│       └── cleanup.py          # Maintenance tasks
│
├── tests/                      # Test suite
│   ├── unit/                   # Unit tests
│   ├── integration/            # Integration tests
│   └── e2e/                    # End-to-end tests
│
├── alembic/                    # Database migrations
│   ├── versions/
│   └── env.py
│
├── requirements.txt            # Python dependencies
├── Dockerfile                  # Docker image definition
├── docker-compose.yml          # Local development setup
├── .env.example                # Environment variables template
└── README.md                   # Backend documentation

FastAPI Application Structure

app/main.py:

```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.core.config import settings
from app.core.middleware import setup_middleware
from app.api import auth, chat, search, admin, voice, documents, users

# Create FastAPI app
app = FastAPI(
    title=settings.PROJECT_NAME,
    version=settings.VERSION,
    openapi_url=f"{settings.API_V1_STR}/openapi.json"
)

# Setup middleware
setup_middleware(app)

# Include routers
app.include_router(auth.router, prefix=f"{settings.API_V1_STR}/auth", tags=["auth"])
app.include_router(chat.router, prefix=f"{settings.API_V1_STR}/chat", tags=["chat"])
app.include_router(search.router, prefix=f"{settings.API_V1_STR}/search", tags=["search"])
app.include_router(admin.router, prefix=f"{settings.API_V1_STR}/admin", tags=["admin"])
app.include_router(voice.router, prefix=f"{settings.API_V1_STR}/voice", tags=["voice"])
app.include_router(documents.router, prefix=f"{settings.API_V1_STR}/documents", tags=["documents"])
app.include_router(users.router, prefix=f"{settings.API_V1_STR}/users", tags=["users"])


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy"}
```

Service Layer Pattern

Each "service" is a Python module with clear responsibilities:

app/services/rag_service.py:

```python
from typing import Dict, Optional

from app.services.search_service import SearchService
from app.services.ai_router import AIRouter
from app.services.phi_detector import PHIDetector


class RAGService:
    """Orchestrates RAG pipeline"""

    def __init__(self):
        self.search = SearchService()
        self.ai_router = AIRouter()
        self.phi_detector = PHIDetector()

    async def process_query(
        self,
        query: str,
        session_id: str,
        clinical_context: Optional[Dict] = None
    ) -> Dict:
        """
        Process user query through RAG pipeline:
        1. Detect PHI
        2. Search knowledge base
        3. Route to appropriate AI model
        4. Generate response with citations
        """
        # 1. PHI Detection
        phi_result = await self.phi_detector.detect(query)

        # 2. Search KB (tolerate a missing clinical context)
        search_results = await self.search.search(
            query=query,
            filters={"specialty": (clinical_context or {}).get("specialty")}
        )

        # 3. Route to AI model
        model = self.ai_router.select_model(phi_detected=phi_result.has_phi)

        # 4. Generate response
        response = await model.generate(
            query=query,
            context=search_results,
            clinical_context=clinical_context
        )

        return {
            "content": response.text,
            "citations": response.citations,
            "model_used": model.name,
            "phi_detected": phi_result.has_phi
        }
```

Module Boundaries

Even in monorepo, maintain strict boundaries:

ModuleResponsibilityCan Import FromCannot Import From
api/HTTP endpoints, request/responseservices/, schemas/, core/models/ directly
services/Business logicmodels/, schemas/, core/, other services/api/
models/Database ORMcore/api/, services/
schemas/Pydantic modelsNothing (pure data)Everything
core/Config, database, securityNothing (foundational)api/, services/, models/

Docker Compose Setup

docker-compose.yml:

version: "3.8" services: # Backend API (monorepo) backend: build: ./server ports: - "8000:8000" environment: - DATABASE_URL=postgresql://user:pass@postgres:5432/voiceassist - REDIS_URL=redis://redis:6379 - QDRANT_URL=http://qdrant:6333 depends_on: - postgres - redis - qdrant volumes: - ./server:/app - ./data/uploads:/app/data/uploads # PostgreSQL postgres: image: postgres:15 environment: - POSTGRES_USER=voiceassist - POSTGRES_PASSWORD=password - POSTGRES_DB=voiceassist volumes: - postgres_data:/var/lib/postgresql/data # Redis redis: image: redis:7 volumes: - redis_data:/data # Qdrant Vector DB qdrant: image: qdrant/qdrant ports: - "6333:6333" volumes: - qdrant_data:/qdrant/storage # Nextcloud (Phase 2+) nextcloud: image: nextcloud:29-apache ports: - "8080:80" environment: - POSTGRES_HOST=nextcloud-db - NEXTCLOUD_ADMIN_USER=${NEXTCLOUD_ADMIN_USER} - NEXTCLOUD_ADMIN_PASSWORD=${NEXTCLOUD_ADMIN_PASSWORD} depends_on: - nextcloud-db volumes: - nextcloud_data:/var/www/html # Nextcloud Database (Phase 2+) nextcloud-db: image: postgres:16-alpine environment: - POSTGRES_DB=nextcloud - POSTGRES_USER=nextcloud - POSTGRES_PASSWORD=${NEXTCLOUD_DB_PASSWORD} volumes: - nextcloud_db_data:/var/lib/postgresql/data volumes: postgres_data: redis_data: qdrant_data: nextcloud_data: nextcloud_db_data:

Microservices Structure (Phases 11-14)

When to Split

Trigger Conditions:

  • Deployment to Kubernetes cluster
  • Need for independent scaling (e.g., voice service needs more resources)
  • Team growth (> 5 developers, need ownership boundaries)
  • Different deployment cycles (e.g., ML model updates vs API changes)
  • Regulatory requirements (e.g., PHI handling in separate service)

Service Decomposition

Extract modules from monorepo into separate services:

services/
├── api-gateway/            # Kong or Nginx (routing, rate limiting)
│   ├── kong.yml
│   └── Dockerfile
│
├── auth-service/           # Authentication (from app/api/auth.py + app/services/auth)
│   ├── app/
│   │   ├── main.py
│   │   ├── api/
│   │   └── services/
│   ├── Dockerfile
│   └── requirements.txt
│
├── chat-service/           # Chat/conversations (from app/api/chat.py + app/services/rag_service.py)
│   ├── app/
│   │   ├── main.py
│   │   ├── api/
│   │   └── services/
│   ├── Dockerfile
│   └── requirements.txt
│
├── knowledge-base-service/ # KB management (from app/api/documents.py + app/services/kb_indexer.py)
│   ├── app/
│   │   ├── main.py
│   │   ├── api/
│   │   └── services/
│   ├── Dockerfile
│   └── requirements.txt
│
├── voice-service/          # Voice/WebSocket (from app/api/voice.py + app/services/voice_service.py)
│   ├── app/
│   │   ├── main.py
│   │   ├── api/
│   │   └── services/
│   ├── Dockerfile
│   └── requirements.txt
│
├── search-service/         # Vector search (from app/services/search_service.py)
│   ├── app/
│   │   ├── main.py
│   │   ├── api/
│   │   └── services/
│   ├── Dockerfile
│   └── requirements.txt
│
├── admin-service/          # Admin panel API (from app/api/admin.py)
│   ├── app/
│   │   ├── main.py
│   │   ├── api/
│   │   └── services/
│   ├── Dockerfile
│   └── requirements.txt
│
└── shared/                 # Shared libraries
    ├── models/             # Shared SQLAlchemy models
    ├── schemas/            # Shared Pydantic schemas (from DATA_MODEL.md)
    └── utils/              # Shared utilities

Service Communication

Synchronous (HTTP/REST):

  • API Gateway → Services: REST API calls
  • Service → Service: HTTP with service discovery (K8s DNS)

Asynchronous (Message Queue):

  • Document indexing: Publish to RabbitMQ/Redis queue
  • Audit logging: Async events to audit service

Shared Data:

  • PostgreSQL: Shared database (schema per service if needed)
  • Redis: Shared cache
  • Qdrant: Shared vector DB

Kubernetes Deployment

Example: Chat Service

k8s/chat-service.yaml:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: chat-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: chat-service
  template:
    metadata:
      labels:
        app: chat-service
    spec:
      containers:
        - name: chat-service
          image: voiceassist/chat-service:latest
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: url
            - name: REDIS_URL
              value: redis://redis-service:6379
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
---
apiVersion: v1
kind: Service
metadata:
  name: chat-service
spec:
  selector:
    app: chat-service
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
```

When to Split

Decision Matrix

| Factor | Monorepo | Microservices |
|---|---|---|
| Team Size | < 5 developers | > 5 developers |
| Concurrent Users | < 50 users | > 50 users |
| Deployment | Single server | Multi-node K8s cluster |
| Scaling Needs | Vertical scaling OK | Need horizontal scaling |
| Development Speed | Faster (single codebase) | Slower (coordination overhead) |
| Operational Complexity | Low (Docker Compose) | High (K8s, service mesh) |
| Cost | Lower (single server) | Higher (multiple servers) |
| Regulatory | OK for small clinics | Required for large hospitals |
Recommended approach:

  1. Phases 0-10: Start with monorepo + Docker Compose
  2. Phase 10 End: Evaluate scaling needs
  3. If < 50 users: Stay with monorepo, deploy to single Ubuntu server
  4. If > 50 users: Proceed to Phases 11-14, split into microservices + K8s

Service Boundaries

Logical Services (Monorepo Modules)

These are the logical boundaries, whether in monorepo or microservices:

  1. Authentication Service (app/api/auth.py + app/core/security.py)

    • User registration with email validation
    • User login/logout with JWT tokens
    • JWT token management:
      • Access tokens (15-minute expiry, HS256 algorithm)
      • Refresh tokens (7-day expiry)
      • Token verification and validation
      • Token revocation via Redis (app/services/token_revocation.py):
        • Dual-level revocation (individual tokens + all user tokens)
        • Fail-open design for Redis unavailability
        • Automatic TTL management
        • Immediate session invalidation on logout
    • Password hashing using bcrypt (via passlib)
    • Advanced password validation (app/core/password_validator.py):
      • Multi-criteria validation (uppercase, lowercase, digits, special chars)
      • Password strength scoring (0-100)
      • Common password rejection
      • Sequential and repeated character detection
    • Rate limiting on authentication endpoints:
      • Registration: 5 requests/hour per IP
      • Login: 10 requests/minute per IP
      • Token refresh: 20 requests/minute per IP
    • Authentication middleware (get_current_user, get_current_admin_user)
    • Protected endpoints with JWT dependency injection
    • Comprehensive audit logging for all authentication events (see Audit Service below)
  2. Chat Service (app/api/chat.py + app/services/rag_service.py)

    • Conversation management
    • Message processing
    • RAG pipeline orchestration
    • Response generation
  3. Knowledge Base Service (app/api/documents.py + app/services/kb_indexer.py)

    • Document upload
    • Document processing
    • Indexing jobs
    • KB management
  4. Search Service (app/services/search_service.py)

    • Vector search
    • Semantic search
    • Hybrid search (vector + keyword)
    • Result reranking
  5. Voice Service (app/api/voice.py + app/services/voice_service.py)

    • WebSocket connections
    • Audio transcription
    • Text-to-speech
    • Voice mode management
  6. Admin Service (app/api/admin.py)

    • User management
    • System settings
    • Analytics dashboard
    • Audit log access
  7. PHI Detection Service (app/services/phi_detector.py)

    • PHI detection
    • AI model routing
    • Local vs cloud decision
  8. External APIs Service (app/services/external_apis/)

    • Nextcloud Integration (app/services/nextcloud.py):
      • OCS API client for user provisioning
      • User creation and management via REST API
      • Health check for Nextcloud connectivity
      • Authentication with admin credentials
      • WebDAV integration (future phase)
    • PubMed integration (future phase)
    • UpToDate integration (future phase)
    • External search aggregation (future phase)
  9. Audit Service (app/services/audit_service.py + app/models/audit_log.py)

    • HIPAA-compliant audit logging:
      • Immutable audit trail with SHA-256 integrity verification
      • Comprehensive metadata capture (user, action, resource, timestamp)
      • Request context tracking (IP address, user agent, request ID)
      • Service context (service name, endpoint, status)
      • Success/failure tracking with error details
      • JSON metadata for additional context
    • Automated logging for authentication events:
      • User registration, login, logout
      • Token refresh, token revocation
      • Password changes, failed authentication attempts
    • Query capabilities:
      • Retrieve audit logs by user, action, timerange
      • Integrity verification for tamper detection
      • Composite indexes for efficient queries
    • Database table: audit_logs (PostgreSQL with JSONB support)
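
To make the token-revocation design described in item 1 concrete, here is a minimal sketch of the dual-level, fail-open check. Key names and the timestamp-cutoff convention are assumptions; the real app/services/token_revocation.py may differ:

import redis.asyncio as redis
from redis.exceptions import RedisError

class TokenRevocationChecker:
    """Sketch of the dual-level, fail-open revocation check."""

    def __init__(self, client: redis.Redis):
        self.client = client

    async def is_revoked(self, jti: str, user_id: str, issued_at: float) -> bool:
        try:
            # Level 1: this specific token was revoked (e.g., logout)
            if await self.client.exists(f"revoked:token:{jti}"):
                return True
            # Level 2: all of the user's tokens issued before a cutoff were
            # revoked (e.g., password change); cutoff stored as UNIX timestamp
            cutoff = await self.client.get(f"revoked:user:{user_id}")
            return cutoff is not None and issued_at < float(cutoff)
        except RedisError:
            # Fail-open: if Redis is unavailable, allow the request,
            # matching the design described above
            return False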

Core Infrastructure

Request ID Middleware (app/core/request_id.py):

  • Generates unique UUID v4 for each request
  • Accepts client-provided request IDs via X-Request-ID header
  • Returns request ID in response header for correlation
  • Enables distributed tracing across services
  • Stored in request.state.request_id for access in route handlers
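
A minimal sketch of that middleware, assuming Starlette's BaseHTTPMiddleware (the shipped app/core/request_id.py may be structured differently):

import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class RequestIDMiddleware(BaseHTTPMiddleware):
    """Attach a request ID to every request and echo it back to the client."""

    async def dispatch(self, request: Request, call_next):
        # Honor a client-provided ID, otherwise generate a UUID v4
        request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
        request.state.request_id = request_id

        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id  # correlation header
        return response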

API Envelope Standardization (app/core/api_envelope.py):

  • Consistent response format for all endpoints:
    {
      "success": true/false,
      "data": { ... } | null,
      "error": { code, message, details, field } | null,
      "metadata": { version, request_id, pagination },
      "timestamp": "2024-11-20T12:00:00Z"
    }
  • Standard error codes (ErrorCodes class):
    • INVALID_CREDENTIALS, TOKEN_EXPIRED, TOKEN_REVOKED
    • WEAK_PASSWORD, VALIDATION_ERROR, NOT_FOUND
    • UNAUTHORIZED, FORBIDDEN, INTERNAL_ERROR
  • Helper functions:
    • success_response(data, request_id, version, pagination)
    • error_response(code, message, details, field, request_id)
  • Pagination support via PaginationMetadata model
  • Benefits:
    • Simplified client-side error handling
    • Consistent API experience across all endpoints
    • Built-in request correlation for debugging
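
A hedged sketch of the helper shapes (field names follow the envelope above; the actual models in app/core/api_envelope.py may be Pydantic classes rather than plain dicts):

from datetime import datetime, timezone

def success_response(data, request_id=None, version="v1", pagination=None) -> dict:
    """Sketch of the success envelope described above."""
    return {
        "success": True,
        "data": data,
        "error": None,
        "metadata": {"version": version, "request_id": request_id, "pagination": pagination},
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

def error_response(code, message, details=None, field=None, request_id=None) -> dict:
    """Sketch of the error envelope described above."""
    return {
        "success": False,
        "data": None,
        "error": {"code": code, "message": message, "details": details, "field": field},
        "metadata": {"version": "v1", "request_id": request_id, "pagination": None},
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }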

API Contracts

Each service exposes REST API endpoints documented in OpenAPI/Swagger.

Example: Search Service API

POST /api/v1/search
  Request:
    {
      "query": "treatment for hypertension",
      "filters": {"specialty": "cardiology"},
      "limit": 10
    }
  Response:
    {
      "results": [
        {
          "document_id": "uuid",
          "title": "Harrison's Principles - Chapter 252",
          "snippet": "...",
          "relevance_score": 0.95
        }
      ]
    }

Migration Path

Step-by-Step Migration (Monorepo → Microservices)

Phase 11: Prepare for Split

  1. Ensure Clean Boundaries: Verify modules don't have circular dependencies
  2. Extract Shared Code: Move shared models/schemas to shared/ library
  3. Create Service Interfaces: Define API contracts for each service
  4. Add Service Tests: Test each module independently

Phase 12: Split Services

  1. Start with Independent Services: Extract services with fewest dependencies first

    • Search Service (only depends on Qdrant)
    • PHI Detection Service (self-contained)
  2. Extract Core Services: Move API-facing services next

    • Auth Service
    • Chat Service
    • Admin Service
  3. Last: Shared Services: Extract services used by others

    • Knowledge Base Service
    • External APIs Service

Phase 13: Deploy to Kubernetes

  1. Create Dockerfiles: One per service
  2. Create K8s Manifests: Deployments, Services, ConfigMaps, Secrets
  3. Set Up Service Mesh (optional): Istio or Linkerd for mTLS, observability
  4. Deploy to Dev Cluster: Test inter-service communication
  5. Deploy to Prod: Gradual rollout with monitoring

Shared Library Pattern

shared/ Package:

# shared/models/user.py
from sqlalchemy import Column, String, Boolean
from shared.models.base import Base

class User(Base):
    __tablename__ = "users"

    id = Column(String, primary_key=True)
    email = Column(String, unique=True)
    # ... (same across all services)

Install shared library in each service:

pip install -e /path/to/shared

Or publish to private PyPI:

pip install voiceassist-shared==1.0.0


VoiceAssist Frontend Architecture

Last Updated: 2025-12-03
Status: Production Ready (Phases 0-3.5 Complete, Web App and Admin Panel stable)
Detailed Spec: client-implementation/TECHNICAL_ARCHITECTURE.md


Overview

VoiceAssist uses a pnpm monorepo with Turborepo for build orchestration. All frontend applications share common packages for consistency, type safety, and code reuse.

Quick Facts

| Aspect | Technology |
|---|---|
| Package Manager | pnpm 8+ |
| Build System | Turborepo |
| UI Framework | React 18+ |
| Language | TypeScript (strict mode) |
| Bundler | Vite (apps), Rollup (packages) |
| State Management | Zustand |
| Styling | Tailwind CSS |
| Component Library | shadcn/ui + custom |

Architecture Diagram

┌─────────────────────────────────────────────────────────────┐
│                         apps/                                │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐  │
│  │   web-app    │  │ admin-panel  │  │    docs-site     │  │
│  │   (Vite)     │  │   (Vite)     │  │   (Next.js 14)   │  │
│  │              │  │              │  │                  │  │
│  │ User-facing  │  │ Admin ops    │  │ Documentation    │  │
│  │ medical AI   │  │ dashboard    │  │ & guides         │  │
│  └──────┬───────┘  └──────┬───────┘  └────────┬─────────┘  │
│         │                 │                    │            │
│         └─────────────────┼────────────────────┘            │
│                           │                                 │
│                           ▼                                 │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                    packages/                         │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ │   │
│  │  │   ui     │ │  types   │ │  utils   │ │  api-  │ │   │
│  │  │          │ │          │ │          │ │ client │ │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └────────┘ │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐           │   │
│  │  │  config  │ │telemetry │ │ design-  │           │   │
│  │  │          │ │          │ │ tokens   │           │   │
│  │  └──────────┘ └──────────┘ └──────────┘           │   │
│  └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                   Backend (FastAPI)                          │
│                  services/api-gateway/                       │
└─────────────────────────────────────────────────────────────┘

Applications

1. Web App (apps/web-app/)

Main user-facing medical AI assistant application.

Features:

  • Unified Chat/Voice Interface (NEW) - Seamless text and voice modes in a single view
  • Medical knowledge retrieval with citations
  • Document upload and management
  • Conversation history with branching
  • PHI-safe data handling

Key Components:

| Component | Path | Purpose |
|---|---|---|
| UnifiedChatContainer | src/components/unified-chat/ | Three-panel layout with sidebar, main view, context pane |
| CollapsibleSidebar | src/components/unified-chat/ | Conversation list with pinning and search |
| UnifiedInputArea | src/components/unified-chat/ | Text/voice mode toggle |
| CollapsibleContextPane | src/components/unified-chat/ | Citations, clinical context, branches |

Entry Point: src/main.tsx
Dev Port: 5173
Documentation: See UNIFIED_CHAT_VOICE_UI.md

2. Admin Panel (apps/admin-panel/)

System administration and monitoring dashboard.

Features:

  • Real-time system metrics
  • User management (RBAC)
  • Knowledge base administration
  • Feature flag management
  • Audit log viewer

Entry Point: src/main.tsx
Dev Port: 5174

3. Docs Site (apps/docs-site/)

Documentation website built with Next.js 14.

Features:

  • Markdown documentation rendering
  • Navigation from navigation.ts config
  • Support for docs from multiple locations (@root/ prefix)
  • Search functionality (planned)

Entry Point: src/app/layout.tsx
Dev Port: 3000


Shared Packages

| Package | Purpose | Key Exports |
|---|---|---|
| @voiceassist/ui | React component library | Button, Input, Card, ChatMessage, etc. |
| @voiceassist/types | TypeScript type definitions | API types, User, Session, Message, etc. |
| @voiceassist/utils | Utility functions | PHI detection, formatters, validators |
| @voiceassist/api-client | HTTP client | Type-safe API calls, auto token injection |
| @voiceassist/config | Shared configurations | ESLint, Prettier, Tailwind presets |
| @voiceassist/telemetry | Observability | Error tracking, analytics helpers |
| @voiceassist/design-tokens | Design system | Colors, typography, spacing tokens |

Development Commands

# Install dependencies
pnpm install

# Start all apps in dev mode
pnpm dev

# Start specific app
pnpm --filter web-app dev
pnpm --filter admin-panel dev
pnpm --filter docs-site dev

# Build all packages
pnpm build

# Run tests
pnpm test

# Type checking
pnpm type-check

# Lint
pnpm lint

# Storybook (component library)
pnpm storybook

State Management

Zustand is used for client-side state management.

// Store structure pattern
interface AppStore {
  // Auth state
  user: User | null;
  token: string | null;

  // UI state
  sidebarOpen: boolean;
  theme: "light" | "dark";

  // Actions
  login: (credentials: LoginCredentials) => Promise<void>;
  logout: () => void;
}
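
A minimal usage sketch of this pattern with zustand's create API; the apiClient.auth.login call is an assumed endpoint shape, not a confirmed @voiceassist/api-client export:

import { create } from "zustand";
import { apiClient } from "@voiceassist/api-client";

// Hypothetical store built from the AppStore pattern above.
const useAppStore = create<AppStore>((set) => ({
  user: null,
  token: null,
  sidebarOpen: true,
  theme: "light",
  login: async (credentials) => {
    // apiClient.auth.login is an assumed method for illustration
    const { user, token } = await apiClient.auth.login(credentials);
    set({ user, token });
  },
  logout: () => set({ user: null, token: null }),
}));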

API Communication

REST API

Use @voiceassist/api-client for all backend calls:

import { apiClient } from "@voiceassist/api-client";

// Typed API call with auto-token injection
const sessions = await apiClient.conversations.list();
const session = await apiClient.conversations.create({ title: "New Chat" });

WebSocket

Real-time communication for streaming responses:

import { useWebSocket } from "@/hooks/useWebSocket";

const { connect, send, messages } = useWebSocket("/ws");

// Send message
send({ type: "chat", content: "Hello" });

// Receive streaming response
messages.forEach((msg) => {
  if (msg.type === "assistant_chunk") {
    appendToResponse(msg.content);
  }
});

Key Design Patterns

1. Feature-based Organization

src/
├── features/
│   ├── chat/
│   │   ├── components/
│   │   ├── hooks/
│   │   ├── services/
│   │   └── index.ts
│   ├── auth/
│   └── admin/

2. Type-safe API Layer

All API calls are typed end-to-end using shared types from @voiceassist/types.

3. PHI Protection

Client-side PHI detection using @voiceassist/utils:

import { detectPHI, redactPHI } from "@voiceassist/utils";

if (detectPHI(userInput)) {
  // Warn user or apply redaction
  const safe = redactPHI(userInput);
}


Version History

| Version | Date | Changes |
|---|---|---|
| 1.1.0 | 2025-12-03 | Updated status to Production Ready (Phase 3.5 done) |
| 1.0.0 | 2025-11-27 | Initial architecture document |

VoiceAssist Real-time Architecture

Last Updated: 2025-11-27
Status: Production Ready


Overview

VoiceAssist uses WebSocket connections for real-time bidirectional communication, enabling:

  • Streaming chat responses - Token-by-token LLM output
  • Voice interactions - Speech-to-text and text-to-speech
  • Live updates - Typing indicators, connection status

Architecture Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│                              Client                                      │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────────────┐ │
│  │   Chat UI       │  │   Voice Input   │  │   Connection Manager    │ │
│  │                 │  │   (Web Audio)   │  │   - Reconnection        │ │
│  │   - Messages    │  │   - Mic capture │  │   - Heartbeat           │ │
│  │   - Streaming   │  │   - STT stream  │  │   - Token refresh       │ │
│  └────────┬────────┘  └────────┬────────┘  └────────────┬────────────┘ │
│           │                    │                        │               │
│           └────────────────────┼────────────────────────┘               │
│                                │                                        │
│                         ┌──────▼──────┐                                │
│                         │  WebSocket  │                                │
│                         │   Client    │                                │
│                         └──────┬──────┘                                │
└────────────────────────────────┼────────────────────────────────────────┘
                                 │
                          WSS/WS │
                                 │
┌────────────────────────────────┼────────────────────────────────────────┐
│                                │                                        │
│                         ┌──────▼──────┐                                │
│                         │  WebSocket  │                                │
│                         │   Handler   │                                │
│                         │  (FastAPI)  │                                │
│                         └──────┬──────┘                                │
│                                │                                        │
│           ┌────────────────────┼────────────────────┐                  │
│           │                    │                    │                   │
│    ┌──────▼──────┐      ┌──────▼──────┐     ┌──────▼──────┐           │
│    │   Chat      │      │   Voice     │     │ Connection  │           │
│    │   Service   │      │   Service   │     │   Manager   │           │
│    │             │      │             │     │             │           │
│    │ - RAG Query │      │ - STT       │     │ - Sessions  │           │
│    │ - LLM Call  │      │ - TTS       │     │ - Heartbeat │           │
│    │ - Streaming │      │ - VAD       │     │ - Auth      │           │
│    └──────┬──────┘      └──────┬──────┘     └─────────────┘           │
│           │                    │                                        │
│           └────────────────────┼────────────────────────────────────────┤
│                                │                                        │
│                         ┌──────▼──────┐                                │
│                         │   OpenAI    │                                │
│                         │   API       │                                │
│                         │             │                                │
│                         │ - GPT-4     │                                │
│                         │ - Whisper   │                                │
│                         │ - TTS       │                                │
│                         └─────────────┘                                │
│                                                                         │
│                              Backend                                    │
└─────────────────────────────────────────────────────────────────────────┘

Connection Lifecycle

1. Connection Establishment

Client                                    Server
  │                                         │
  ├──── WebSocket Connect ─────────────────►│
  │     (with token & conversationId)       │
  │                                         │
  │◄──── connection_established ────────────┤
  │      { connectionId, serverTime }       │
  │                                         │

2. Message Exchange

Client                                    Server
  │                                         │
  ├──── message ───────────────────────────►│
  │     { content: "Hello" }                │
  │                                         │
  │◄──── thinking ──────────────────────────┤
  │                                         │
  │◄──── assistant_chunk ───────────────────┤
  │      { content: "Hi" }                  │
  │◄──── assistant_chunk ───────────────────┤
  │      { content: " there" }              │
  │◄──── assistant_chunk ───────────────────┤
  │      { content: "!" }                   │
  │                                         │
  │◄──── message_complete ──────────────────┤
  │      { messageId, totalTokens }         │
  │                                         │

3. Heartbeat

Client                                    Server
  │                                         │
  ├──── ping ──────────────────────────────►│
  │                                         │
  │◄──── pong ──────────────────────────────┤
  │                                         │

WebSocket Endpoints

| Endpoint | Purpose |
|---|---|
| /api/realtime/ws | Main chat WebSocket |
| /api/voice/ws | Voice-specific WebSocket (future) |

Query Parameters

| Parameter | Required | Description |
|---|---|---|
| conversationId | Yes | UUID of the conversation session |
| token | Yes | JWT access token |

Connection URL Example

// Development
ws://localhost:8000/api/realtime/ws?conversationId=uuid&token=jwt

// Production
wss://assist.asimo.io/api/realtime/ws?conversationId=uuid&token=jwt

Message Types

Client → Server

| Type | Description |
|---|---|
| message | Send user message |
| ping | Heartbeat ping |
| stop | Cancel current response |
| voice_start | Begin voice input (future) |
| voice_chunk | Audio data chunk (future) |
| voice_end | End voice input (future) |

Server → Client

| Type | Description |
|---|---|
| connection_established | Connection successful |
| thinking | AI is processing |
| assistant_chunk | Streaming response chunk |
| message_complete | Response finished |
| error | Error occurred |
| pong | Heartbeat response |
| voice_transcript | Speech-to-text result (future) |
| voice_audio | TTS audio chunk (future) |
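
Client-side, these message types can be modeled as a discriminated union. A hedged TypeScript sketch follows; payload fields beyond type are assumptions drawn from the flow diagrams above, not a confirmed wire schema, and the UI helpers are hypothetical:

// Server → client messages as a discriminated union (sketch).
type ServerMessage =
  | { type: "connection_established"; connectionId: string; serverTime: string }
  | { type: "thinking" }
  | { type: "assistant_chunk"; content: string }
  | { type: "message_complete"; messageId: string; totalTokens: number }
  | { type: "error"; code: string; message: string }
  | { type: "pong" };

declare function appendToResponse(text: string): void;      // assumed UI helper
declare function finalizeMessage(messageId: string): void;  // assumed UI helper

function handleServerMessage(msg: ServerMessage) {
  switch (msg.type) {
    case "assistant_chunk":
      appendToResponse(msg.content); // narrowed to the chunk variant
      break;
    case "message_complete":
      finalizeMessage(msg.messageId);
      break;
    // ...other variants elided
  }
}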

Streaming Response Flow

RAG + LLM Pipeline

User Message → WebSocket Handler
                    │
                    ▼
            ┌───────────────┐
            │  RAG Service  │ ← Retrieves relevant context
            │               │   from Qdrant vector store
            └───────┬───────┘
                    │
                    ▼
            ┌───────────────┐
            │  LLM Client   │ ← Calls OpenAI with streaming
            │               │
            └───────┬───────┘
                    │
          ┌─────────┼─────────┐
          │         │         │
          ▼         ▼         ▼
       chunk_1   chunk_2   chunk_n
          │         │         │
          └─────────┼─────────┘
                    │
                    ▼
            WebSocket Send
            (per chunk)

Streaming Implementation

# Backend (FastAPI WebSocket handler)
import uuid

async def handle_message(websocket, message):
    # Send thinking indicator
    await websocket.send_json({"type": "thinking"})

    # Get RAG context
    context = await rag_service.retrieve(message.content)

    # Stream LLM response, tracking usage from the chunks we receive
    total_tokens = 0
    async for chunk in llm_client.stream_chat(message.content, context):
        await websocket.send_json({
            "type": "assistant_chunk",
            "content": chunk.content,
        })
        if chunk.usage:
            total_tokens = chunk.usage.total_tokens

    # Send completion
    await websocket.send_json({
        "type": "message_complete",
        "messageId": str(uuid.uuid4()),
        "totalTokens": total_tokens,
    })

Voice Architecture (Future Enhancement)

Voice Input Flow

Microphone → Web Audio API → VAD (Voice Activity Detection)
                                      │
                                      ▼
                              Audio Chunks (PCM)
                                      │
                                      ▼
                              WebSocket Send
                                      │
                                      ▼
                              Server VAD + STT
                                      │
                                      ▼
                              Transcript Event

Voice Output Flow

LLM Response Text → TTS Service (OpenAI/ElevenLabs)
                           │
                           ▼
                    Audio Stream (MP3/PCM)
                           │
                           ▼
                    WebSocket Send (chunks)
                           │
                           ▼
                    Web Audio API Playback

Error Handling

Reconnection Strategy

class WebSocketClient {
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private baseDelay = 1000; // 1 second

  async reconnect() {
    const delay = Math.min(
      this.baseDelay * Math.pow(2, this.reconnectAttempts),
      30000, // max 30 seconds
    );
    await sleep(delay);

    this.reconnectAttempts++;
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      await this.connect();
    } else {
      this.emit("connection_failed");
    }
  }
}

Error Types

| Error Code | Description | Client Action |
|---|---|---|
| auth_failed | Invalid/expired token | Refresh token and reconnect |
| session_not_found | Invalid conversation ID | Create new session |
| rate_limited | Too many requests | Back off and retry |
| server_error | Internal server error | Retry with backoff |

Performance Considerations

Client-side

  1. Buffer chunks - Don't update the DOM on every chunk (see the sketch after this list)
  2. Throttle renders - Use requestAnimationFrame
  3. Heartbeat interval - 30 seconds recommended
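
A hedged sketch of the buffering/throttling pattern from items 1 and 2, assuming a React-style state setter; the names are illustrative:

// Buffer incoming chunks and flush at most once per animation frame.
declare function setResponseText(update: (prev: string) => string): void; // assumed UI state setter

let buffer = "";
let flushScheduled = false;

function onChunk(content: string) {
  buffer += content;
  if (!flushScheduled) {
    flushScheduled = true;
    requestAnimationFrame(() => {
      setResponseText((prev) => prev + buffer); // single DOM update per frame
      buffer = "";
      flushScheduled = false;
    });
  }
}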

Server-side

  1. Connection pooling - Reuse OpenAI connections
  2. Chunk size - Optimize for network vs. latency
  3. Memory management - Clean up closed connections

Security

  1. Authentication - JWT token required in query params
  2. Rate limiting - Per-user connection limits
  3. Message validation - Schema validation on all messages
  4. TLS - WSS required in production


Version History

| Version | Date | Changes |
|---|---|---|
| 1.0.0 | 2025-11-27 | Initial architecture document |

Security & Compliance Guide

Overview

VoiceAssist V2 is designed as a HIPAA-compliant, zero-trust medical AI assistant that handles Protected Health Information (PHI). This document outlines security requirements, implementation strategies, and compliance procedures.

Table of Contents

  1. HIPAA Compliance
  2. Zero-Trust Architecture
  3. Encryption
  4. Authentication & Authorization
  5. PHI Detection & Redaction
  6. Audit Logging
  7. Network Security
  8. Data Retention & Disposal
  9. Incident Response
  10. Security Monitoring
  11. Compliance Checklists

HIPAA Compliance

HIPAA Security Rule Requirements

VoiceAssist implements the following HIPAA Security Rule requirements:

Administrative Safeguards

1. Security Management Process

  • Risk Analysis: Annual security risk assessments
  • Risk Management: Documented mitigation strategies
  • Sanction Policy: Employee discipline for violations
  • Information System Activity Review: Regular audit log reviews

2. Assigned Security Responsibility

  • Designated Security Official (Admin role)
  • Security incident response team
  • Regular security training

3. Workforce Security

  • Authorization/Supervision procedures
  • Workforce clearance procedures
  • Termination procedures (access revocation)

4. Information Access Management

  • Access Authorization policies
  • Access Establishment/Modification procedures
  • Role-Based Access Control (RBAC)

5. Security Awareness and Training

  • Security reminders (quarterly)
  • Protection from malicious software
  • Log-in monitoring
  • Password management training

6. Security Incident Procedures

  • Incident response plan
  • Incident reporting procedures
  • Incident documentation

7. Contingency Plan

  • Data backup plan (automated daily backups)
  • Disaster recovery plan
  • Emergency mode operation plan
  • Testing and revision procedures

8. Evaluation

  • Annual security evaluations
  • Periodic technical and non-technical evaluations

9. Business Associate Agreements

  • OpenAI API (Business Associate Agreement required)
  • UpToDate API (BAA required)
  • OpenEvidence API (BAA required)
  • Cloud hosting provider (BAA required if using cloud)

Physical Safeguards

1. Facility Access Controls

  • Contingency operations (backup power, redundancy)
  • Facility security plan (datacenter access controls)
  • Access control and validation procedures
  • Maintenance records

2. Workstation Use

  • Workstation security policies
  • Screen lock requirements (5 minutes idle)
  • Encrypted workstations

3. Workstation Security

  • Physical security of workstations
  • Restricted access to terminals

4. Device and Media Controls

  • Disposal procedures (secure wipe/destroy)
  • Media re-use procedures
  • Accountability tracking
  • Data backup and storage

Technical Safeguards

1. Access Control

  • Unique User Identification (via JWT tokens with email, Phase 2; Nextcloud OIDC in Phase 6+)
  • Emergency Access Procedure (admin override)
  • Automatic Logoff (access tokens expire after 15 minutes, refresh tokens after 7 days)
  • Encryption and Decryption (AES-256)
  • Rate limiting on authentication endpoints to prevent brute force attacks

2. Audit Controls

  • Hardware, software, and procedural mechanisms to record and examine activity

3. Integrity

  • Mechanism to authenticate ePHI is not improperly altered or destroyed
  • Digital signatures for critical data

4. Person or Entity Authentication

  • Verify that a person or entity seeking access is who they claim to be
  • Multi-factor authentication available

5. Transmission Security

  • Integrity controls (checksums, digital signatures)
  • Encryption (TLS 1.3 for all network communications)

HIPAA Implementation in VoiceAssist

| HIPAA Requirement | VoiceAssist Implementation |
|---|---|
| Access Control | RBAC via JWT tokens (Phase 2), Nextcloud OIDC integration (Phase 6+) |
| Audit Logging | Comprehensive audit logs (all PHI access tracked) |
| Authentication | JWT with bcrypt password hashing (Phase 2), OIDC/OAuth2 + optional MFA (Phase 6+) |
| Encryption at Rest | AES-256 encryption for database and file storage |
| Encryption in Transit | TLS 1.3 for all communications |
| Data Backup | Automated daily backups with encryption |
| Emergency Access | Admin override with audit trail |
| Session Management | Access tokens (15-min), refresh tokens (7-day), rate limiting on auth endpoints |
| PHI Minimization | PHI detection service redacts unnecessary PHI |
| Audit Trail | Immutable audit logs stored separately |

Zero-Trust Architecture

Zero-Trust Principles

  1. Never Trust, Always Verify: Every request is authenticated and authorized
  2. Least Privilege Access: Users/services get minimum required permissions
  3. Assume Breach: Design assumes attacker has network access
  4. Verify Explicitly: Use all available data points for authorization decisions
  5. Microsegmentation: Network isolation between services

Implementation

1. Service-to-Service Authentication

Docker Compose (Phases 0-10):

# Each service authenticates via API keys
services:
  api-gateway:
    environment:
      - SERVICE_API_KEY=${API_GATEWAY_KEY}
  medical-kb:
    environment:
      - SERVICE_API_KEY=${MEDICAL_KB_KEY}
      - REQUIRED_API_KEYS=${API_GATEWAY_KEY}

Kubernetes (Phases 11-14):

# Service mesh (Linkerd) provides mTLS
---
apiVersion: v1
kind: Service
metadata:
  annotations:
    linkerd.io/inject: enabled
spec:
  # mTLS automatically enabled

2. Network Segmentation

Docker Compose:

networks:
  public:     # API Gateway only
  internal:   # Microservices
  database:   # Database access only
    internal: true  # No external access

Kubernetes:

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: api-gateway-policy
spec:
  podSelector:
    matchLabels:
      app: api-gateway
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - podSelector: {}
      ports:
        - protocol: TCP
          port: 8000
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: auth-service
      ports:
        - protocol: TCP
          port: 8002

3. Identity-Based Access

# Every API request requires:
# 1. Valid JWT token from Nextcloud OIDC
# 2. Role-based permission check
# 3. Resource-level access validation

@router.get("/medical-record/{record_id}")
async def get_medical_record(
    record_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    # 1. User already authenticated (JWT valid)

    # 2. Check user role
    if current_user.role not in ["doctor", "nurse", "admin"]:
        raise HTTPException(status_code=403)

    # 3. Check resource-level access
    record = db.query(MedicalRecord).filter(
        MedicalRecord.id == record_id,
        MedicalRecord.authorized_users.contains(current_user.id),
    ).first()
    if not record:
        raise HTTPException(status_code=404)

    # 4. Log access
    audit_log.log_access(
        user_id=current_user.id,
        resource="medical_record",
        resource_id=record_id,
        action="read",
    )

    return record

4. Short-Lived Credentials

# Access tokens expire after 15 minutes (matching the JWT details above)
JWT_EXPIRATION = 900  # seconds

# Refresh tokens expire after 7 days
REFRESH_TOKEN_EXPIRATION = 604800  # seconds

# Service-to-service tokens rotate every 5 minutes
SERVICE_TOKEN_EXPIRATION = 300  # seconds

5. Continuous Verification

# Every request goes through middleware that verifies:
# - Token validity
# - Token not revoked
# - User still has required permissions
# - Rate limiting
# - Anomaly detection

@app.middleware("http")
async def security_middleware(request: Request, call_next):
    # Verify token
    token = request.headers.get("Authorization", "").replace("Bearer ", "")
    if not verify_token(token):
        return JSONResponse(status_code=401, content={"error": "Invalid token"})

    # Check if token revoked
    if await redis.get(f"revoked:{token}"):
        return JSONResponse(status_code=401, content={"error": "Token revoked"})

    # Rate limiting
    user_id = get_user_from_token(token)
    if not await rate_limiter.check(user_id):
        return JSONResponse(status_code=429, content={"error": "Rate limit exceeded"})

    # Anomaly detection
    if await detect_anomaly(user_id, request):
        await alert_security_team(user_id, request)

    response = await call_next(request)
    return response

Encryption

Encryption at Rest

1. Database Encryption

PostgreSQL (column-level encryption with pgcrypto):

-- Enable pgcrypto extension
CREATE EXTENSION pgcrypto;

-- Encrypt sensitive columns
CREATE TABLE medical_records (
    id UUID PRIMARY KEY,
    patient_id UUID NOT NULL,
    diagnosis TEXT NOT NULL,      -- Encrypted column
    notes TEXT,                   -- Encrypted column
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    encryption_key_id VARCHAR(255) NOT NULL
);

-- Encrypt data before insert
INSERT INTO medical_records (id, patient_id, diagnosis, notes, encryption_key_id)
VALUES (
    gen_random_uuid(),
    'patient-uuid',
    pgp_sym_encrypt('Patient has diabetes', 'encryption_key'),
    pgp_sym_encrypt('Notes about treatment', 'encryption_key'),
    'key-id-123'
);

-- Decrypt on read
SELECT
    id,
    patient_id,
    pgp_sym_decrypt(diagnosis::bytea, 'encryption_key') AS diagnosis,
    pgp_sym_decrypt(notes::bytea, 'encryption_key') AS notes
FROM medical_records;

Application-Level Encryption:

from cryptography.fernet import Fernet
import os

class EncryptionService:
    def __init__(self):
        # Use environment variable for encryption key
        # In production, use a key management service (AWS KMS, Azure Key Vault, etc.)
        self.key = os.environ.get("ENCRYPTION_KEY").encode()
        self.cipher = Fernet(self.key)

    def encrypt(self, data: str) -> bytes:
        """Encrypt plaintext data"""
        return self.cipher.encrypt(data.encode())

    def decrypt(self, encrypted_data: bytes) -> str:
        """Decrypt encrypted data"""
        return self.cipher.decrypt(encrypted_data).decode()

# Usage in models
class MedicalRecord(Base):
    __tablename__ = "medical_records"

    id = Column(UUID, primary_key=True)
    patient_id = Column(UUID, nullable=False)
    _diagnosis = Column("diagnosis", LargeBinary)  # Encrypted
    _notes = Column("notes", LargeBinary)          # Encrypted

    @property
    def diagnosis(self) -> str:
        if self._diagnosis:
            return encryption_service.decrypt(self._diagnosis)
        return None

    @diagnosis.setter
    def diagnosis(self, value: str):
        if value:
            self._diagnosis = encryption_service.encrypt(value)

2. File Storage Encryption

import os

import boto3
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

class SecureFileStorage:
    def __init__(self):
        self.s3 = boto3.client('s3')
        self.bucket = os.environ.get("S3_BUCKET")

    def upload_file(self, file_data: bytes, file_name: str, user_id: str):
        # Generate unique encryption key for this file
        file_key = os.urandom(32)
        iv = os.urandom(16)

        # Encrypt file
        cipher = Cipher(
            algorithms.AES(file_key),
            modes.GCM(iv),
            backend=default_backend(),
        )
        encryptor = cipher.encryptor()
        encrypted_data = encryptor.update(file_data) + encryptor.finalize()
        # Note: with GCM, encryptor.tag must also be persisted to decrypt later

        # Store encryption key in database (encrypted with master key)
        encryption_key_record = FileEncryptionKey(
            file_id=file_name,
            encrypted_key=master_encrypt(file_key),
            iv=iv,
            user_id=user_id,
        )
        db.add(encryption_key_record)
        db.commit()

        # Upload to S3 with server-side encryption
        self.s3.put_object(
            Bucket=self.bucket,
            Key=file_name,
            Body=encrypted_data,
            ServerSideEncryption='AES256',
        )

3. Backup Encryption

#!/bin/bash
# backup-encrypted.sh

BACKUP_DIR="/opt/backups"
DATE=$(date +%Y%m%d_%H%M%S)
ENCRYPTION_KEY="$BACKUP_ENCRYPTION_KEY"  # From environment

# Backup PostgreSQL and encrypt
docker exec voiceassist-prod-postgres-1 pg_dump -U voiceassist voiceassist | \
    gzip | \
    openssl enc -aes-256-cbc -salt -pbkdf2 -k "$ENCRYPTION_KEY" \
    > "$BACKUP_DIR/voiceassist_db_$DATE.sql.gz.enc"

# Backup files and encrypt
tar czf - /data/voiceassist | \
    openssl enc -aes-256-cbc -salt -pbkdf2 -k "$ENCRYPTION_KEY" \
    > "$BACKUP_DIR/voiceassist_data_$DATE.tar.gz.enc"

echo "Encrypted backups created"

Encryption in Transit

1. TLS Configuration

Traefik TLS Configuration:

# traefik.yml
entryPoints:
  websecure:
    address: ":443"
    http:
      tls:
        options: strict

tls:
  options:
    strict:
      minVersion: VersionTLS13
      cipherSuites:
        - TLS_AES_256_GCM_SHA384
        - TLS_CHACHA20_POLY1305_SHA256
      curvePreferences:
        - CurveP521
        - CurveP384

2. Internal Service Communication

Docker Compose (Phases 0-10):

# Use internal networks + API key authentication
services:
  api-gateway:
    networks:
      - public
      - internal
    environment:
      - TLS_CERT=/certs/cert.pem
      - TLS_KEY=/certs/key.pem

Kubernetes (Phases 11-14):

# Linkerd provides automatic mTLS
---
apiVersion: linkerd.io/v1alpha2
kind: ServiceProfile
metadata:
  name: medical-kb
spec:
  routes:
    - condition:
        method: GET
        pathRegex: /api/.*
      name: api-route
      isRetryable: false
      timeout: 30s

3. Client-to-Server (WebRTC Voice)

// WebRTC with DTLS-SRTP encryption
const peerConnection = new RTCPeerConnection({
  iceServers: [{ urls: "stun:stun.l.google.com:19302" }],
  // Force DTLS-SRTP encryption
  bundlePolicy: "max-bundle",
  rtcpMuxPolicy: "require",
});

// Verify encryption is active
peerConnection.getStats().then((stats) => {
  stats.forEach((report) => {
    if (report.type === "transport") {
      console.log("DTLS State:", report.dtlsState);   // Must be 'connected'
      console.log("SRTP Cipher:", report.srtpCipher); // e.g., 'AES_CM_128_HMAC_SHA1_80'
    }
  });
});

Authentication & Authorization

Authentication Flow (Phase 2: JWT-based)

Current Implementation (Phase 2):

1. User → Web App (email + password)
2. Web App → API Gateway POST /api/auth/login
3. API Gateway → Database (validate credentials)
4. API Gateway verifies password hash (bcrypt)
5. API Gateway → Web App (access token + refresh token)
6. Web App stores tokens securely
7. Web App → API Gateway (requests with Authorization: Bearer <access_token>)
8. API Gateway verifies JWT signature and expiry
9. API Gateway extracts user info from token payload
10. API Gateway → Web App (protected resource)

JWT Token Details (Phase 2 Enhancements):

  • Access Token: 15-minute expiry, HS256 algorithm, contains user ID + email + role
  • Refresh Token: 7-day expiry, used to obtain new access tokens
  • Token Revocation (app/services/token_revocation.py):
    • Redis-based blacklisting for immediate invalidation
    • Dual-level revocation (individual token + all user tokens)
    • Fail-open design (allows requests if Redis unavailable)
    • Automatic TTL management matching token expiry
    • Used for logout, password changes, security breaches
  • Password Security:
    • Hashing: bcrypt via passlib (12 rounds)
    • Validation (app/core/password_validator.py):
      • Minimum 8 characters (configurable)
      • Requires uppercase, lowercase, digits, special characters
      • Rejects common passwords (password, 123456, qwerty, etc.)
      • Detects sequential characters (abc, 123, etc.)
      • Detects repeated characters (aaa, 111, etc.)
      • Strength scoring (0-100): Weak (<40), Medium (40-70), Strong (≥70) (see the sketch after this list)
  • Rate Limiting:
    • Registration: 5 requests/hour per IP
    • Login: 10 requests/minute per IP
    • Token refresh: 20 requests/minute per IP
  • Request Tracking (app/core/request_id.py):
    • Unique UUID v4 for each request
    • Returned in X-Request-ID response header
    • Correlated across audit logs for debugging
  • API Response Format (app/core/api_envelope.py):
    • Standardized envelope with success/error/metadata/timestamp
    • Standard error codes (INVALID_CREDENTIALS, TOKEN_EXPIRED, TOKEN_REVOKED, etc.)
    • Request ID correlation in metadata
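
A minimal sketch of the multi-criteria scoring idea described above. The weights are illustrative assumptions, and the sequential-character check is omitted for brevity; the shipped app/core/password_validator.py may weight criteria differently:

import re

# Sketch of 0-100 strength scoring; weights here are assumed, not the real values.
COMMON_PASSWORDS = {"password", "123456", "qwerty"}

def score_password(pw: str) -> int:
    if pw.lower() in COMMON_PASSWORDS:
        return 0
    score = 0
    score += min(len(pw), 16) * 3                          # length, capped
    score += 10 if re.search(r"[a-z]", pw) else 0          # lowercase
    score += 10 if re.search(r"[A-Z]", pw) else 0          # uppercase
    score += 10 if re.search(r"\d", pw) else 0             # digits
    score += 12 if re.search(r"[^A-Za-z0-9]", pw) else 0   # special characters
    if re.search(r"(.)\1\1", pw):                          # repeated chars (aaa, 111)
        score -= 15
    return max(0, min(score, 100))

# Weak (<40), Medium (40-70), Strong (>=70)
print(score_password("Tr0ub4dor&3"))  # -> 75 (Strong) under these assumed weights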

Future Enhancement (Phase 6+):

Full OIDC integration with Nextcloud:
1. User → VoiceAssist Web App
2. Web App → Nextcloud OIDC (/auth/login)
3. Nextcloud → User (login form)
4. User → Nextcloud (credentials)
5. Nextcloud → Web App (authorization code)
6. Web App → Nextcloud (/token endpoint)
7. Nextcloud → Web App (ID token + access token)
8. Web App → API Gateway (access token)
9. API Gateway → Auth Service (verify token)
10. Auth Service → Nextcloud (validate token)
11. Nextcloud → Auth Service (user info)
12. Auth Service → API Gateway (JWT token with user info + roles)
13. API Gateway → Web App (JWT token)
14. Web App stores JWT in httpOnly cookie

Authorization Levels

| Role | Permissions |
|---|---|
| Admin | Full system access, user management, audit log access |
| Doctor | Read/write patient records, prescribe medications, view medical knowledge |
| Nurse | Read/write patient records, limited prescribing, view medical knowledge |
| Patient | Read own records only, limited voice assistant access |
| Researcher | Read de-identified data only, no PHI access |
| API Service | Service-specific permissions (e.g., file-indexer can read files) |

RBAC Implementation

from enum import Enum
from typing import List

class Role(str, Enum):
    ADMIN = "admin"
    DOCTOR = "doctor"
    NURSE = "nurse"
    PATIENT = "patient"
    RESEARCHER = "researcher"

class Permission(str, Enum):
    READ_PATIENT_RECORD = "read:patient_record"
    WRITE_PATIENT_RECORD = "write:patient_record"
    DELETE_PATIENT_RECORD = "delete:patient_record"
    PRESCRIBE_MEDICATION = "prescribe:medication"
    VIEW_AUDIT_LOGS = "view:audit_logs"
    MANAGE_USERS = "manage:users"
    ACCESS_DEIDENTIFIED_DATA = "access:deidentified_data"

# Role-Permission mapping
ROLE_PERMISSIONS = {
    Role.ADMIN: [p for p in Permission],  # All permissions
    Role.DOCTOR: [
        Permission.READ_PATIENT_RECORD,
        Permission.WRITE_PATIENT_RECORD,
        Permission.PRESCRIBE_MEDICATION,
    ],
    Role.NURSE: [
        Permission.READ_PATIENT_RECORD,
        Permission.WRITE_PATIENT_RECORD,
    ],
    Role.PATIENT: [
        Permission.READ_PATIENT_RECORD,  # Own records only
    ],
    Role.RESEARCHER: [
        Permission.ACCESS_DEIDENTIFIED_DATA,
    ],
}

def require_permission(permission: Permission):
    """Decorator to enforce permission requirements"""
    def decorator(func):
        async def wrapper(*args, current_user: User, **kwargs):
            user_permissions = ROLE_PERMISSIONS.get(current_user.role, [])
            if permission not in user_permissions:
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission denied: requires {permission}",
                )
            return await func(*args, current_user=current_user, **kwargs)
        return wrapper
    return decorator

# Usage
@router.delete("/patient-record/{record_id}")
@require_permission(Permission.DELETE_PATIENT_RECORD)
async def delete_patient_record(
    record_id: str,
    current_user: User = Depends(get_current_user),
):
    # Only admins can reach here
    pass

PHI Detection & Redaction

PHI Detection Service

import re
from typing import List, Dict
import spacy

class PHIDetector:
    """Detect and redact Protected Health Information"""

    def __init__(self):
        # Load NLP model for NER
        self.nlp = spacy.load("en_core_web_sm")

        # PHI patterns (18 HIPAA identifiers)
        self.patterns = {
            "name": r"\b[A-Z][a-z]+ [A-Z][a-z]+\b",
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
            "mrn": r"\bMRN:?\s*\d{6,10}\b",
            "date": r"\b\d{1,2}/\d{1,2}/\d{2,4}\b",
            "zipcode": r"\b\d{5}(-\d{4})?\b",
            "ip_address": r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
            "account_number": r"\b[A-Z]{2}\d{6,10}\b",
        }

    def detect(self, text: str) -> List[Dict]:
        """Detect all PHI in text"""
        phi_detected = []

        # Regex-based detection
        for phi_type, pattern in self.patterns.items():
            matches = re.finditer(pattern, text)
            for match in matches:
                phi_detected.append({
                    "type": phi_type,
                    "value": match.group(),
                    "start": match.start(),
                    "end": match.end(),
                })

        # NLP-based detection (names, locations)
        doc = self.nlp(text)
        for ent in doc.ents:
            if ent.label_ in ["PERSON", "GPE", "LOC", "ORG", "DATE"]:
                phi_detected.append({
                    "type": ent.label_.lower(),
                    "value": ent.text,
                    "start": ent.start_char,
                    "end": ent.end_char,
                })

        return phi_detected

    def redact(self, text: str, redaction_char="*") -> str:
        """Redact all detected PHI"""
        phi_list = self.detect(text)
        # Sort by position (reverse order to maintain indices)
        phi_list.sort(key=lambda x: x["start"], reverse=True)

        result = text
        for phi in phi_list:
            redacted = redaction_char * (phi["end"] - phi["start"])
            result = result[:phi["start"]] + redacted + result[phi["end"]:]
        return result

    def anonymize(self, text: str) -> str:
        """Replace PHI with placeholder tokens"""
        phi_list = self.detect(text)
        phi_list.sort(key=lambda x: x["start"], reverse=True)

        result = text
        for phi in phi_list:
            placeholder = f"[{phi['type'].upper()}]"
            result = result[:phi["start"]] + placeholder + result[phi["end"]:]
        return result

# Usage
phi_detector = PHIDetector()

# Example text
text = "Patient John Doe (SSN: 123-45-6789) visited on 01/15/2024. Contact: john.doe@email.com, 555-123-4567."

# Detect PHI
detected = phi_detector.detect(text)
# [{'type': 'name', 'value': 'John Doe', ...}, {'type': 'ssn', 'value': '123-45-6789', ...}, ...]

# Redact PHI
redacted = phi_detector.redact(text)
# "Patient ******** (SSN: ***-**-****) visited on **/**/****. Contact: *******************, ***-***-****."

# Anonymize PHI
anonymized = phi_detector.anonymize(text)
# "Patient [NAME] (SSN: [SSN]) visited on [DATE]. Contact: [EMAIL], [PHONE]."

PHI Logging Policy

import logging

class PHISafeLogger:
    """Logger that automatically redacts PHI"""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.phi_detector = PHIDetector()

    def _redact_message(self, message: str) -> str:
        """Redact PHI from log message"""
        return self.phi_detector.redact(message)

    def info(self, message: str, **kwargs):
        self.logger.info(self._redact_message(message), **kwargs)

    def warning(self, message: str, **kwargs):
        self.logger.warning(self._redact_message(message), **kwargs)

    def error(self, message: str, **kwargs):
        self.logger.error(self._redact_message(message), **kwargs)

# Usage
logger = PHISafeLogger(__name__)
logger.info(f"Patient John Doe logged in")
# Logs: "Patient ******** logged in"

Tool PHI Security Rules

VoiceAssist's tools system (see TOOLS_AND_INTEGRATIONS.md) implements PHI-aware security controls to ensure compliance with HIPAA.

Tool PHI Classification

All tools are classified by their ability to handle PHI:

| Tool Name | Allows PHI | Execution Location | External API | Rationale |
|---|---|---|---|---|
| get_calendar_events | ✅ Yes | Local/Nextcloud | No | Calendar data may contain patient appointments |
| create_calendar_event | ✅ Yes | Local/Nextcloud | No | Event titles/descriptions may reference patients |
| search_nextcloud_files | ✅ Yes | Local/Nextcloud | No | File names and metadata may contain PHI |
| retrieve_nextcloud_file | ✅ Yes | Local/Nextcloud | No | File contents are clinical documents with PHI |
| calculate_medical_score | ✅ Yes | Local compute | No | Calculations use patient-specific data (age, labs, etc.) |
| generate_differential_diagnosis | ✅ Yes | Local LLM | No | DDx generated from patient symptoms and history |
| search_openevidence | ❌ No | External API | Yes | External service - PHI must be stripped before sending |
| search_pubmed | ❌ No | External API | Yes | External service - PHI must be stripped before sending |
| search_medical_guidelines | ❌ No | Local vector DB | No | General medical knowledge, no patient data |
| web_search_medical | ❌ No | External API | Yes | External service - PHI must be stripped before sending |

Key Principles:

  1. Local PHI Tools: Tools that access PHI (calendar, files, calculations, DDx) execute locally or via Nextcloud (same network)
  2. External Non-PHI Tools: Tools that call external APIs (OpenEvidence, PubMed, web search) must never receive PHI
  3. PHI Detection: All tool arguments are scanned for PHI before execution
  4. Violation Prevention: If PHI is detected in arguments to a non-PHI tool, execution is blocked with PHI_VIOLATION error

PHI Detection in Tool Arguments

# server/app/services/orchestration/tool_executor.py
from app.services.phi.detector import PHIDetector
from app.services.tools.registry import TOOL_REGISTRY

phi_detector = PHIDetector()

async def execute_tool(
    tool_name: str,
    args: dict,
    user: UserContext,
    trace_id: str,
) -> ToolResult:
    """
    Execute tool with PHI detection and enforcement.

    PHI Security Rules:
    1. Detect PHI in all tool arguments
    2. If PHI detected and tool.allows_phi = False, raise PHI_VIOLATION
    3. If PHI detected and tool.allows_phi = True, route to local execution
    4. Log all PHI detections to audit log
    """
    tool_def = TOOL_REGISTRY[tool_name]

    # Scan all arguments for PHI
    phi_result = await phi_detector.detect_in_dict(args)

    if phi_result.contains_phi:
        # Log PHI detection
        audit_logger.info(
            "PHI detected in tool arguments",
            extra={
                "tool_name": tool_name,
                "user_id": user.id,
                "trace_id": trace_id,
                "phi_types": phi_result.phi_types,  # e.g., ["name", "mrn", "date"]
                "allows_phi": tool_def.allows_phi,
            },
        )

        # Enforce PHI policy
        if not tool_def.allows_phi:
            # BLOCK: Tool cannot handle PHI
            raise ToolPHIViolationError(
                f"Tool '{tool_name}' cannot process PHI. "
                f"Detected: {', '.join(phi_result.phi_types)}. "
                f"Use a local tool or remove PHI from query."
            )

    # Execute tool (PHI check passed)
    return await tool_def.execute(args, user, trace_id)

PHI Routing for AI Models

When generating tool calls via the OpenAI Realtime API or other LLMs:

# server/app/services/orchestration/query_orchestrator.py
async def route_query_to_llm(
    query: str,
    user: UserContext,
    trace_id: str,
) -> LLMResponse:
    """
    Route query to appropriate LLM based on PHI content.

    PHI Routing Rules:
    - PHI detected → Local Llama 3.1 8B (on-prem)
    - No PHI → OpenAI GPT-4 (cloud)
    """
    # Detect PHI in user query
    phi_result = await phi_detector.detect(query)

    if phi_result.contains_phi:
        # Route to LOCAL LLM
        llm_provider = "llama_local"
        model = "llama-3.1-8b-instruct"
        endpoint = "http://llm-service:8000/v1/chat/completions"

        audit_logger.info(
            "PHI detected - routing to local LLM",
            extra={
                "query_length": len(query),
                "phi_types": phi_result.phi_types,
                "model": model,
                "user_id": user.id,
                "trace_id": trace_id,
            },
        )
    else:
        # Route to CLOUD LLM
        llm_provider = "openai"
        model = "gpt-4-turbo"
        endpoint = "https://api.openai.com/v1/chat/completions"

        audit_logger.info(
            "No PHI detected - routing to cloud LLM",
            extra={
                "query_length": len(query),
                "model": model,
                "user_id": user.id,
                "trace_id": trace_id,
            },
        )

    # Make LLM request with tool definitions
    response = await llm_client.chat_completion(
        endpoint=endpoint,
        model=model,
        messages=[{"role": "user", "content": query}],
        tools=get_available_tools(phi_detected=phi_result.contains_phi),
    )
    return response

Tool Definition PHI Flags

Tool definitions include allows_phi flag:

# server/app/tools/calendar_tool.py
from app.tools.base import ToolDefinition

calendar_tool = ToolDefinition(
    name="create_calendar_event",
    description="Create an event in the user's calendar",
    category="calendar",
    allows_phi=True,  # ← PHI flag
    requires_confirmation=True,
    timeout_seconds=30,
    execute=create_calendar_event_impl,
)

# server/app/tools/medical_search_tool.py
openevidence_tool = ToolDefinition(
    name="search_openevidence",
    description="Search evidence-based medicine database",
    category="medical_search",
    allows_phi=False,  # ← PHI flag (external API)
    requires_confirmation=False,
    timeout_seconds=10,
    execute=search_openevidence_impl,
)

PHI Audit Trail

All tool invocations with PHI are logged to the audit log:

# After tool execution
if phi_result.contains_phi:
    await audit_log_service.log_event(
        event_type="TOOL_CALL_PHI",
        user_id=user.id,
        resource_type="tool",
        resource_id=tool_name,
        action="execute",
        metadata={
            "tool_name": tool_name,
            "phi_detected": True,
            "phi_types": phi_result.phi_types,
            "tool_allows_phi": tool_def.allows_phi,
            "execution_status": status,
            "duration_ms": duration_ms,
            "trace_id": trace_id,
        },
    )

PHI Error Responses

When PHI is detected in arguments to a non-PHI tool:

{
  "success": false,
  "error": {
    "code": "PHI_VIOLATION",
    "message": "Tool 'search_openevidence' cannot process PHI. Detected: name, mrn. Use a local tool or remove PHI from query.",
    "details": {
      "tool_name": "search_openevidence",
      "allows_phi": false,
      "phi_types_detected": ["name", "mrn"],
      "suggested_tools": ["search_medical_guidelines", "generate_differential_diagnosis"]
    }
  },
  "trace_id": "550e8400-e29b-41d4-a716-446655440000",
  "timestamp": "2025-11-20T12:34:56.789Z"
}

Frontend Handling:

  • Display a user-friendly error message (see the sketch after this list)
  • Suggest alternative tools that allow PHI
  • Allow user to rephrase query without PHI
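
A hedged sketch of that frontend handling, assuming the envelope shape above; the UI helper names are illustrative, not real exports:

// Sketch of client-side handling for the PHI_VIOLATION envelope above.
interface ApiError {
  code: string;
  message: string;
  details?: { suggested_tools?: string[] };
}

declare function showToast(message: string): void;        // assumed UI helper
declare function suggestTools(tools: string[]): void;     // assumed UI helper

function handleToolError(error: ApiError) {
  if (error.code === "PHI_VIOLATION") {
    showToast("This tool can't process patient identifiers. Try rephrasing without PHI.");
    if (error.details?.suggested_tools?.length) {
      suggestTools(error.details.suggested_tools); // offer PHI-safe alternatives
    }
  }
}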


Audit Logging

For logging conventions and metrics, see OBSERVABILITY.md.

Audit Log Requirements

Every access to PHI must be logged with:

  1. Who: User ID, role, email
  2. What: Action performed (read, write, delete, authentication events)
  3. When: Timestamp (UTC with timezone support)
  4. Where: IP address, service, endpoint, request ID
  5. Why: Purpose/reason (stored in metadata)
  6. Result: Success/failure with error details

Phase 2 Implementation Status

✅ IMPLEMENTED - Comprehensive audit logging system deployed in Phase 2:

Key Features:

  • Immutable audit trail with SHA-256 integrity verification
  • Authentication event logging (registration, login, logout, token refresh/revocation)
  • Comprehensive metadata capture including IP address, user agent, request ID
  • JSONB metadata field for extensible additional context
  • Composite indexes for efficient queries by user, action, timestamp
  • Automated integrity verification to detect tampering
  • Fail-safe logging ensuring audit logs are created even if errors occur

Database Schema: audit_logs table (PostgreSQL with JSONB)

Service Layer:

  • app/services/audit_service.py - Audit logging service
  • app/models/audit_log.py - Audit log ORM model

Usage in Authentication Flow:

  • All authentication events automatically logged
  • Token revocation events captured
  • Failed login attempts tracked
  • Request IDs correlated for debugging

Audit Log Implementation (Phase 2)

from sqlalchemy import Column, String, DateTime, JSON, Text, Boolean
from datetime import datetime
from functools import wraps
import hashlib
import uuid

class AuditLog(Base):
    __tablename__ = "audit_logs"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    timestamp = Column(DateTime, nullable=False, default=datetime.utcnow)
    user_id = Column(UUID, nullable=False)
    user_role = Column(String(50), nullable=False)
    action = Column(String(100), nullable=False)         # read, write, delete, export, etc.
    resource_type = Column(String(100), nullable=False)  # patient_record, prescription, etc.
    resource_id = Column(String(255))
    ip_address = Column(String(45))
    user_agent = Column(Text)
    request_id = Column(String(100))
    service_name = Column(String(100))
    success = Column(Boolean, nullable=False)
    error_message = Column(Text)
    # "metadata" is a reserved attribute on SQLAlchemy declarative models,
    # so map the attribute under a different name while keeping the column name
    extra_metadata = Column("metadata", JSON)  # Additional context
    hash = Column(String(64), nullable=False)  # Integrity verification

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Calculate hash for integrity
        self.hash = self.calculate_hash()

    def calculate_hash(self) -> str:
        """Calculate hash to detect tampering"""
        data = f"{self.timestamp}{self.user_id}{self.action}{self.resource_type}{self.resource_id}"
        return hashlib.sha256(data.encode()).hexdigest()

    def verify_integrity(self) -> bool:
        """Verify audit log has not been tampered with"""
        expected_hash = self.calculate_hash()
        return self.hash == expected_hash

class AuditService:
    """Service for creating audit logs"""

    @staticmethod
    async def log_access(
        user_id: str,
        user_role: str,
        action: str,
        resource_type: str,
        resource_id: str = None,
        request: Request = None,
        success: bool = True,
        error_message: str = None,
        metadata: dict = None,
    ):
        """Create audit log entry"""
        log_entry = AuditLog(
            user_id=user_id,
            user_role=user_role,
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
            ip_address=request.client.host if request else None,
            user_agent=request.headers.get("user-agent") if request else None,
            request_id=request.state.request_id if request else None,
            service_name="voiceassist",
            success=success,
            error_message=error_message,
            extra_metadata=metadata,
        )
        db.add(log_entry)
        db.commit()

        # Also send to immutable log storage (e.g., WORM storage, blockchain)
        await send_to_immutable_storage(log_entry)

# Decorator for automatic audit logging
def audit_log(action: str, resource_type: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, current_user: User, **kwargs):
            success = True
            error_message = None
            try:
                result = await func(*args, current_user=current_user, **kwargs)
                return result
            except Exception as e:
                success = False
                error_message = str(e)
                raise
            finally:
                # Log regardless of success/failure
                resource_id = kwargs.get("record_id") or kwargs.get("patient_id")
                await AuditService.log_access(
                    user_id=current_user.id,
                    user_role=current_user.role,
                    action=action,
                    resource_type=resource_type,
                    resource_id=resource_id,
                    request=kwargs.get("request"),
                    success=success,
                    error_message=error_message,
                )
        return wrapper
    return decorator

# Usage
@router.get("/patient-record/{record_id}")
@audit_log(action="read", resource_type="patient_record")
async def get_patient_record(
    record_id: str,
    current_user: User = Depends(get_current_user),
    request: Request = None,
):
    # Audit log created automatically
    return db.query(PatientRecord).filter_by(id=record_id).first()

Audit Log Retention

# Retain audit logs for 6 years (HIPAA requirement)
AUDIT_LOG_RETENTION_YEARS = 6

# Archive old logs to cold storage
async def archive_old_audit_logs():
    """Archive audit logs older than 1 year to cold storage"""
    cutoff_date = datetime.utcnow() - timedelta(days=365)

    # Export to JSON
    old_logs = db.query(AuditLog).filter(AuditLog.timestamp < cutoff_date).all()

    # Write to encrypted archive
    with open(f"/archive/audit_logs_{cutoff_date.year}.json.enc", "w") as f:
        encrypted_data = encrypt_data(json.dumps([log.to_dict() for log in old_logs]))
        f.write(encrypted_data)

    # Verify integrity
    for log in old_logs:
        if not log.verify_integrity():
            alert_security_team(f"Audit log integrity violation: {log.id}")

    # Delete from active database (after successful archive)
    db.query(AuditLog).filter(AuditLog.timestamp < cutoff_date).delete()
    db.commit()

Network Security

Firewall Rules

# UFW rules for production server
sudo ufw default deny incoming
sudo ufw default allow outgoing

# Allow SSH (change port if using non-standard)
sudo ufw allow 22/tcp

# Allow HTTP/HTTPS
sudo ufw allow 80/tcp
sudo ufw allow 443/tcp

# Deny all other ports
sudo ufw enable

Network Policies (Kubernetes)

---
# Only API Gateway can receive external traffic
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: api-gateway-policy
  namespace: voiceassist
spec:
  podSelector:
    matchLabels:
      app: api-gateway
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - namespaceSelector: {}  # From any namespace
      ports:
        - protocol: TCP
          port: 8000
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: auth-service
      ports:
        - protocol: TCP
          port: 8002
---
# Database only accessible by specific services
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: postgres-policy
  namespace: voiceassist
spec:
  podSelector:
    matchLabels:
      app: postgres
  policyTypes:
    - Ingress
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: api-gateway
        - podSelector:
            matchLabels:
              app: auth-service
        - podSelector:
            matchLabels:
              app: medical-kb
      ports:
        - protocol: TCP
          port: 5432

Data Retention & Disposal

Retention Policy

| Data Type | Retention Period | Disposal Method |
|---|---|---|
| Medical Records | 6 years after last visit | Secure wipe + shred (physical) |
| Audit Logs | 6 years | Encrypted archive, then secure wipe |
| Voice Recordings | 30 days (unless saved) | Secure wipe |
| Temporary Files | 24 hours | Automatic secure deletion |
| Backups | 30 days (rolling) | Encrypt, then secure wipe |
| De-identified Data | Indefinite | N/A (no PHI) |
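
A minimal sketch of how this table might be encoded as configuration driving the cleanup jobs below; the `RETENTION_POLICY` mapping and its keys are illustrative, not part of the shipped code:

```python
from datetime import datetime, timedelta

# Hypothetical encoding of the retention table above; names are illustrative.
RETENTION_POLICY = {
    "voice_recording": timedelta(days=30),
    "temporary_file": timedelta(hours=24),
    "backup": timedelta(days=30),
    "audit_log": timedelta(days=365 * 6),  # 6 years
}

def retention_cutoff(data_type: str, now: datetime = None) -> datetime:
    """Return the timestamp before which records of this type are disposable."""
    now = now or datetime.utcnow()
    return now - RETENTION_POLICY[data_type]
```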

Secure Deletion

```python
import os
from datetime import datetime, timedelta

def secure_delete(file_path: str, passes: int = 7):
    """
    Securely overwrite and delete a file (7-pass scheme in the spirit of
    DoD 5220.22-M ECE)
    """
    if not os.path.exists(file_path):
        return

    file_size = os.path.getsize(file_path)

    # "r+b" opens for in-place binary overwrite ("a" modes force writes to the end)
    with open(file_path, "r+b") as f:
        for pass_num in range(passes):
            f.seek(0)
            if pass_num in (0, 2, 4):
                f.write(b"\x00" * file_size)    # Write zeros
            elif pass_num in (1, 3, 5):
                f.write(b"\xFF" * file_size)    # Write ones
            else:
                f.write(os.urandom(file_size))  # Write random data
            f.flush()
            os.fsync(f.fileno())

    # Finally, delete the file and log the deletion
    os.remove(file_path)
    audit_log.log_deletion(file_path)

# Scheduled cleanup job
@celery.task
def cleanup_expired_files():
    """Clean up files older than the retention period"""
    # 24-hour retention for temporary files, per the table above
    cutoff_date = datetime.utcnow() - timedelta(hours=24)

    expired_files = db.query(TemporaryFile).filter(
        TemporaryFile.created_at < cutoff_date
    ).all()

    for file_record in expired_files:
        # Securely delete the physical file, then the database record
        secure_delete(file_record.file_path)
        db.delete(file_record)

    db.commit()
```

Incident Response

Incident Response Plan

1. Preparation

  • Incident response team identified
  • Contact list maintained
  • Incident response playbooks documented
  • Regular drills conducted (quarterly)

2. Detection & Analysis

  • 24/7 monitoring via Prometheus/Grafana
  • Automated alerts for suspicious activity
  • Log analysis for anomalies
  • User reports

3. Containment

  • Short-term: Isolate affected systems, revoke compromised credentials (a revocation sketch follows this list)
  • Long-term: Apply patches, update firewall rules

4. Eradication

  • Remove malware/backdoors
  • Close vulnerabilities
  • Reset all passwords

5. Recovery

  • Restore from clean backups
  • Verify system integrity
  • Gradual service restoration

6. Post-Incident

  • Incident report (within 60 days for HIPAA breach)
  • Lessons learned meeting
  • Update security controls
  • Notify affected users (if PHI breach)
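
As referenced in the containment step, here is a minimal sketch of credential revocation using a Redis token denylist; the key scheme and the assumption that each JWT carries a unique `jti` claim and an `exp` timestamp are illustrative, not the shipped auth service API:

```python
import time

async def revoke_token(redis, jti: str, exp: int):
    """Deny-list a compromised token until its natural expiry."""
    ttl = max(int(exp - time.time()), 1)
    await redis.setex(f"revoked:{jti}", ttl, "1")

async def is_token_revoked(redis, jti: str) -> bool:
    """Checked on every authenticated request before trusting the JWT."""
    return await redis.exists(f"revoked:{jti}") == 1
```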

Security Incident Examples

Unauthorized Access Attempt:

```python
from fastapi import Request
from fastapi.responses import JSONResponse

# Alert triggered when multiple failed login attempts come from one IP
@app.middleware("http")
async def detect_brute_force(request: Request, call_next):
    user_ip = request.client.host

    # Check failed login count (incremented by the login handler on failure)
    failed_count = await redis.get(f"failed_login:{user_ip}")
    if failed_count and int(failed_count) > 5:
        # Block IP for one hour
        await redis.setex(f"blocked:{user_ip}", 3600, "1")

        # Alert security team
        await alert_security_team(
            severity="high",
            message=f"Brute force attack detected from {user_ip}",
            metadata={"ip": user_ip, "failed_attempts": failed_count},
        )
        return JSONResponse(status_code=403, content={"error": "Blocked"})

    return await call_next(request)
```
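
The middleware above only reads the `failed_login:{ip}` counter; a minimal sketch of the login failure path that would maintain it (the 15-minute window and helper name are assumptions):

```python
FAILED_LOGIN_WINDOW_SECONDS = 900  # assumed 15-minute sliding window

async def record_failed_login(redis, user_ip: str) -> int:
    """Increment the per-IP failure counter and (re)start its expiry window."""
    key = f"failed_login:{user_ip}"
    count = await redis.incr(key)
    if count == 1:
        # First failure in this window: set a TTL so the counter self-resets
        await redis.expire(key, FAILED_LOGIN_WINDOW_SECONDS)
    return count
```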

Data Breach Response:

```python
import uuid
from datetime import datetime
from typing import List

async def handle_data_breach(affected_users: List[str], breach_type: str):
    """
    HIPAA Breach Notification Rule: notify within 60 days of discovery
    """
    # 1. Document breach
    breach_report = BreachReport(
        incident_id=str(uuid.uuid4()),
        discovered_at=datetime.utcnow(),
        breach_type=breach_type,
        affected_user_count=len(affected_users),
        description="Unauthorized access to patient records",
        mitigation_steps="Access revoked, passwords reset, audit log reviewed",
        reported_to_authorities=False,
    )
    db.add(breach_report)
    db.commit()

    # 2. Notify affected users (email)
    for user_id in affected_users:
        await send_breach_notification_email(user_id, breach_report)

    # 3. Notify HHS if more than 500 individuals are affected
    if len(affected_users) > 500:
        await notify_hhs(breach_report)

    # 4. Post a media notice if more than 500 individuals in the same state
    if breach_report.requires_media_notice():
        await post_media_notice(breach_report)

    # 5. Document in the breach log
    audit_log.log_breach(breach_report)
```

Security Monitoring

Metrics to Monitor

```yaml
# Prometheus alerts
groups:
  - name: security_alerts
    rules:
      # Failed login attempts
      - alert: HighFailedLoginRate
        expr: rate(failed_login_total[5m]) > 10
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "High rate of failed login attempts"

      # Unauthorized access attempts
      - alert: UnauthorizedAccessAttempt
        expr: rate(http_requests_total{status="403"}[5m]) > 5
        for: 1m
        labels:
          severity: high
        annotations:
          summary: "Multiple unauthorized access attempts detected"

      # Unusual data export volume (1 GB within a 10-minute window)
      - alert: UnusualDataExport
        expr: increase(data_export_bytes_total[10m]) > 1000000000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Unusual volume of data exports detected"

      # PHI access outside business hours (hour() is UTC; adjust for local time)
      - alert: PHIAccessAfterHours
        expr: (increase(phi_access_total[5m]) > 0) and on() (hour() < 8 or hour() > 18)
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "PHI accessed outside business hours"
```
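
These rules assume the application exports counters with the names shown; a minimal emission sketch using `prometheus_client` (the label sets are illustrative, not the shipped instrumentation):

```python
from prometheus_client import Counter

# Counters referenced by the alert rules above; label sets are illustrative
failed_login_total = Counter(
    "failed_login_total", "Failed login attempts", ["ip"]
)
phi_access_total = Counter(
    "phi_access_total", "PHI record accesses", ["user_id", "resource_type"]
)

def on_failed_login(ip: str):
    failed_login_total.labels(ip=ip).inc()

def on_phi_access(user_id: str, resource_type: str):
    phi_access_total.labels(user_id=user_id, resource_type=resource_type).inc()
```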

Security Dashboard (Grafana)

{ "dashboard": { "title": "Security Monitoring", "panels": [ { "title": "Failed Login Attempts (Last 24h)", "targets": [ { "expr": "sum(increase(failed_login_total[24h]))" } ] }, { "title": "Unauthorized Access by IP", "targets": [ { "expr": "topk(10, sum by (ip) (http_requests_total{status=\"403\"}))" } ] }, { "title": "PHI Access by User", "targets": [ { "expr": "sum by (user_id) (phi_access_total)" } ] }, { "title": "Audit Log Integrity Checks", "targets": [ { "expr": "audit_log_integrity_violations_total" } ] } ] } }

Compliance Checklists

Pre-Production Checklist

  • All sensitive data encrypted at rest (AES-256)
  • All network traffic encrypted in transit (TLS 1.3)
  • OIDC authentication configured with Nextcloud
  • RBAC implemented and tested
  • PHI detection service deployed and tested
  • Audit logging enabled for all PHI access
  • Backup encryption enabled
  • Firewall rules configured (deny by default)
  • Network policies configured (Kubernetes)
  • Business Associate Agreements signed (OpenAI, UpToDate, etc.)
  • Incident response plan documented
  • Security monitoring dashboard configured
  • Automatic session timeout (30 minutes)
  • Password policy enforced (min 12 characters, complexity; see the validator sketch after this checklist)
  • MFA available (optional but recommended)
  • Vulnerability scanning completed
  • Penetration testing completed
  • Security training completed for all users
  • HIPAA compliance review completed
  • Privacy policy published
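
A minimal sketch of the password policy check referenced above; the exact complexity rules enforced in production may differ:

```python
import re

def validate_password(password: str) -> bool:
    """Minimum 12 characters plus upper, lower, digit, and symbol classes."""
    if len(password) < 12:
        return False
    required_classes = [
        r"[a-z]",         # lowercase letter
        r"[A-Z]",         # uppercase letter
        r"\d",            # digit
        r"[^a-zA-Z0-9]",  # symbol
    ]
    return all(re.search(pattern, password) for pattern in required_classes)
```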

Annual Security Review

  • Review audit logs for unusual activity
  • Test backup restoration
  • Test incident response procedures
  • Update risk assessment
  • Review and update access controls
  • Vulnerability assessment
  • Penetration testing
  • Review Business Associate Agreements
  • Staff security training refresh
  • Update security policies
  • Review and test disaster recovery plan
  • Verify audit log integrity (see the verification sketch after this list)
  • Review encryption keys (rotation)
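
A minimal sketch of a scheduled integrity sweep built on the `verify_integrity()` method from the audit model above; the batch size and task shape are assumptions, not the shipped job:

```python
@celery.task
def verify_audit_log_integrity(batch_size: int = 1000) -> int:
    """Periodically re-hash stored audit entries and alert on any mismatch."""
    offset = 0
    violations = 0
    while True:
        batch = (
            db.query(AuditLog)
            .order_by(AuditLog.timestamp)
            .offset(offset)
            .limit(batch_size)
            .all()
        )
        if not batch:
            break
        for log in batch:
            if not log.verify_integrity():
                violations += 1
                alert_security_team(f"Audit log integrity violation: {log.id}")
        offset += batch_size
    return violations
```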

References