2:I[7012,["4765","static/chunks/4765-f5afdf8061f456f3.js","9856","static/chunks/9856-3b185291364d9bef.js","6687","static/chunks/app/docs/%5B...slug%5D/page-e07536548216bee4.js"],"MarkdownRenderer"] 4:I[9856,["4765","static/chunks/4765-f5afdf8061f456f3.js","9856","static/chunks/9856-3b185291364d9bef.js","6687","static/chunks/app/docs/%5B...slug%5D/page-e07536548216bee4.js"],""] 5:I[4126,[],""] 7:I[9630,[],""] 8:I[4278,["9856","static/chunks/9856-3b185291364d9bef.js","8172","static/chunks/8172-b3a2d6fe4ae10d40.js","3185","static/chunks/app/layout-2814fa5d15b84fe4.js"],"HeadingProvider"] 9:I[1476,["9856","static/chunks/9856-3b185291364d9bef.js","8172","static/chunks/8172-b3a2d6fe4ae10d40.js","3185","static/chunks/app/layout-2814fa5d15b84fe4.js"],"Header"] a:I[3167,["9856","static/chunks/9856-3b185291364d9bef.js","8172","static/chunks/8172-b3a2d6fe4ae10d40.js","3185","static/chunks/app/layout-2814fa5d15b84fe4.js"],"Sidebar"] b:I[7409,["9856","static/chunks/9856-3b185291364d9bef.js","8172","static/chunks/8172-b3a2d6fe4ae10d40.js","3185","static/chunks/app/layout-2814fa5d15b84fe4.js"],"PageFrame"] 3:T2fb2, # FHIR Streaming Service **Phase 3 - Voice Mode v4.1** Real-time FHIR data streaming for clinical context enrichment in voice interactions. ## Overview The FHIR Subscription Service enables real-time streaming of clinical data (vitals, labs, observations) to enrich the AI assistant's context during healthcare conversations. ``` ┌─────────────────┐ ┌────────────────────────────────────┐ │ FHIR Server │────▶│ FHIR Subscription Service │ │ (Epic/Cerner) │ │ │ └─────────────────┘ │ ┌─────────┐ ┌──────────────┐ │ │ │WebSocket│ │Context Builder│ │ Vitals ────────▶│ │ /Poll │───▶│ for Thinker │ │ Labs ────────▶│ └─────────┘ └──────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘ │ ▼ Thinker Context Injection "Patient vitals: BP 120/80, HR 72..." ``` ## Features - **WebSocket Subscriptions**: Real-time push from FHIR R5 servers - **Polling Fallback**: For servers without subscription support - **Change Detection**: ETag and Last-Modified tracking - **Context Builder**: Format observations for AI consumption - **Reconnection Handling**: Exponential backoff on failures - **PHI-Aware**: Integrates with PHI routing decisions ## Requirements ```bash pip install httpx websockets python-dateutil ``` **Environment Variables:** ```bash FHIR_SERVER_URL=https://fhir.example.com/r4 FHIR_AUTH_TOKEN=your_bearer_token ``` ## Feature Flag ```yaml # flag_definitions.yaml backend.voice_v4_fhir_streaming: default: false description: "Enable FHIR data streaming" ``` ## Basic Usage ### Initialize and Subscribe ```python from app.services.fhir_subscription_service import ( get_fhir_subscription_service, FHIRResourceType ) # Get service service = get_fhir_subscription_service() await service.initialize() # Subscribe to patient updates subscription = await service.subscribe_to_patient( patient_id="patient-123", resource_types=[ FHIRResourceType.VITAL_SIGNS, FHIRResourceType.LAB_RESULT, ], session_id="voice-session-456" ) print(f"Subscription: {subscription.subscription_id}") ``` ### Stream Observations ```python # Stream observations as they arrive async for observation in service.stream_observations(patient_id="patient-123"): print(f"New: {observation.code_display}: {observation.value_quantity}") ``` ### Get Latest Data ```python # Get latest vitals (one-time query) vitals = await service.get_latest_vitals( patient_id="patient-123", max_results=5 ) for vital in vitals: print(vital.to_context_string()) # Output: "Blood Pressure: 120/80 mmHg" # Get latest labs labs = await service.get_latest_labs( patient_id="patient-123", max_results=10 ) ``` ## Data Structures ### FHIRObservation ```python @dataclass class FHIRObservation: resource_id: str resource_type: FHIRResourceType patient_id: str code: str # LOINC/SNOMED code code_display: str # Human-readable name value: str | None # String value value_quantity: float | None # Numeric value value_unit: str | None # Unit of measure effective_datetime: datetime | None status: str = "final" interpretation: str | None # "High", "Low", "Normal" reference_range: str | None # "70-100" def to_context_string(self) -> str: """Format for Thinker context injection.""" # Returns: "Glucose: 180 mg/dL (High) [ref: 70-100]" ``` ### FHIRSubscription ```python @dataclass class FHIRSubscription: subscription_id: str patient_id: str resource_types: List[FHIRResourceType] status: SubscriptionStatus created_at: datetime last_event_at: datetime | None event_count: int error_message: str | None ``` ### FHIRResourceType ```python class FHIRResourceType(str, Enum): PATIENT = "Patient" OBSERVATION = "Observation" CONDITION = "Condition" MEDICATION_REQUEST = "MedicationRequest" DIAGNOSTIC_REPORT = "DiagnosticReport" VITAL_SIGNS = "vital-signs" LAB_RESULT = "laboratory" ALLERGY_INTOLERANCE = "AllergyIntolerance" ``` ## Configuration ### FHIRConfig ```python @dataclass class FHIRConfig: # Server settings fhir_server_url: str = "" auth_type: str = "bearer" # "bearer", "basic", "oauth2" auth_token: str | None = None # Subscription settings subscription_channel: str = "websocket" # or "polling" subscription_timeout_seconds: int = 3600 max_subscriptions_per_patient: int = 5 # Polling settings polling_interval_seconds: int = 30 max_polling_results: int = 100 # Retry settings max_retries: int = 3 retry_delay_seconds: int = 5 reconnect_delay_seconds: int = 10 # PHI settings require_phi_routing: bool = True log_phi_access: bool = True ``` ### Custom Configuration ```python from app.services.fhir_subscription_service import ( FHIRSubscriptionService, FHIRConfig ) config = FHIRConfig( fhir_server_url="https://fhir.hospital.org/r4", auth_type="bearer", auth_token="your_token", subscription_channel="polling", # Use polling instead of WebSocket polling_interval_seconds=15, ) service = FHIRSubscriptionService(config) await service.initialize() ``` ## Subscription Modes ### WebSocket (Preferred) ```python # Automatic reconnection with exponential backoff # Supports FHIR R5 $subscription-events config = FHIRConfig( subscription_channel="websocket", max_retries=5, reconnect_delay_seconds=10, ) ``` ### Polling (Fallback) ```python # Uses conditional requests (ETag, If-Modified-Since) # Change detection via seen_ids tracking config = FHIRConfig( subscription_channel="polling", polling_interval_seconds=30, ) ``` ## Context Injection for Thinker ### FHIRContextBuilder ```python from app.services.fhir_subscription_service import FHIRContextBuilder # Build vitals context vitals_context = FHIRContextBuilder.build_vitals_context(vitals) # Output: # Recent vital signs: # - Blood Pressure: 120/80 mmHg # - Heart Rate: 72 bpm # - Temperature: 98.6 F # Build labs context (highlights abnormal values) labs_context = FHIRContextBuilder.build_labs_context(labs) # Output: # Recent lab results: # ABNORMAL VALUES: # - Glucose: 180 mg/dL (High) [ref: 70-100] # Other results: # - Hemoglobin A1c: 6.5 % # Build combined summary summary = FHIRContextBuilder.build_clinical_summary( vitals=vitals, labs=labs, max_length=1000 ) ``` ### Convenience Function ```python from app.services.fhir_subscription_service import ( get_patient_context_for_thinker ) # Get formatted context for AI context = await get_patient_context_for_thinker( patient_id="patient-123", include_vitals=True, include_labs=True, max_vitals=5, max_labs=10 ) # Use in Thinker prompt response = await thinker.generate( messages=[{"role": "user", "content": user_question}], system=f"Patient clinical data:\n{context}\n\nAnswer the question." ) ``` ## Event Callbacks ### Register for Updates ```python def on_new_observation(event: StreamingEvent): if event.observation: obs = event.observation print(f"New {obs.code_display}: {obs.value_quantity}") # Check for critical values if obs.interpretation and "critical" in obs.interpretation.lower(): send_alert(obs) # Register callback service._register_callback(patient_id, on_new_observation) # Cleanup service._unregister_callback(patient_id, on_new_observation) ``` ## Error Handling ### Subscription Status ```python subscription = await service.subscribe_to_patient(patient_id) # Check status if subscription.status == SubscriptionStatus.ERROR: print(f"Error: {subscription.error_message}") # Monitor status changes while subscription.status == SubscriptionStatus.ACTIVE: await asyncio.sleep(10) # Subscription auto-reconnects on failure ``` ### Reconnection Behavior ```python # WebSocket reconnection: # 1. Initial failure → wait 10s → retry # 2. Second failure → wait 20s → retry # 3. Third failure → wait 40s → retry # ... up to max 5 minutes between retries # After max_retries, status → ERROR ``` ## Frontend Integration ### VitalsPanel Component ```tsx // See: apps/web-app/src/components/voice/VitalsPanel.tsx interface Vital { code: string; display: string; value: number; unit: string; interpretation?: string; timestamp: string; } function VitalsPanel({ patientId }: { patientId: string }) { const [vitals, setVitals] = useState([]); useEffect(() => { const ws = new WebSocket(`/api/fhir/stream/${patientId}`); ws.onmessage = (event) => { const vital = JSON.parse(event.data); setVitals((prev) => [vital, ...prev.slice(0, 9)]); }; return () => ws.close(); }, [patientId]); return (
{vitals.map((vital, i) => (
{vital.display}
{vital.value} {vital.unit}
))}
); } ``` ## Security Considerations ### PHI Handling ```python # FHIR data contains PHI - ensure proper routing if config.require_phi_routing: # Check PHI router state before streaming phi_state = get_phi_router_state(session_id) if phi_state.mode != "local": # Log PHI access if config.log_phi_access: logger.info(f"PHI access: {patient_id}", extra={ "session_id": session_id, "phi_mode": phi_state.mode, }) ``` ### Authentication ```python # Bearer token (most common) config = FHIRConfig( auth_type="bearer", auth_token=os.getenv("FHIR_AUTH_TOKEN") ) # OAuth2 (for production) # Implement token refresh in custom subclass ``` ## Testing ```python import pytest from unittest.mock import AsyncMock, patch @pytest.mark.asyncio async def test_fhir_subscription(): """Test FHIR subscription creation.""" service = get_fhir_subscription_service() with patch.object(service, '_test_connection', return_value=True): await service.initialize() sub = await service.subscribe_to_patient( patient_id="test-patient", resource_types=[FHIRResourceType.VITAL_SIGNS] ) assert sub is not None assert sub.status == SubscriptionStatus.ACTIVE @pytest.mark.asyncio async def test_context_builder(): """Test FHIR context builder formatting.""" vitals = [ FHIRObservation( resource_id="v1", resource_type=FHIRResourceType.VITAL_SIGNS, patient_id="test", code="8480-6", code_display="Blood Pressure", value="120/80", value_unit="mmHg", ) ] context = FHIRContextBuilder.build_vitals_context(vitals) assert "Blood Pressure" in context assert "120/80" in context ``` ## Related Documentation - [Voice Mode v4 Overview](./voice-mode-v4-overview.md) - [PHI-Aware STT Routing](./phi-aware-stt-routing.md) - [Speaker Diarization Service](./speaker-diarization-service.md) - [Adaptive Quality Service](./adaptive-quality-service.md) 6:["slug","voice/fhir-streaming-service","c"] 0:["X7oMT3VrOffzp0qvbeOas",[[["",{"children":["docs",{"children":[["slug","voice/fhir-streaming-service","c"],{"children":["__PAGE__?{\"slug\":[\"voice\",\"fhir-streaming-service\"]}",{}]}]}]},"$undefined","$undefined",true],["",{"children":["docs",{"children":[["slug","voice/fhir-streaming-service","c"],{"children":["__PAGE__",{},[["$L1",["$","div",null,{"children":[["$","div",null,{"className":"mb-6 flex items-center justify-between gap-4","children":[["$","div",null,{"children":[["$","p",null,{"className":"text-sm text-gray-500 dark:text-gray-400","children":"Docs / Raw"}],["$","h1",null,{"className":"text-3xl font-bold text-gray-900 dark:text-white","children":"FHIR Streaming Service"}],["$","p",null,{"className":"text-sm text-gray-600 dark:text-gray-400","children":["Sourced from"," ",["$","code",null,{"className":"font-mono text-xs","children":["docs/","voice/fhir-streaming-service.md"]}]]}]]}],["$","a",null,{"href":"https://github.com/mohammednazmy/VoiceAssist/edit/main/docs/voice/fhir-streaming-service.md","target":"_blank","rel":"noreferrer","className":"inline-flex items-center gap-2 rounded-md border border-gray-200 dark:border-gray-700 px-3 py-1.5 text-sm text-gray-700 dark:text-gray-200 hover:border-primary-500 dark:hover:border-primary-400 hover:text-primary-700 dark:hover:text-primary-300","children":"Edit on GitHub"}]]}],["$","div",null,{"className":"rounded-lg border border-gray-200 dark:border-gray-800 bg-white dark:bg-gray-900 p-6","children":["$","$L2",null,{"content":"$3"}]}],["$","div",null,{"className":"mt-6 flex flex-wrap gap-2 text-sm","children":[["$","$L4",null,{"href":"/reference/all-docs","className":"inline-flex items-center gap-1 rounded-md bg-gray-100 px-3 py-1 text-gray-700 hover:bg-gray-200 dark:bg-gray-800 dark:text-gray-200 dark:hover:bg-gray-700","children":"← All documentation"}],["$","$L4",null,{"href":"/","className":"inline-flex items-center gap-1 rounded-md bg-gray-100 px-3 py-1 text-gray-700 hover:bg-gray-200 dark:bg-gray-800 dark:text-gray-200 dark:hover:bg-gray-700","children":"Home"}]]}]]}],null],null],null]},[null,["$","$L5",null,{"parallelRouterKey":"children","segmentPath":["children","docs","children","$6","children"],"error":"$undefined","errorStyles":"$undefined","errorScripts":"$undefined","template":["$","$L7",null,{}],"templateStyles":"$undefined","templateScripts":"$undefined","notFound":"$undefined","notFoundStyles":"$undefined"}]],null]},[null,["$","$L5",null,{"parallelRouterKey":"children","segmentPath":["children","docs","children"],"error":"$undefined","errorStyles":"$undefined","errorScripts":"$undefined","template":["$","$L7",null,{}],"templateStyles":"$undefined","templateScripts":"$undefined","notFound":"$undefined","notFoundStyles":"$undefined"}]],null]},[[[["$","link","0",{"rel":"stylesheet","href":"/_next/static/css/7f586cdbbaa33ff7.css","precedence":"next","crossOrigin":"$undefined"}]],["$","html",null,{"lang":"en","className":"h-full","children":["$","body",null,{"className":"__className_f367f3 h-full bg-white dark:bg-gray-900","children":[["$","a",null,{"href":"#main-content","className":"skip-to-content","children":"Skip to main content"}],["$","$L8",null,{"children":[["$","$L9",null,{}],["$","$La",null,{}],["$","main",null,{"id":"main-content","className":"lg:pl-64","role":"main","aria-label":"Documentation content","children":["$","$Lb",null,{"children":["$","$L5",null,{"parallelRouterKey":"children","segmentPath":["children"],"error":"$undefined","errorStyles":"$undefined","errorScripts":"$undefined","template":["$","$L7",null,{}],"templateStyles":"$undefined","templateScripts":"$undefined","notFound":[["$","title",null,{"children":"404: This page could not be found."}],["$","div",null,{"style":{"fontFamily":"system-ui,\"Segoe UI\",Roboto,Helvetica,Arial,sans-serif,\"Apple Color Emoji\",\"Segoe UI Emoji\"","height":"100vh","textAlign":"center","display":"flex","flexDirection":"column","alignItems":"center","justifyContent":"center"},"children":["$","div",null,{"children":[["$","style",null,{"dangerouslySetInnerHTML":{"__html":"body{color:#000;background:#fff;margin:0}.next-error-h1{border-right:1px solid rgba(0,0,0,.3)}@media (prefers-color-scheme:dark){body{color:#fff;background:#000}.next-error-h1{border-right:1px solid rgba(255,255,255,.3)}}"}}],["$","h1",null,{"className":"next-error-h1","style":{"display":"inline-block","margin":"0 20px 0 0","padding":"0 23px 0 0","fontSize":24,"fontWeight":500,"verticalAlign":"top","lineHeight":"49px"},"children":"404"}],["$","div",null,{"style":{"display":"inline-block"},"children":["$","h2",null,{"style":{"fontSize":14,"fontWeight":400,"lineHeight":"49px","margin":0},"children":"This page could not be found."}]}]]}]}]],"notFoundStyles":[]}]}]}]]}]]}]}]],null],null],["$Lc",null]]]] c:[["$","meta","0",{"name":"viewport","content":"width=device-width, initial-scale=1"}],["$","meta","1",{"charSet":"utf-8"}],["$","title","2",{"children":"FHIR Streaming Service | Docs | VoiceAssist Docs"}],["$","meta","3",{"name":"description","content":"Real-time FHIR data streaming for clinical context enrichment in voice interactions."}],["$","meta","4",{"name":"keywords","content":"VoiceAssist,documentation,medical AI,voice assistant,healthcare,HIPAA,API"}],["$","meta","5",{"name":"robots","content":"index, follow"}],["$","meta","6",{"name":"googlebot","content":"index, follow"}],["$","link","7",{"rel":"canonical","href":"https://assistdocs.asimo.io"}],["$","meta","8",{"property":"og:title","content":"VoiceAssist Documentation"}],["$","meta","9",{"property":"og:description","content":"Comprehensive documentation for VoiceAssist - Enterprise Medical AI Assistant"}],["$","meta","10",{"property":"og:url","content":"https://assistdocs.asimo.io"}],["$","meta","11",{"property":"og:site_name","content":"VoiceAssist Docs"}],["$","meta","12",{"property":"og:type","content":"website"}],["$","meta","13",{"name":"twitter:card","content":"summary"}],["$","meta","14",{"name":"twitter:title","content":"VoiceAssist Documentation"}],["$","meta","15",{"name":"twitter:description","content":"Comprehensive documentation for VoiceAssist - Enterprise Medical AI Assistant"}],["$","meta","16",{"name":"next-size-adjust"}]] 1:null