mod jetstream;

pub use jetstream::JetStreamClient;

use crate::config::Settings;
use crate::observability::Observability;
use crate::project::{ProjectionManifest, ProjectionOutput, ProjectionRuntime};
use crate::storage::KvClient;
use crate::tenant_placement::TenantPlacement;
use crate::types::{
    CheckpointKey, EventEnvelope, ProjectionError, StreamSequence, TenantId, ViewId, ViewKey,
};
use async_nats::jetstream::consumer::DeliverPolicy;
use async_nats::jetstream::AckKind;
use futures::StreamExt;
use runtime_function::Program;
use serde_json::Value as JsonValue;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;

/// Options for a single projection run; every field is optional and falls
/// back to values derived from `Settings`.
#[derive(Debug, Clone, Default)]
pub struct RunOptions {
    pub consumer_durable_name: Option<String>,
    pub consumer_filter_subject: Option<String>,
    pub consumer_deliver_policy: Option<DeliverPolicy>,
    pub tenant_filter: Option<TenantId>,
    pub view_type_filter: Option<Vec<String>>,
    pub stop_at_sequence: Option<u64>,
    pub shutdown: Option<Arc<tokio::sync::Notify>>,
    pub ready: Option<Arc<AtomicBool>>,
    pub observability: Option<Observability>,
    pub storage: Option<KvClient>,
    pub tenant_placement: Option<TenantPlacement>,
}

pub async fn run_projection(settings: Settings) -> Result<(), ProjectionError> {
    match settings.consumer_mode {
        crate::config::ConsumerMode::Single => {
            run_projection_with_options(settings, RunOptions::default()).await
        }
        crate::config::ConsumerMode::PerView => run_projection_per_view(settings).await,
    }
}

pub async fn run_projection_with_signals(
    settings: Settings,
    shutdown: Arc<tokio::sync::Notify>,
    ready: Arc<AtomicBool>,
    observability: Observability,
    tenant_placement: TenantPlacement,
) -> Result<(), ProjectionError> {
    match settings.consumer_mode {
        crate::config::ConsumerMode::Single => {
            run_projection_with_options(
                settings,
                RunOptions {
                    shutdown: Some(shutdown),
                    ready: Some(ready),
                    observability: Some(observability),
                    tenant_placement: Some(tenant_placement),
                    ..RunOptions::default()
                },
            )
            .await
        }
        crate::config::ConsumerMode::PerView => {
            run_projection_per_view_with_options(
                settings,
                RunOptions {
                    shutdown: Some(shutdown),
                    ready: Some(ready),
                    observability: Some(observability),
                    tenant_placement: Some(tenant_placement),
                    ..RunOptions::default()
                },
            )
            .await
        }
    }
}

pub async fn run_projection_per_view(settings: Settings) -> Result<(), ProjectionError> {
    run_projection_per_view_with_options(settings, RunOptions::default()).await
}
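/// Per-view fan-out: spawn one single-mode worker per projection definition,
/// each on its own durable consumer, so every view type checkpoints and fails
/// independently. Durable names are derived by `per_view_durable_name`; for
/// example, base `base` and view type `My View` yield `base_My_View` (see the
/// sanitization test at the bottom of this file).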
async fn run_projection_per_view_with_options(
    settings: Settings,
    options: RunOptions,
) -> Result<(), ProjectionError> {
    settings
        .validate()
        .map_err(ProjectionError::ValidationError)?;
    let (manifest, _) = load_manifest_and_programs(&settings)?;
    let filter_subject = settings
        .subject_filters
        .first()
        .cloned()
        .unwrap_or_else(|| "tenant.*.aggregate.*.*".to_string());
    let shutdown = options.shutdown.clone();
    let ready = options.ready.clone();
    let observability = options.observability.clone();
    let storage = options.storage.clone();
    let tenant_placement = options.tenant_placement.clone();
    let mut tasks = Vec::new();
    if let Some(ready) = &ready {
        ready.store(true, Ordering::Relaxed);
    }
    for def in manifest.all() {
        let view_type = def.view_type.as_str().to_string();
        let durable_name = per_view_durable_name(&settings.durable_name, &view_type);
        let mut settings = settings.clone();
        settings.consumer_mode = crate::config::ConsumerMode::Single;
        let filter_subject = filter_subject.clone();
        let shutdown = shutdown.clone();
        let observability = observability.clone();
        let storage = storage.clone();
        let tenant_placement = tenant_placement.clone();
        tasks.push(tokio::spawn(async move {
            run_projection_with_options(
                settings,
                RunOptions {
                    consumer_durable_name: Some(durable_name),
                    consumer_filter_subject: Some(filter_subject),
                    view_type_filter: Some(vec![view_type]),
                    shutdown,
                    ready: None,
                    observability,
                    storage,
                    tenant_placement,
                    ..RunOptions::default()
                },
            )
            .await
        }));
    }
    for task in tasks {
        match task.await {
            Ok(Ok(())) => {}
            Ok(Err(e)) => return Err(e),
            Err(e) => return Err(ProjectionError::StreamError(e.to_string())),
        }
    }
    if let Some(ready) = &ready {
        ready.store(false, Ordering::Relaxed);
    }
    Ok(())
}

fn per_view_durable_name(base: &str, view_type: &str) -> String {
    let mut out = String::with_capacity(base.len() + view_type.len() + 1);
    out.push_str(base);
    out.push('_');
    for ch in view_type.chars() {
        if ch.is_ascii_alphanumeric() || ch == '_' || ch == '-' {
            out.push(ch);
        } else {
            out.push('_');
        }
    }
    out
}

pub async fn run_projection_with_options(
    settings: Settings,
    options: RunOptions,
) -> Result<(), ProjectionError> {
    let mut options = options;
    settings
        .validate()
        .map_err(ProjectionError::ValidationError)?;
    if options.consumer_filter_subject.is_none() {
        if let Some(tp) = &options.tenant_placement {
            if let Some(single) = tp.single_hosted_tenant() {
                options.consumer_filter_subject =
                    Some(format!("tenant.{}.aggregate.*.*", single.as_str()));
            }
        }
    }
    let (manifest, programs) = load_manifest_and_programs(&settings)?;
    let storage = if let Some(storage) = options.storage.clone() {
        storage
    } else {
        KvClient::open(settings.storage_path.clone())
            .map_err(|e| ProjectionError::StorageError(e.to_string()))?
    };
    let view_type_filter = options.view_type_filter.as_ref().map(|items| {
        items
            .iter()
            .map(|s| s.to_string())
            .collect::<HashSet<String>>()
    });
    let jetstream = if options.consumer_durable_name.is_some()
        || options.consumer_filter_subject.is_some()
        || options.consumer_deliver_policy.is_some()
    {
        let durable_name = options
            .consumer_durable_name
            .clone()
            .unwrap_or_else(|| settings.durable_name.clone());
        let filter_subject = options
            .consumer_filter_subject
            .clone()
            .or_else(|| settings.subject_filters.first().cloned())
            .unwrap_or_else(|| "tenant.*.aggregate.*.*".to_string());
        let deliver_policy = options
            .consumer_deliver_policy
            .unwrap_or(DeliverPolicy::All);
        let consumer_options = crate::stream::jetstream::ConsumerOptions {
            durable_name,
            filter_subject,
            deliver_policy,
        };
        JetStreamClient::connect_with(&settings, consumer_options).await?
    } else {
        JetStreamClient::connect(&settings).await?
    };
    let stop_at_sequence = options.stop_at_sequence;
    let runtime = ProjectionRuntime::default();
    let observability = options.observability.clone().unwrap_or_default();
    let stop_check_enabled = stop_at_sequence.is_some() && options.tenant_filter.is_some();
    let mut tick = tokio::time::interval(Duration::from_millis(200));
    tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    if let Some(ready) = &options.ready {
        ready.store(true, Ordering::Relaxed);
    }
    let shutdown = options
        .shutdown
        .clone()
        .unwrap_or_else(|| Arc::new(tokio::sync::Notify::new()));
    if options.shutdown.is_none() {
        spawn_shutdown_listener(shutdown.clone());
    }
    let mut messages = jetstream.messages().await?;
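    // Main delivery loop. Three wake-up sources:
    //   - shutdown notification: exit immediately;
    //   - next JetStream message: decode, filter by tenant/placement, project,
    //     then ack/term according to the returned `AckDecision`;
    //   - a 200ms tick (armed only when a stop sequence is set): re-runs the
    //     `caught_up` check so a catch-up run can terminate even when another
    //     worker on the same durable drains the remaining messages.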
    loop {
        if stop_check_enabled
            && caught_up(&options, &manifest, &storage, stop_at_sequence.unwrap())?
        {
            break;
        }
        enum NextMsg<T> {
            Msg(T),
            Tick,
        }
        let next = tokio::select! {
            _ = shutdown.notified() => break,
            msg = messages.next() => NextMsg::Msg(msg),
            _ = tick.tick(), if stop_check_enabled => NextMsg::Tick,
        };
        let next_msg = match next {
            NextMsg::Msg(msg) => msg,
            NextMsg::Tick => continue,
        };
        let Some(msg) = next_msg else {
            break;
        };
        let msg = match msg {
            Ok(m) => m,
            Err(e) => {
                tracing::error!(error = %e, "JetStream message stream error");
                continue;
            }
        };
        let info = match msg.info() {
            Ok(i) => i,
            Err(e) => {
                tracing::error!(error = %e, "Failed to parse JetStream message info");
                let _ = msg.ack().await;
                continue;
            }
        };
        let sequence = info.stream_sequence;
        let delivered = info.delivered;
        let envelope: EventEnvelope = match serde_json::from_slice(&msg.payload) {
            Ok(e) => e,
            Err(e) => {
                tracing::error!(error = %e, "Failed to decode event envelope");
                let _ = msg.ack().await;
                continue;
            }
        };
        let tenant_id = resolve_tenant_id(&settings, &envelope);
        if let Some(filter) = &options.tenant_filter {
            if filter.as_str() != tenant_id.as_str() {
                let _ = msg.ack().await;
                continue;
            }
        }
        if let Some(tp) = &options.tenant_placement {
            if tp.is_draining(&tenant_id) || !tp.is_hosted(&tenant_id) {
                let _ = msg.ack_with(AckKind::Term).await;
                continue;
            }
        }
        let ctx = ProcessContext {
            settings: &settings,
            delivered,
            sequence,
            tenant_id: &tenant_id,
            envelope: &envelope,
            manifest: &manifest,
            programs: &programs,
            storage: &storage,
            observability: &observability,
            view_type_filter: view_type_filter.as_ref(),
        };
        let runtime = runtime.clone();
        let mut backoff_ms = settings.storage_backoff_ms.max(1);
        let decision = loop {
            let runtime = runtime.clone();
            let result =
                process_message_with_storage(ctx.clone(), move |current_view, event, program| {
                    let runtime = runtime.clone();
                    Box::pin(async move {
                        runtime.project_program(current_view, event, program).await
                    })
                })
                .await;
            match result {
                Ok(decision) => break decision,
                Err(ProjectionError::StorageError(e)) => {
                    tracing::error!(error = %e, backoff_ms = backoff_ms, "Storage error, backing off");
                    let sleep = tokio::time::sleep(std::time::Duration::from_millis(backoff_ms));
                    tokio::select! {
                        _ = shutdown.notified() => return Ok(()),
                        _ = sleep => {}
                    }
                    backoff_ms = (backoff_ms.saturating_mul(2))
                        .min(settings.storage_backoff_max_ms.max(1));
                    continue;
                }
                Err(e) => return Err(e),
            }
        };
        match decision {
            AckDecision::Ack => {
                if let Err(e) = msg.ack().await {
                    tracing::error!(error = %e, "Ack failed");
                }
            }
            AckDecision::Term => {
                if let Err(e) = msg.ack_with(AckKind::Term).await {
                    tracing::error!(error = %e, "Term ack failed");
                }
            }
            AckDecision::None => {}
        }
    }
    if let Some(ready) = &options.ready {
        ready.store(false, Ordering::Relaxed);
    }
    Ok(())
}

fn caught_up<S: Storage>(
    options: &RunOptions,
    manifest: &ProjectionManifest,
    storage: &S,
    target: u64,
) -> Result<bool, ProjectionError> {
    let Some(tenant_filter) = &options.tenant_filter else {
        return Ok(false);
    };
    let target_view_types = options.view_type_filter.as_ref().map_or_else(
        || {
            manifest
                .all()
                .map(|d| d.view_type.as_str().to_string())
                .collect::<Vec<String>>()
        },
        |types| types.clone(),
    );
    for view_type in target_view_types {
        let cp_key = CheckpointKey::new(tenant_filter, &crate::types::ViewType::new(view_type));
        let cp = storage.get_checkpoint(&cp_key)?.unwrap_or(0);
        if cp < target {
            return Ok(false);
        }
    }
    Ok(true)
}

/// Installs a one-shot signal listener (Ctrl-C, plus SIGTERM/SIGINT on Unix)
/// that fans out to every task waiting on the shared `Notify`.
fn spawn_shutdown_listener(shutdown: Arc<tokio::sync::Notify>) {
    tokio::spawn(async move {
        #[cfg(unix)]
        {
            use tokio::signal::unix::{signal, SignalKind};
            let mut sigterm = signal(SignalKind::terminate()).ok();
            let mut sigint = signal(SignalKind::interrupt()).ok();
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {},
                _ = async {
                    match sigterm.as_mut() {
                        Some(s) => { let _ = s.recv().await; }
                        // If handler registration failed, park this branch so it
                        // cannot resolve instantly and force a spurious shutdown.
                        None => std::future::pending::<()>().await,
                    }
                } => {},
                _ = async {
                    match sigint.as_mut() {
                        Some(s) => { let _ = s.recv().await; }
                        None => std::future::pending::<()>().await,
                    }
                } => {},
            }
        }
        #[cfg(not(unix))]
        {
            let _ = tokio::signal::ctrl_c().await;
        }
        shutdown.notify_waiters();
    });
}
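/// Deletes every stored view and the checkpoint for `(tenant_id, view_type)`,
/// then replays the stream from `start_sequence` on a fresh, uniquely named
/// durable consumer until the checkpoint reaches the stream tail.
///
/// A minimal sketch of a call site (hedged: it assumes a fully populated
/// `Settings`, a live NATS server, and a manifest on disk, which is why the
/// block is marked `ignore`):
///
/// ```ignore
/// // Hypothetical values; "t1", "Counter", and start sequence 1 mirror the
/// // integration tests at the bottom of this file.
/// rebuild_view(
///     settings,
///     TenantId::new("t1"),
///     crate::types::ViewType::new("Counter"),
///     1,
/// )
/// .await?;
/// ```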
pub async fn rebuild_view(
    mut settings: Settings,
    tenant_id: TenantId,
    view_type: crate::types::ViewType,
    start_sequence: u64,
) -> Result<(), ProjectionError> {
    let storage = KvClient::open(settings.storage_path.clone())
        .map_err(|e| ProjectionError::StorageError(e.to_string()))?;
    storage.delete_view_prefix(&tenant_id, &view_type)?;
    storage.delete_checkpoint(&CheckpointKey::new(&tenant_id, &view_type))?;
    let durable_name = format!(
        "{}_rebuild_{}_{}",
        settings.durable_name,
        view_type.as_str(),
        Uuid::now_v7()
    );
    let filter_subject = if tenant_id.is_empty() {
        "tenant.*.aggregate.*.*".to_string()
    } else {
        format!("tenant.{}.aggregate.*.*", tenant_id.as_str())
    };
    let jetstream = JetStreamClient::connect_with(
        &settings,
        crate::stream::jetstream::ConsumerOptions {
            durable_name: durable_name.clone(),
            filter_subject: filter_subject.clone(),
            deliver_policy: DeliverPolicy::ByStartSequence { start_sequence },
        },
    )
    .await?;
    let tail = jetstream.stream_last_sequence().await?;
    settings.subject_filters = vec![filter_subject];
    settings.durable_name = durable_name;
    run_projection_with_options(
        settings,
        RunOptions {
            tenant_filter: Some(tenant_id),
            view_type_filter: Some(vec![view_type.as_str().to_string()]),
            stop_at_sequence: Some(tail),
            consumer_deliver_policy: Some(DeliverPolicy::ByStartSequence { start_sequence }),
            ..RunOptions::default()
        },
    )
    .await
}

pub async fn backfill_to_tail(
    mut settings: Settings,
    tenant_id: TenantId,
    start_sequence: u64,
) -> Result<(), ProjectionError> {
    let durable_name = format!("{}_backfill_{}", settings.durable_name, Uuid::now_v7());
    let filter_subject = if tenant_id.is_empty() {
        "tenant.*.aggregate.*.*".to_string()
    } else {
        format!("tenant.{}.aggregate.*.*", tenant_id.as_str())
    };
    let jetstream = JetStreamClient::connect_with(
        &settings,
        crate::stream::jetstream::ConsumerOptions {
            durable_name: durable_name.clone(),
            filter_subject: filter_subject.clone(),
            deliver_policy: DeliverPolicy::ByStartSequence { start_sequence },
        },
    )
    .await?;
    let tail = jetstream.stream_last_sequence().await?;
    settings.subject_filters = vec![filter_subject];
    settings.durable_name = durable_name;
    run_projection_with_options(
        settings,
        RunOptions {
            tenant_filter: Some(tenant_id),
            stop_at_sequence: Some(tail),
            consumer_deliver_policy: Some(DeliverPolicy::ByStartSequence { start_sequence }),
            ..RunOptions::default()
        },
    )
    .await
}

#[derive(Debug, Clone)]
pub struct HealthReport {
    pub storage_ok: bool,
    pub nats_ok: bool,
    pub stream_last_sequence: Option<u64>,
    pub lags: Vec<(String, u64)>,
}

pub async fn health_report(
    settings: Settings,
    tenant_id: TenantId,
) -> Result<HealthReport, ProjectionError> {
    let storage = KvClient::open(settings.storage_path.clone())
        .map_err(|e| ProjectionError::StorageError(e.to_string()))?;
    let storage_ok = true;
    let (manifest, _) = load_manifest_and_programs(&settings)?;
    let jetstream = JetStreamClient::connect(&settings).await?;
    let stream_last_sequence = jetstream.stream_last_sequence().await.ok();
    let nats_ok = stream_last_sequence.is_some();
    let mut lags = Vec::new();
    if let Some(last) = stream_last_sequence {
        for def in manifest.all() {
            let ck = CheckpointKey::new(&tenant_id, &def.view_type);
            let cp = storage.get_checkpoint(&ck)?.unwrap_or(0);
            lags.push((def.view_type.as_str().to_string(), last.saturating_sub(cp)));
        }
    }
    Ok(HealthReport {
        storage_ok,
        nats_ok,
        stream_last_sequence,
        lags,
    })
}
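/// Loads the manifest (YAML or JSON, chosen by file extension) and parses the
/// projection program referenced by each definition. The YAML shape below is
/// inferred from `write_passthrough_manifest` in the tests; treat it as an
/// illustration, not a schema reference:
///
/// ```yaml
/// projections:
///   Counter:
///     view_type: "Counter"
///     project_program: "/path/to/passthrough.json"
/// ```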
fn load_manifest_and_programs(
    settings: &Settings,
) -> Result<(ProjectionManifest, HashMap<String, Program>), ProjectionError> {
    let manifest_raw = std::fs::read_to_string(&settings.manifest_path)
        .map_err(|e| ProjectionError::ManifestError(e.to_string()))?;
    let ext = std::path::Path::new(&settings.manifest_path)
        .extension()
        .and_then(|e| e.to_str())
        .unwrap_or("");
    let manifest = match ext {
        "yaml" | "yml" => ProjectionManifest::load_from_yaml(&manifest_raw)
            .map_err(|e| ProjectionError::ManifestError(e.to_string()))?,
        "json" => ProjectionManifest::load_from_json(&manifest_raw)
            .map_err(|e| ProjectionError::ManifestError(e.to_string()))?,
        _ => {
            return Err(ProjectionError::ManifestError(format!(
                "Unsupported manifest format: {}",
                settings.manifest_path
            )));
        }
    };
    manifest.validate()?;
    let mut programs: HashMap<String, Program> = HashMap::new();
    for def in manifest.all() {
        let raw = std::fs::read_to_string(&def.project_program)
            .map_err(|e| ProjectionError::ManifestError(e.to_string()))?;
        let program = runtime_function::Program::from_json(&raw)
            .map_err(|e| ProjectionError::ManifestError(format!("Program parse error: {}", e)))?;
        programs.insert(def.view_type.as_str().to_string(), program);
    }
    Ok((manifest, programs))
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AckDecision {
    Ack,
    Term,
    None,
}

trait Storage: Clone + Send + Sync + 'static {
    fn get_checkpoint(
        &self,
        key: &CheckpointKey,
    ) -> Result<Option<StreamSequence>, ProjectionError>;
    fn get_view(&self, key: &ViewKey) -> Result<Option<JsonValue>, ProjectionError>;
    fn commit_view_and_checkpoint_ordered(
        &self,
        view_key: &ViewKey,
        view_value: &JsonValue,
        checkpoint_key: &CheckpointKey,
        sequence: StreamSequence,
    ) -> Result<(), ProjectionError>;
    fn advance_checkpoint_ordered(
        &self,
        key: &CheckpointKey,
        sequence: StreamSequence,
    ) -> Result<(), ProjectionError>;
    fn put_poison(&self, key: &str, value: &JsonValue) -> Result<(), ProjectionError>;
}

impl Storage for KvClient {
    fn get_checkpoint(
        &self,
        key: &CheckpointKey,
    ) -> Result<Option<StreamSequence>, ProjectionError> {
        self.get_checkpoint(key)
    }
    fn get_view(&self, key: &ViewKey) -> Result<Option<JsonValue>, ProjectionError> {
        self.get_view(key)
    }
    fn commit_view_and_checkpoint_ordered(
        &self,
        view_key: &ViewKey,
        view_value: &JsonValue,
        checkpoint_key: &CheckpointKey,
        sequence: StreamSequence,
    ) -> Result<(), ProjectionError> {
        self.commit_view_and_advance_checkpoint_ordered(
            view_key,
            view_value,
            checkpoint_key,
            sequence,
        )
    }
    fn advance_checkpoint_ordered(
        &self,
        key: &CheckpointKey,
        sequence: StreamSequence,
    ) -> Result<(), ProjectionError> {
        self.advance_checkpoint_ordered(key, sequence)
    }
    fn put_poison(&self, key: &str, value: &JsonValue) -> Result<(), ProjectionError> {
        self.put_json(key, value)
    }
}

#[derive(Clone, Copy)]
struct ProcessContext<'a, S: Storage> {
    settings: &'a Settings,
    delivered: i64,
    sequence: StreamSequence,
    tenant_id: &'a TenantId,
    envelope: &'a EventEnvelope,
    manifest: &'a ProjectionManifest,
    programs: &'a HashMap<String, Program>,
    storage: &'a S,
    observability: &'a Observability,
    view_type_filter: Option<&'a HashSet<String>>,
}
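// Per-message pipeline, factored over the `Storage` trait so tests can inject
// failures. Ack mapping:
//   - `delivered` exceeds `max_deliver`  -> write a poison record, return `Term`;
//   - every projection committed         -> `Ack`;
//   - projection or commit failure       -> `None` (no ack; JetStream redelivers);
//   - `sequence <= stored checkpoint`    -> that view is skipped, which is what
//     makes redelivery idempotent.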
async fn process_message_with_storage<'a, S>(
    ctx: ProcessContext<'a, S>,
    mut project: impl for<'b> FnMut(
        &'b JsonValue,
        &'b EventEnvelope,
        &'b Program,
    ) -> Pin<
        Box<dyn Future<Output = Result<Option<ProjectionOutput>, ProjectionError>> + Send + 'b>,
    >,
) -> Result<AckDecision, ProjectionError>
where
    S: Storage,
{
    if ctx.settings.max_deliver > 0 && ctx.delivered > ctx.settings.max_deliver {
        let key = format!("poison:{}:{}", ctx.tenant_id.as_str(), ctx.sequence);
        // Serialize the raw payload for the poison record.
        let payload_str = ctx.envelope.payload.to_string();
        let record = serde_json::json!({
            "tenant_id": ctx.tenant_id.as_str(),
            "sequence": ctx.sequence,
            "delivered": ctx.delivered,
            "aggregate_id": ctx.envelope.aggregate_id,
            "aggregate_type": ctx.envelope.aggregate_type,
            "event_type": ctx.envelope.event_type,
            "payload": payload_str,
        });
        ctx.storage.put_poison(&key, &record)?;
        return Ok(AckDecision::Term);
    }
    let correlation_id = ctx.envelope.correlation_id.as_ref().map(|v| v.as_str());
    let trace_id = ctx.envelope.trace_id.clone().or_else(|| {
        ctx.envelope
            .traceparent
            .as_deref()
            .and_then(trace_id_from_traceparent)
    });
    for def in ctx.manifest.all() {
        let view_type = def.view_type.clone();
        if let Some(filter) = ctx.view_type_filter {
            if !filter.contains(view_type.as_str()) {
                continue;
            }
        }
        let checkpoint_key = CheckpointKey::new(ctx.tenant_id, &view_type);
        if let Some(cp) = ctx.storage.get_checkpoint(&checkpoint_key)? {
            if ctx.sequence <= cp {
                continue;
            }
        }
        let span = ctx.observability.start_processing_span(
            view_type.as_str(),
            ctx.tenant_id.as_str(),
            correlation_id,
            trace_id.as_ref().map(|v| v.as_str()),
        );
        let program = ctx
            .programs
            .get(view_type.as_str())
            .ok_or_else(|| ProjectionError::ManifestError("missing program".to_string()))?;
        let default_view_id = ViewId::new(ctx.envelope.aggregate_id.clone());
        let default_view_key = ViewKey::new(ctx.tenant_id, &view_type, &default_view_id);
        let current_view = ctx
            .storage
            .get_view(&default_view_key)?
            .unwrap_or_else(|| serde_json::json!({}));
        let output = match project(&current_view, ctx.envelope, program).await {
            Ok(v) => v,
            Err(e) => {
                ctx.observability.record_error(&span);
                tracing::error!(error = %e, "Projection runtime error");
                return Ok(AckDecision::None);
            }
        };
        if let Some(output) = output {
            let view_id = ViewId::new(output.view_id);
            let view_key = ViewKey::new(ctx.tenant_id, &view_type, &view_id);
            if let Err(e) = ctx.storage.commit_view_and_checkpoint_ordered(
                &view_key,
                &output.new_view,
                &checkpoint_key,
                ctx.sequence,
            ) {
                ctx.observability.record_error(&span);
                tracing::error!(error = %e, "Failed to commit view+checkpoint");
                return Ok(AckDecision::None);
            }
        } else if let Err(e) = ctx
            .storage
            .advance_checkpoint_ordered(&checkpoint_key, ctx.sequence)
        {
            ctx.observability.record_error(&span);
            tracing::error!(error = %e, "Failed to advance checkpoint");
            return Ok(AckDecision::None);
        }
        ctx.observability.record_processed(&span);
    }
    Ok(AckDecision::Ack)
}

fn trace_id_from_traceparent(traceparent: &str) -> Option<shared::TraceId> {
    shared::trace_id_from_traceparent(traceparent).map(|s| shared::TraceId::new(s.to_string()))
}

fn resolve_tenant_id(settings: &Settings, envelope: &EventEnvelope) -> TenantId {
    if settings.multi_tenant_enabled {
        if envelope.tenant_id.is_empty() {
            if let Some(default) = &settings.default_tenant_id {
                return TenantId::new(default);
            }
        }
        return envelope.tenant_id.clone();
    }
    if let Some(default) = &settings.default_tenant_id {
        return TenantId::new(default);
    }
    TenantId::default()
}
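// Unit tests below run against `FakeStorage`; the `#[ignore]`d integration
// tests require a reachable NATS server via the `PROJECTION_TEST_NATS_URL`
// environment variable.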
#[cfg(test)]
mod tests {
    use super::*;
    use crate::project::ProjectionDefinition;
    use crate::types::ViewType;

    fn test_program() -> Program {
        let program_json = r#"
        {
            "specVersion": "1.1",
            "id": "test",
            "name": "Test",
            "inputs": [],
            "nodes": [
                {"id": "const", "type": "Const", "data": {"value": {"view_id": "a1", "new_view": {"ok": true}}}},
                {"id": "output", "type": "Output", "data": {}}
            ],
            "edges": [
                {"id": "e1", "source": "const", "sourceHandle": "out", "target": "output", "targetHandle": "value"}
            ],
            "outputNodeId": "output"
        }
        "#;
        serde_json::from_str(program_json).unwrap()
    }

    #[derive(Clone, Default)]
    struct FakeStorage {
        checkpoint: Option<StreamSequence>,
        fail_commit: bool,
        poison_keys: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
    }

    impl Storage for FakeStorage {
        fn get_checkpoint(
            &self,
            _key: &CheckpointKey,
        ) -> Result<Option<StreamSequence>, ProjectionError> {
            Ok(self.checkpoint)
        }
        fn get_view(&self, _key: &ViewKey) -> Result<Option<JsonValue>, ProjectionError> {
            Ok(Some(serde_json::json!({})))
        }
        fn commit_view_and_checkpoint_ordered(
            &self,
            _view_key: &ViewKey,
            _view_value: &JsonValue,
            _checkpoint_key: &CheckpointKey,
            _sequence: StreamSequence,
        ) -> Result<(), ProjectionError> {
            if self.fail_commit {
                return Err(ProjectionError::StorageError("commit failed".to_string()));
            }
            Ok(())
        }
        fn advance_checkpoint_ordered(
            &self,
            _key: &CheckpointKey,
            _sequence: StreamSequence,
        ) -> Result<(), ProjectionError> {
            Ok(())
        }
        fn put_poison(&self, key: &str, _value: &JsonValue) -> Result<(), ProjectionError> {
            self.poison_keys.lock().unwrap().push(key.to_string());
            Ok(())
        }
    }

    fn test_manifest() -> ProjectionManifest {
        let mut manifest = ProjectionManifest::new();
        manifest.register(ProjectionDefinition {
            view_type: ViewType::new("User"),
            project_program: "/tmp/prog".to_string(),
        });
        manifest
    }

    #[tokio::test]
    async fn checkpoint_gate_skips_and_still_acks() {
        let settings = Settings::default();
        let tenant_id = TenantId::new("t1");
        let envelope = EventEnvelope {
            tenant_id: tenant_id.clone(),
            event_id: None,
            aggregate_id: "a1".to_string(),
            aggregate_type: "Account".to_string(),
            version: None,
            event_type: "created".to_string(),
            payload: serde_json::json!({"x": 1}),
            command_id: None,
            timestamp: None,
            correlation_id: None,
            traceparent: None,
            trace_id: None,
        };
        let manifest = test_manifest();
        let programs = HashMap::from([("User".to_string(), test_program())]);
        let storage = FakeStorage {
            checkpoint: Some(10),
            ..Default::default()
        };
        let obs = Observability::default();
        let ctx = ProcessContext {
            settings: &settings,
            delivered: 1,
            sequence: 5,
            tenant_id: &tenant_id,
            envelope: &envelope,
            manifest: &manifest,
            programs: &programs,
            storage: &storage,
            observability: &obs,
            view_type_filter: None,
        };
        let decision = process_message_with_storage(ctx, |_current, _event, _program| {
            Box::pin(async move {
                Ok(Some(ProjectionOutput {
                    view_id: "a1".to_string(),
                    new_view: serde_json::json!({"ok": true}),
                }))
            })
        })
        .await
        .unwrap();
        assert_eq!(decision, AckDecision::Ack);
    }

    #[tokio::test]
    async fn commit_failure_prevents_ack() {
        let settings = Settings::default();
        let tenant_id = TenantId::new("t1");
        let envelope = EventEnvelope {
            tenant_id: tenant_id.clone(),
            event_id: None,
            aggregate_id: "a1".to_string(),
            aggregate_type: "Account".to_string(),
            version: None,
            event_type: "created".to_string(),
            payload: serde_json::json!({"x": 1}),
            command_id: None,
            timestamp: None,
            correlation_id: None,
            traceparent: None,
            trace_id: None,
        };
        let manifest = test_manifest();
        let programs = HashMap::from([("User".to_string(), test_program())]);
        let storage = FakeStorage {
            checkpoint: None,
            fail_commit: true,
            ..Default::default()
        };
        let obs = Observability::default();
        let ctx = ProcessContext {
            settings: &settings,
            delivered: 1,
            sequence: 11,
            tenant_id: &tenant_id,
            envelope: &envelope,
            manifest: &manifest,
            programs: &programs,
            storage: &storage,
            observability: &obs,
            view_type_filter: None,
        };
        let decision = process_message_with_storage(ctx, |_current, _event, _program| {
            Box::pin(async move {
                Ok(Some(ProjectionOutput {
                    view_id: "a1".to_string(),
                    new_view: serde_json::json!({"ok": true}),
                }))
            })
        })
        .await
        .unwrap();
        assert_eq!(decision, AckDecision::None);
    }
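    // `delivered` (10) exceeds `max_deliver` (2), so the message must be
    // terminated and captured as a poison record rather than retried.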
    #[tokio::test]
    async fn poison_policy_terms_after_max_deliver() {
        let settings = Settings {
            max_deliver: 2,
            ..Default::default()
        };
        let tenant_id = TenantId::new("t1");
        let envelope = EventEnvelope {
            tenant_id: tenant_id.clone(),
            event_id: None,
            aggregate_id: "a1".to_string(),
            aggregate_type: "Account".to_string(),
            version: None,
            event_type: "created".to_string(),
            payload: serde_json::json!({"x": 1}),
            command_id: None,
            timestamp: None,
            correlation_id: None,
            traceparent: None,
            trace_id: None,
        };
        let manifest = test_manifest();
        let programs = HashMap::from([("User".to_string(), test_program())]);
        let storage = FakeStorage::default();
        let obs = Observability::default();
        let ctx = ProcessContext {
            settings: &settings,
            delivered: 10,
            sequence: 11,
            tenant_id: &tenant_id,
            envelope: &envelope,
            manifest: &manifest,
            programs: &programs,
            storage: &storage,
            observability: &obs,
            view_type_filter: None,
        };
        let decision = process_message_with_storage(ctx, |_current, _event, _program| {
            Box::pin(async move {
                Ok(Some(ProjectionOutput {
                    view_id: "a1".to_string(),
                    new_view: serde_json::json!({"ok": true}),
                }))
            })
        })
        .await
        .unwrap();
        assert_eq!(decision, AckDecision::Term);
        assert!(!storage.poison_keys.lock().unwrap().is_empty());
    }
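    // End-to-end redelivery: the first commit attempt fails (so no ack), the
    // broker redelivers the same stream sequence after the short `ack_wait`,
    // and the second attempt commits and acks. The checkpoint must then equal
    // that sequence.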
    #[tokio::test]
    #[ignore]
    async fn jetstream_redelivery_is_idempotent_with_checkpoint() {
        let Ok(nats_url) = std::env::var("PROJECTION_TEST_NATS_URL") else {
            return;
        };
        let id = uuid::Uuid::now_v7().to_string();
        let stream_name = format!("projection_test_{}", id);
        let subject = format!("tenant.t1.aggregate.Account.{}", id);
        let durable = format!("durable_{}", id);
        let client = async_nats::connect(&nats_url).await.unwrap();
        let jetstream = async_nats::jetstream::new(client);
        let stream = jetstream
            .get_or_create_stream(async_nats::jetstream::stream::Config {
                name: stream_name.clone(),
                subjects: vec![subject.clone()],
                ..Default::default()
            })
            .await
            .unwrap();
        let consumer = stream
            .get_or_create_consumer(
                &durable,
                async_nats::jetstream::consumer::pull::Config {
                    durable_name: Some(durable.clone()),
                    deliver_policy: async_nats::jetstream::consumer::DeliverPolicy::All,
                    ack_policy: async_nats::jetstream::consumer::AckPolicy::Explicit,
                    ack_wait: std::time::Duration::from_millis(300),
                    max_deliver: 5,
                    filter_subject: subject.clone(),
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        let envelope = EventEnvelope {
            tenant_id: TenantId::new("t1"),
            event_id: None,
            aggregate_id: "a1".to_string(),
            aggregate_type: "Account".to_string(),
            version: None,
            event_type: "created".to_string(),
            payload: serde_json::json!({"x": 1}),
            command_id: None,
            timestamp: Some(chrono::Utc::now()),
            correlation_id: None,
            traceparent: None,
            trace_id: None,
        };
        let payload = serde_json::to_vec(&envelope).unwrap();
        jetstream
            .publish(subject.clone(), payload.into())
            .await
            .unwrap()
            .await
            .unwrap();
        let program_json = r#"
        {
            "specVersion": "1.1",
            "id": "proj",
            "name": "Projection",
            "inputs": [
                {"name": "current_view", "type": "Any", "required": true},
                {"name": "event", "type": "Any", "required": true}
            ],
            "nodes": [
                {"id": "event", "type": "InputRef", "data": {"input_name": "event"}},
                {"id": "expr", "type": "Expr", "data": {"expression": "({ view_id: input.aggregate_id, new_view: input.payload })"}},
                {"id": "output", "type": "Output", "data": {}}
            ],
            "edges": [
                {"id": "e1", "source": "event", "sourceHandle": "out", "target": "expr", "targetHandle": "input"},
                {"id": "e2", "source": "expr", "sourceHandle": "out", "target": "output", "targetHandle": "value"}
            ],
            "outputNodeId": "output"
        }
        "#;
        let program: Program = serde_json::from_str(program_json).unwrap();
        let manifest = test_manifest();
        let programs = HashMap::from([("User".to_string(), program)]);
        let obs = Observability::default();
        let runtime = ProjectionRuntime::default();
        let settings = Settings {
            max_deliver: 10,
            ..Default::default()
        };
        let storage_dir = tempfile::tempdir().unwrap();
        let storage_path = storage_dir.path().join("mdbx");
        let storage = KvClient::open(storage_path.to_string_lossy().to_string()).unwrap();

        #[derive(Clone)]
        struct FailOnceStorage {
            inner: KvClient,
            should_fail: std::sync::Arc<std::sync::atomic::AtomicBool>,
        }
        impl Storage for FailOnceStorage {
            fn get_checkpoint(
                &self,
                key: &CheckpointKey,
            ) -> Result<Option<StreamSequence>, ProjectionError> {
                self.inner.get_checkpoint(key)
            }
            fn get_view(&self, key: &ViewKey) -> Result<Option<JsonValue>, ProjectionError> {
                self.inner.get_view(key)
            }
            fn commit_view_and_checkpoint_ordered(
                &self,
                view_key: &ViewKey,
                view_value: &JsonValue,
                checkpoint_key: &CheckpointKey,
                sequence: StreamSequence,
            ) -> Result<(), ProjectionError> {
                if self
                    .should_fail
                    .compare_exchange(
                        true,
                        false,
                        std::sync::atomic::Ordering::SeqCst,
                        std::sync::atomic::Ordering::SeqCst,
                    )
                    .is_ok()
                {
                    return Err(ProjectionError::StorageError("fail once".to_string()));
                }
                self.inner.commit_view_and_advance_checkpoint_ordered(
                    view_key,
                    view_value,
                    checkpoint_key,
                    sequence,
                )
            }
            fn advance_checkpoint_ordered(
                &self,
                key: &CheckpointKey,
                sequence: StreamSequence,
            ) -> Result<(), ProjectionError> {
                self.inner.advance_checkpoint_ordered(key, sequence)
            }
            fn put_poison(&self, key: &str, value: &JsonValue) -> Result<(), ProjectionError> {
                self.inner.put_json(key, value)
            }
        }

        let failing_storage = FailOnceStorage {
            inner: storage.clone(),
            should_fail: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true)),
        };
        let mut messages = consumer.messages().await.unwrap();
        let first = tokio::time::timeout(std::time::Duration::from_secs(3), messages.next())
            .await
            .unwrap()
            .unwrap()
            .unwrap();
        let first_info = first.info().unwrap();
        let first_seq = first_info.stream_sequence;
        let first_ctx = ProcessContext {
            settings: &settings,
            delivered: first_info.delivered,
            sequence: first_seq,
            tenant_id: &envelope.tenant_id,
            envelope: &envelope,
            manifest: &manifest,
            programs: &programs,
            storage: &failing_storage,
            observability: &obs,
            view_type_filter: None,
        };
        let decision = process_message_with_storage(first_ctx, |current_view, event, program| {
            let runtime = runtime.clone();
            Box::pin(async move { runtime.project_program(current_view, event, program).await })
        })
        .await
        .unwrap();
        assert_eq!(decision, AckDecision::None);
        let redelivered = tokio::time::timeout(std::time::Duration::from_secs(5), messages.next())
            .await
            .unwrap()
            .unwrap()
            .unwrap();
        let redelivered_info = redelivered.info().unwrap();
        assert_eq!(redelivered_info.stream_sequence, first_seq);
        assert!(redelivered_info.delivered >= 2);
        let second_ctx = ProcessContext {
            settings: &settings,
            delivered: redelivered_info.delivered,
            sequence: redelivered_info.stream_sequence,
            tenant_id: &envelope.tenant_id,
            envelope: &envelope,
            manifest: &manifest,
            programs: &programs,
            storage: &storage,
            observability: &obs,
            view_type_filter: None,
        };
        let decision = process_message_with_storage(second_ctx, |current_view, event, program| {
            let runtime = runtime.clone();
            Box::pin(async move { runtime.project_program(current_view, event, program).await })
        })
        .await
        .unwrap();
        assert_eq!(decision, AckDecision::Ack);
        redelivered.ack().await.unwrap();
        let checkpoint_key = CheckpointKey::new(&envelope.tenant_id, &ViewType::new("User"));
        let cp = storage.get_checkpoint(&checkpoint_key).unwrap().unwrap();
        assert_eq!(cp, first_seq);
    }
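    // Rebuild determinism: project 50 events, snapshot the views, wipe views
    // and checkpoint, replay from sequence 1 on a fresh durable, and require
    // byte-identical results.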
    #[tokio::test]
    #[ignore]
    async fn rebuild_from_scratch_produces_identical_view() {
        let Ok(nats_url) = std::env::var("PROJECTION_TEST_NATS_URL") else {
            return;
        };
        let id = uuid::Uuid::now_v7().to_string();
        let stream_name = format!("projection_rebuild_test_{}", id);
        let subject = format!("tenant.t1.aggregate.Account.{}", id);
        let filter_subject = subject.clone();
        let client = async_nats::connect(&nats_url).await.unwrap();
        let jetstream = async_nats::jetstream::new(client);
        let _stream = jetstream
            .get_or_create_stream(async_nats::jetstream::stream::Config {
                name: stream_name.clone(),
                subjects: vec![subject.clone()],
                ..Default::default()
            })
            .await
            .unwrap();
        let n = 50usize;
        for i in 0..n {
            let envelope = EventEnvelope {
                tenant_id: TenantId::new("t1"),
                event_id: None,
                aggregate_id: format!("a{}", i),
                aggregate_type: "Account".to_string(),
                version: None,
                event_type: "tick".to_string(),
                payload: serde_json::json!({"i": i}),
                command_id: None,
                timestamp: Some(chrono::Utc::now()),
                correlation_id: None,
                traceparent: None,
                trace_id: None,
            };
            let payload = serde_json::to_vec(&envelope).unwrap();
            jetstream
                .publish(subject.clone(), payload.into())
                .await
                .unwrap()
                .await
                .unwrap();
        }
        let dir = tempfile::tempdir().unwrap();
        let (manifest_path, _) = write_passthrough_manifest(dir.path(), "Counter");
        let storage_dir = tempfile::tempdir().unwrap();
        let storage_path = storage_dir.path().join("mdbx");
        let durable = format!("durable_{}", id);
        let settings = Settings {
            nats_url,
            stream_name,
            subject_filters: vec![filter_subject.clone()],
            durable_name: durable.clone(),
            storage_path: storage_path.to_string_lossy().to_string(),
            manifest_path: manifest_path.to_string_lossy().to_string(),
            multi_tenant_enabled: true,
            default_tenant_id: None,
            ..Default::default()
        };
        let mut stream = jetstream.get_stream(&settings.stream_name).await.unwrap();
        let tail = stream.info().await.unwrap().state.last_sequence;
        let shared_storage = KvClient::open(settings.storage_path.clone()).unwrap();
        run_projection_with_options(
            settings.clone(),
            RunOptions {
                tenant_filter: Some(TenantId::new("t1")),
                view_type_filter: Some(vec!["Counter".to_string()]),
                stop_at_sequence: Some(tail),
                consumer_durable_name: Some(durable.clone()),
                consumer_filter_subject: Some(filter_subject.clone()),
                consumer_deliver_policy: Some(DeliverPolicy::ByStartSequence { start_sequence: 1 }),
                storage: Some(shared_storage.clone()),
                ..RunOptions::default()
            },
        )
        .await
        .unwrap();
        let mut before = shared_storage
            .scan_documents_by_prefix(b"view:t1:Counter:", 10_000)
            .unwrap()
            .into_iter()
            .map(|d| d.into_value())
            .collect::<Vec<_>>();
        before.sort_by_key(|v| v["_id"].as_str().unwrap_or("").to_string());
        assert_eq!(before.len(), n);
        shared_storage
            .delete_view_prefix(&TenantId::new("t1"), &ViewType::new("Counter"))
            .unwrap();
        shared_storage
            .delete_checkpoint(&CheckpointKey::new(
                &TenantId::new("t1"),
                &ViewType::new("Counter"),
            ))
            .unwrap();
        let rebuild_durable = format!("{}_rebuild_{}", durable, uuid::Uuid::now_v7());
        run_projection_with_options(
            settings.clone(),
            RunOptions {
                tenant_filter: Some(TenantId::new("t1")),
                view_type_filter: Some(vec!["Counter".to_string()]),
                stop_at_sequence: Some(tail),
                consumer_durable_name: Some(rebuild_durable),
                consumer_filter_subject: Some(filter_subject.clone()),
                consumer_deliver_policy: Some(DeliverPolicy::ByStartSequence { start_sequence: 1 }),
                storage: Some(shared_storage.clone()),
                ..RunOptions::default()
            },
        )
        .await
        .unwrap();
        let mut after = shared_storage
            .scan_documents_by_prefix(b"view:t1:Counter:", 10_000)
            .unwrap()
            .into_iter()
            .map(|d| d.into_value())
            .collect::<Vec<_>>();
        after.sort_by_key(|v| v["_id"].as_str().unwrap_or("").to_string());
        assert_eq!(after.len(), n);
        assert_eq!(before, after);
    }
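    // Restart safety: the first run stops once the checkpoint reaches the
    // stream midpoint; the second run reuses the same durable and storage and
    // must continue to the tail without duplicating any view.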
    #[tokio::test]
    #[ignore]
    async fn rolling_restart_resumes_from_checkpoint_without_duplication() {
        let Ok(nats_url) = std::env::var("PROJECTION_TEST_NATS_URL") else {
            return;
        };
        let id = uuid::Uuid::now_v7().to_string();
        let stream_name = format!("projection_restart_test_{}", id);
        let subject = format!("tenant.t1.aggregate.Account.{}", id);
        let filter_subject = subject.clone();
        let client = async_nats::connect(&nats_url).await.unwrap();
        let jetstream = async_nats::jetstream::new(client);
        let _stream = jetstream
            .get_or_create_stream(async_nats::jetstream::stream::Config {
                name: stream_name.clone(),
                subjects: vec![subject.clone()],
                ..Default::default()
            })
            .await
            .unwrap();
        let n = 60usize;
        for i in 0..n {
            let envelope = EventEnvelope {
                tenant_id: TenantId::new("t1"),
                event_id: None,
                aggregate_id: format!("a{}", i),
                aggregate_type: "Account".to_string(),
                version: None,
                event_type: "tick".to_string(),
                payload: serde_json::json!({"i": i}),
                command_id: None,
                timestamp: Some(chrono::Utc::now()),
                correlation_id: None,
                traceparent: None,
                trace_id: None,
            };
            let payload = serde_json::to_vec(&envelope).unwrap();
            jetstream
                .publish(subject.clone(), payload.into())
                .await
                .unwrap()
                .await
                .unwrap();
        }
        let dir = tempfile::tempdir().unwrap();
        let (manifest_path, _) = write_passthrough_manifest(dir.path(), "Counter");
        let storage_dir = tempfile::tempdir().unwrap();
        let storage_path = storage_dir.path().join("mdbx");
        let durable = format!("durable_{}", id);
        let settings = Settings {
            nats_url,
            stream_name,
            subject_filters: vec![filter_subject.clone()],
            durable_name: durable.clone(),
            storage_path: storage_path.to_string_lossy().to_string(),
            manifest_path: manifest_path.to_string_lossy().to_string(),
            multi_tenant_enabled: true,
            default_tenant_id: None,
            ..Default::default()
        };
        let mut stream = jetstream.get_stream(&settings.stream_name).await.unwrap();
        let tail = stream.info().await.unwrap().state.last_sequence;
        let mid = tail / 2;
        let shared_storage = KvClient::open(settings.storage_path.clone()).unwrap();
        run_projection_with_options(
            settings.clone(),
            RunOptions {
                tenant_filter: Some(TenantId::new("t1")),
                view_type_filter: Some(vec!["Counter".to_string()]),
                stop_at_sequence: Some(mid),
                storage: Some(shared_storage.clone()),
                ..RunOptions::default()
            },
        )
        .await
        .unwrap();
        let ck = CheckpointKey::new(&TenantId::new("t1"), &ViewType::new("Counter"));
        let cp1 = shared_storage.get_checkpoint(&ck).unwrap().unwrap_or(0);
        assert!(cp1 >= mid);
        run_projection_with_options(
            settings.clone(),
            RunOptions {
                tenant_filter: Some(TenantId::new("t1")),
                view_type_filter: Some(vec!["Counter".to_string()]),
                stop_at_sequence: Some(tail),
                storage: Some(shared_storage.clone()),
                ..RunOptions::default()
            },
        )
        .await
        .unwrap();
        let cp2 = shared_storage.get_checkpoint(&ck).unwrap().unwrap_or(0);
        assert!(cp2 >= cp1);
        let docs = shared_storage
            .scan_documents_by_prefix(b"view:t1:Counter:", 10_000)
            .unwrap();
        assert_eq!(docs.len(), n);
        assert_eq!(cp2, tail);
    }

    fn write_passthrough_manifest(
        dir: &std::path::Path,
        view_type: &str,
    ) -> (std::path::PathBuf, std::path::PathBuf) {
        let program_path = dir.join("passthrough.json");
        let manifest_path = dir.join("manifest.yaml");
        std::fs::write(
            &program_path,
            r#"
            {
                "specVersion": "1.1",
                "id": "passthrough",
                "name": "Passthrough",
                "inputs": [
                    {"name": "current_view", "type": "Any", "required": true},
                    {"name": "event", "type": "Any", "required": true}
                ],
                "nodes": [
                    {"id": "event", "type": "InputRef", "data": {"input_name": "event"}},
                    {"id": "expr", "type": "Expr", "data": {"expression": "{ view_id: input.aggregate_id, new_view: input.payload }"}},
                    {"id": "output", "type": "Output", "data": {}}
                ],
                "edges": [
                    {"id": "e1", "source": "event", "sourceHandle": "out", "target": "expr", "targetHandle": "input"},
                    {"id": "e2", "source": "expr", "sourceHandle": "out", "target": "output", "targetHandle": "value"}
                ],
                "outputNodeId": "output"
            }
            "#,
        )
        .unwrap();
        std::fs::write(
            &manifest_path,
            format!(
                r#"
projections:
  {view_type}:
    view_type: "{view_type}"
    project_program: "{}"
"#,
                program_path.to_string_lossy()
            ),
        )
        .unwrap();
        (manifest_path, program_path)
    }
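    // Scale-out: two workers share one durable consumer, so JetStream delivers
    // each message to exactly one of them; together they must still produce
    // exactly `n` views and a checkpoint at the tail.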
"name": "Passthrough", "inputs": [ {"name": "current_view", "type": "Any", "required": true}, {"name": "event", "type": "Any", "required": true} ], "nodes": [ {"id": "event", "type": "InputRef", "data": {"input_name": "event"}}, {"id": "expr", "type": "Expr", "data": {"expression": "{ view_id: input.aggregate_id, new_view: input.payload }"}}, {"id": "output", "type": "Output", "data": {}} ], "edges": [ {"id": "e1", "source": "event", "sourceHandle": "out", "target": "expr", "targetHandle": "input"}, {"id": "e2", "source": "expr", "sourceHandle": "out", "target": "output", "targetHandle": "value"} ], "outputNodeId": "output" } "#, ) .unwrap(); std::fs::write( &manifest_path, format!( r#" projections: {view_type}: view_type: "{view_type}" project_program: "{}" "#, program_path.to_string_lossy() ), ) .unwrap(); (manifest_path, program_path) } #[tokio::test] #[ignore] async fn scale_out_two_workers_does_not_duplicate_work() { let Ok(nats_url) = std::env::var("PROJECTION_TEST_NATS_URL") else { return; }; let id = uuid::Uuid::now_v7().to_string(); let stream_name = format!("projection_scale_test_{}", id); let subject = format!("tenant.t1.aggregate.Account.{}", id); let filter_subject = subject.clone(); let client = async_nats::connect(&nats_url).await.unwrap(); let jetstream = async_nats::jetstream::new(client); let _stream = jetstream .get_or_create_stream(async_nats::jetstream::stream::Config { name: stream_name.clone(), subjects: vec![subject.clone()], ..Default::default() }) .await .unwrap(); let n = 200usize; for i in 0..n { let envelope = EventEnvelope { tenant_id: TenantId::new("t1"), event_id: None, aggregate_id: format!("a{}", i), aggregate_type: "Account".to_string(), version: None, event_type: "upsert".to_string(), payload: serde_json::json!({"i": i}), command_id: None, timestamp: Some(chrono::Utc::now()), correlation_id: None, traceparent: None, trace_id: None, }; jetstream .publish( subject.clone(), serde_json::to_vec(&envelope).unwrap().into(), ) .await .unwrap() .await .unwrap(); } let dir = tempfile::tempdir().unwrap(); let (manifest_path, _) = write_passthrough_manifest(dir.path(), "Counter"); let storage_dir = tempfile::tempdir().unwrap(); let storage_path = storage_dir.path().join("mdbx"); let durable = format!("durable_{}", id); let settings = Settings { nats_url, stream_name, subject_filters: vec![filter_subject.clone()], durable_name: durable.clone(), storage_path: storage_path.to_string_lossy().to_string(), manifest_path: manifest_path.to_string_lossy().to_string(), multi_tenant_enabled: true, default_tenant_id: None, consumer_mode: crate::config::ConsumerMode::Single, ..Default::default() }; let shared_storage = KvClient::open(settings.storage_path.clone()).unwrap(); let jetstream_client = JetStreamClient::connect_with( &settings, crate::stream::jetstream::ConsumerOptions { durable_name: durable.clone(), filter_subject: filter_subject.clone(), deliver_policy: DeliverPolicy::All, }, ) .await .unwrap(); let tail = jetstream_client.stream_last_sequence().await.unwrap(); let opts = RunOptions { tenant_filter: Some(TenantId::new("t1")), view_type_filter: Some(vec!["Counter".to_string()]), stop_at_sequence: Some(tail), consumer_durable_name: Some(durable.clone()), consumer_filter_subject: Some(filter_subject.clone()), storage: Some(shared_storage.clone()), ..RunOptions::default() }; let t1 = tokio::spawn(run_projection_with_options(settings.clone(), opts.clone())); let t2 = tokio::spawn(run_projection_with_options(settings.clone(), opts)); t1.await.unwrap().unwrap(); 
    #[tokio::test]
    #[ignore]
    async fn rolling_restart_with_two_workers_preserves_correctness() {
        let Ok(nats_url) = std::env::var("PROJECTION_TEST_NATS_URL") else {
            return;
        };
        let id = uuid::Uuid::now_v7().to_string();
        let stream_name = format!("projection_scale_restart_test_{}", id);
        let subject = format!("tenant.t1.aggregate.Account.{}", id);
        let filter_subject = subject.clone();
        let client = async_nats::connect(&nats_url).await.unwrap();
        let jetstream = async_nats::jetstream::new(client);
        let _stream = jetstream
            .get_or_create_stream(async_nats::jetstream::stream::Config {
                name: stream_name.clone(),
                subjects: vec![subject.clone()],
                ..Default::default()
            })
            .await
            .unwrap();
        let n = 150usize;
        for i in 0..n {
            let envelope = EventEnvelope {
                tenant_id: TenantId::new("t1"),
                event_id: None,
                aggregate_id: format!("a{}", i),
                aggregate_type: "Account".to_string(),
                version: None,
                event_type: "upsert".to_string(),
                payload: serde_json::json!({"i": i}),
                command_id: None,
                timestamp: Some(chrono::Utc::now()),
                correlation_id: None,
                traceparent: None,
                trace_id: None,
            };
            jetstream
                .publish(
                    subject.clone(),
                    serde_json::to_vec(&envelope).unwrap().into(),
                )
                .await
                .unwrap()
                .await
                .unwrap();
        }
        let dir = tempfile::tempdir().unwrap();
        let (manifest_path, _) = write_passthrough_manifest(dir.path(), "Counter");
        let storage_dir = tempfile::tempdir().unwrap();
        let storage_path = storage_dir.path().join("mdbx");
        let durable = format!("durable_{}", id);
        let settings = Settings {
            nats_url,
            stream_name,
            subject_filters: vec![filter_subject.clone()],
            durable_name: durable.clone(),
            storage_path: storage_path.to_string_lossy().to_string(),
            manifest_path: manifest_path.to_string_lossy().to_string(),
            multi_tenant_enabled: true,
            default_tenant_id: None,
            consumer_mode: crate::config::ConsumerMode::Single,
            ..Default::default()
        };
        let shared_storage = KvClient::open(settings.storage_path.clone()).unwrap();
        let jetstream_client = JetStreamClient::connect_with(
            &settings,
            crate::stream::jetstream::ConsumerOptions {
                durable_name: durable.clone(),
                filter_subject: filter_subject.clone(),
                deliver_policy: DeliverPolicy::All,
            },
        )
        .await
        .unwrap();
        let tail = jetstream_client.stream_last_sequence().await.unwrap();
        let shutdown1 = Arc::new(tokio::sync::Notify::new());
        let shutdown2 = Arc::new(tokio::sync::Notify::new());
        let opts1 = RunOptions {
            tenant_filter: Some(TenantId::new("t1")),
            view_type_filter: Some(vec!["Counter".to_string()]),
            stop_at_sequence: None,
            consumer_durable_name: Some(durable.clone()),
            consumer_filter_subject: Some(filter_subject.clone()),
            shutdown: Some(shutdown1.clone()),
            storage: Some(shared_storage.clone()),
            ..RunOptions::default()
        };
        let opts2 = RunOptions {
            tenant_filter: Some(TenantId::new("t1")),
            view_type_filter: Some(vec!["Counter".to_string()]),
            stop_at_sequence: Some(tail),
            consumer_durable_name: Some(durable.clone()),
            consumer_filter_subject: Some(filter_subject.clone()),
            shutdown: Some(shutdown2.clone()),
            storage: Some(shared_storage.clone()),
            ..RunOptions::default()
        };
        let t1 = tokio::spawn(run_projection_with_options(settings.clone(), opts1));
        let t2 = tokio::spawn(run_projection_with_options(settings.clone(), opts2));
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        shutdown1.notify_waiters();
        t1.await.unwrap().unwrap();
        t2.await.unwrap().unwrap();
        let docs = shared_storage
            .scan_documents_by_prefix(b"view:t1:Counter:", 10_000)
            .unwrap();
        assert_eq!(docs.len(), n);
        let ck = CheckpointKey::new(&TenantId::new("t1"), &ViewType::new("Counter"));
        let cp = shared_storage.get_checkpoint(&ck).unwrap().unwrap_or(0);
        assert_eq!(cp, tail);
    }

    #[test]
    fn per_view_durable_name_sanitizes() {
        assert_eq!(per_view_durable_name("base", "User"), "base_User");
        assert_eq!(per_view_durable_name("base", "My View"), "base_My_View");
        assert_eq!(per_view_durable_name("base", "a/b"), "base_a_b");
    }
}