use async_nats::jetstream::{
    self, consumer::pull::Config as PullConfig, consumer::AckPolicy, stream::Config as StreamConfig,
};
use futures::StreamExt;
use runner::config::Settings;
use runner::effects::run_effect_worker;
use runner::observability::Metrics;
use runner::outbox::OutboxRelay;
use runner::saga::{run_saga_worker, SagaPrograms, SagaRuntime};
use runner::storage::KvClient;
use runner::stream::JetStreamClient;
use runner::types::{
    AggregateEventEnvelope, CheckpointKey, CommandId, EffectName, EffectResultEnvelope,
    EffectResultType, MessageMetadata, SagaName, TenantId,
};
use serde_json::json;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;

/// Returns the NATS URL for integration tests, or `None` (so the test returns
/// early) when `RUNNER_TEST_NATS_URL` is not set.
fn nats_url_or_skip() -> Option<String> {
    std::env::var("RUNNER_TEST_NATS_URL").ok()
}

fn unique_suffix() -> String {
    Uuid::now_v7().simple().to_string()
}

fn open_temp_storage(dir: &std::path::Path) -> KvClient {
    let path = dir.join("runner.mdbx").to_string_lossy().to_string();
    KvClient::open(path).unwrap()
}

fn tenant_from_suffix(suffix: &str) -> TenantId {
    TenantId::new(format!("t{}", suffix))
}

fn command_from_suffix(suffix: &str) -> CommandId {
    CommandId::new(format!("c{}", suffix))
}

/// Writes a minimal saga program and its manifest into `dir`, returning
/// `(manifest_path, program_path)`. The program emits a single `noop` effect
/// command for the given tenant/command/correlation ids.
fn write_fixture_saga_files(
    dir: &std::path::Path,
    tenant_id: &TenantId,
    command_id: &CommandId,
    correlation_id: &str,
) -> (String, String) {
    let program_path = dir.join("saga_on_event.json");
    let manifest_path = dir.join("sagas.yaml");
    let template = r#"
{
  "specVersion": "1.1",
  "id": "e2e_saga",
  "name": "e2e_saga",
  "inputs": [
    { "name": "saga_state", "type": "Any" },
    { "name": "event", "type": "Any" }
  ],
  "nodes": [
    {
      "id": "const",
      "type": "Const",
      "data": {
        "value": {
          "new_saga_state": {},
          "work_items": [
            {
              "kind": "effect_command",
              "tenant_id": "__TENANT_ID__",
              "command_id": "__COMMAND_ID__",
              "effect_name": "noop",
              "payload": { "ok": true },
              "metadata": { "correlation_id": "__CORRELATION_ID__" }
            }
          ],
          "schedules": []
        }
      }
    },
    { "id": "output", "type": "Output", "data": {} }
  ],
  "edges": [
    {
      "id": "e1",
      "source": "const",
      "sourceHandle": "out",
      "target": "output",
      "targetHandle": "value"
    }
  ],
  "outputNodeId": "output"
}
"#;
    let program = template
        .replace("__TENANT_ID__", tenant_id.as_str())
        .replace("__COMMAND_ID__", command_id.as_str())
        .replace("__CORRELATION_ID__", correlation_id);
    std::fs::write(&program_path, program).unwrap();
    std::fs::write(
        &manifest_path,
        format!(
            r#"
sagas:
  - name: noop
    trigger_subjects: ["tenant.*.aggregate.*.*"]
    on_event: "{}"
"#,
            program_path.to_string_lossy()
        ),
    )
    .unwrap();
    (
        manifest_path.to_string_lossy().to_string(),
        program_path.to_string_lossy().to_string(),
    )
}

fn write_fixture_effects_manifest(dir: &std::path::Path) -> String {
    let path = dir.join("effects.yaml");
    std::fs::write(
        &path,
        r#"
effects:
  - name: noop
    provider: noop
    config: {}
"#,
    )
    .unwrap();
    path.to_string_lossy().to_string()
}

/// Creates the JetStream streams the runner expects (aggregate events,
/// workflow commands, workflow events) if they do not already exist.
async fn ensure_base_streams(nats_url: &str) {
    let client = async_nats::connect(nats_url).await.unwrap();
    let js = jetstream::new(client);
    let _ = js
        .get_or_create_stream(StreamConfig {
            name: "AGGREGATE_EVENTS".to_string(),
            subjects: vec!["tenant.*.aggregate.*.*".to_string()],
            ..Default::default()
        })
        .await
        .unwrap();
    let _ = js
        .get_or_create_stream(StreamConfig {
            name: "WORKFLOW_COMMANDS".to_string(),
            subjects: vec![
                "tenant.*.effect.*.*".to_string(),
                "tenant.*.workflow.*.*".to_string(),
            ],
            ..Default::default()
        })
        .await
        .unwrap();
    let _ = js
        .get_or_create_stream(StreamConfig {
            name: "WORKFLOW_EVENTS".to_string(),
            subjects: vec![
                "tenant.*.effect_result.*.*".to_string(),
"tenant.*.workflow_event.*.*".to_string(), ], ..Default::default() }) .await .unwrap(); } async fn wait_for_effect_result( nats_url: &str, settings: &Settings, consumer_name: String, subject_filter: String, ) -> EffectResultEnvelope { let client = async_nats::connect(nats_url).await.unwrap(); let js = jetstream::new(client); let stream = js .get_stream(&settings.workflow_events_stream) .await .unwrap(); let durable_name = consumer_name.clone(); let consumer = stream .get_or_create_consumer( consumer_name.as_str(), PullConfig { durable_name: Some(durable_name), ack_policy: AckPolicy::Explicit, filter_subject: subject_filter, ..Default::default() }, ) .await .unwrap(); let mut messages = consumer.messages().await.unwrap(); let msg = tokio::time::timeout(Duration::from_secs(10), messages.next()) .await .unwrap() .unwrap() .unwrap(); let _ = msg.ack().await; serde_json::from_slice(&msg.payload).unwrap() } async fn count_messages( nats_url: &str, stream_name: String, consumer_name: String, subject_filter: String, max: usize, duration: Duration, ) -> usize { let client = async_nats::connect(nats_url).await.unwrap(); let js = jetstream::new(client); let stream = js.get_stream(&stream_name).await.unwrap(); let consumer = stream .get_or_create_consumer( consumer_name.as_str(), PullConfig { durable_name: Some(consumer_name.to_string()), ack_policy: AckPolicy::Explicit, filter_subject: subject_filter, ..Default::default() }, ) .await .unwrap(); let mut messages = consumer.messages().await.unwrap(); let start = tokio::time::Instant::now(); let mut count = 0usize; while count < max && start.elapsed() < duration { let next = tokio::time::timeout(Duration::from_millis(500), messages.next()).await; match next { Ok(Some(Ok(msg))) => { count += 1; let _ = msg.ack().await; } Ok(Some(Err(_))) => {} Ok(None) => break, Err(_) => {} } } count } #[test] #[ignore] fn integration_full_happy_path_workflow() { let Some(nats_url) = nats_url_or_skip() else { return; }; let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async move { let tmp = tempfile::tempdir().unwrap(); let suffix = unique_suffix(); let tenant = tenant_from_suffix(&suffix); let command_id = command_from_suffix(&suffix); let correlation_id = format!("corr_{}", suffix); let (saga_manifest_path, _) = write_fixture_saga_files(tmp.path(), &tenant, &command_id, &correlation_id); let effects_manifest_path = write_fixture_effects_manifest(tmp.path()); let settings = Settings { nats_url: nats_url.clone(), consumer_durable_prefix: format!("runner_{}", suffix), saga_manifest_path, effects_manifest_path, saga_trigger_subject_filters: vec![format!("tenant.{}.aggregate.*.*", tenant.as_str())], effect_command_subject_filters: vec![format!("tenant.{}.effect.*.*", tenant.as_str())], ..Default::default() }; ensure_base_streams(&nats_url).await; let metrics = Arc::new(Metrics::default()); let storage = open_temp_storage(tmp.path()); let shutdown = Arc::new(tokio::sync::Notify::new()); let draining = Arc::new(AtomicBool::new(false)); let tenant_gate = Arc::new(runner::tenant_placement::TenantGate::new(None)); let programs = Arc::new(SagaPrograms::load(&settings).unwrap()); let runtime = SagaRuntime::default(); let saga_task = tokio::spawn(run_saga_worker( settings.clone(), storage.clone(), programs, runtime.clone(), metrics.clone(), tenant_gate.clone(), None, shutdown.clone(), draining.clone(), )); let js = JetStreamClient::connect(&settings).await.unwrap(); let outbox_task = tokio::spawn(OutboxRelay.run( settings.clone(), storage.clone(), js, metrics.clone(), 
            tenant_gate.clone(),
            shutdown.clone(),
            draining.clone(),
        ));
        let effect_task = tokio::spawn(run_effect_worker(
            settings.clone(),
            storage.clone(),
            metrics.clone(),
            tenant_gate.clone(),
            None,
            Arc::new(tokio::sync::Notify::new()),
            shutdown.clone(),
            draining.clone(),
        ));
        let event = AggregateEventEnvelope {
            tenant_id: tenant.clone(),
            event_id: Uuid::now_v7(),
            aggregate_id: "a1".to_string(),
            aggregate_type: "Account".to_string(),
            version: 1,
            event_type: "Created".to_string(),
            payload: json!({"correlation_id": correlation_id}),
            command_id: Uuid::now_v7(),
            timestamp: chrono::Utc::now(),
        };
        let client = async_nats::connect(&nats_url).await.unwrap();
        let js_ctx = jetstream::new(client);
        let payload = serde_json::to_vec(&event).unwrap();
        js_ctx
            .publish(
                format!("tenant.{}.aggregate.Account.a1", tenant.as_str()),
                payload.into(),
            )
            .await
            .unwrap();
        let result = wait_for_effect_result(
            &nats_url,
            &settings,
            format!("e2e_result_{}", suffix),
            format!(
                "tenant.{}.effect_result.noop.{}",
                tenant.as_str(),
                command_id.as_str()
            ),
        )
        .await;
        assert_eq!(result.tenant_id, tenant);
        assert_eq!(result.command_id, command_id);
        assert_eq!(result.effect_name, EffectName::new("noop"));
        assert_eq!(result.result_type, EffectResultType::Succeeded);
        assert_eq!(
            result.metadata.correlation_id.as_ref().map(|v| v.as_str()),
            Some(correlation_id.as_str())
        );
        draining.store(true, std::sync::atomic::Ordering::Relaxed);
        shutdown.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), saga_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(2), outbox_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(2), effect_task).await;
    });
}

#[test]
#[ignore]
fn integration_crash_restart_preserves_atomicity() {
    let Some(nats_url) = nats_url_or_skip() else { return; };
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        let tmp = tempfile::tempdir().unwrap();
        let suffix = unique_suffix();
        let tenant = tenant_from_suffix(&suffix);
        let command_id = command_from_suffix(&suffix);
        let correlation_id = format!("corr_{}", suffix);
        let (saga_manifest_path, _) =
            write_fixture_saga_files(tmp.path(), &tenant, &command_id, &correlation_id);
        let effects_manifest_path = write_fixture_effects_manifest(tmp.path());
        let settings = Settings {
            nats_url: nats_url.clone(),
            consumer_durable_prefix: format!("runner_{}", suffix),
            saga_manifest_path,
            effects_manifest_path,
            saga_trigger_subject_filters: vec![format!("tenant.{}.aggregate.*.*", tenant.as_str())],
            effect_command_subject_filters: vec![format!("tenant.{}.effect.*.*", tenant.as_str())],
            ..Default::default()
        };
        ensure_base_streams(&nats_url).await;
        let metrics = Arc::new(Metrics::default());
        let storage = open_temp_storage(tmp.path());
        let event = AggregateEventEnvelope {
            tenant_id: tenant.clone(),
            event_id: Uuid::now_v7(),
            aggregate_id: "a1".to_string(),
            aggregate_type: "Account".to_string(),
            version: 1,
            event_type: "Created".to_string(),
            payload: json!({"correlation_id": correlation_id}),
            command_id: Uuid::now_v7(),
            timestamp: chrono::Utc::now(),
        };
        let client = async_nats::connect(&nats_url).await.unwrap();
        let js_ctx = jetstream::new(client);
        let payload = serde_json::to_vec(&event).unwrap();
        js_ctx
            .publish(
                format!("tenant.{}.aggregate.Account.a1", tenant.as_str()),
                payload.into(),
            )
            .await
            .unwrap();
        let mut crash_settings = settings.clone();
        crash_settings.test_saga_crash_after_commit = true;
        crash_settings.test_outbox_crash_after_dispatch = true;
        crash_settings.test_effect_crash_after_dedupe_before_ack = true;
        // First run: every worker is configured to crash at its injected fault point.
        let shutdown1 = Arc::new(tokio::sync::Notify::new());
        let draining1 = Arc::new(AtomicBool::new(false));
        let tenant_gate1 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let programs = Arc::new(SagaPrograms::load(&crash_settings).unwrap());
        let runtime = SagaRuntime::default();
        let saga_task = tokio::spawn(run_saga_worker(
            crash_settings.clone(),
            storage.clone(),
            programs,
            runtime.clone(),
            metrics.clone(),
            tenant_gate1.clone(),
            None,
            shutdown1.clone(),
            draining1.clone(),
        ));
        let js = JetStreamClient::connect(&crash_settings).await.unwrap();
        let outbox_task = tokio::spawn(OutboxRelay.run(
            crash_settings.clone(),
            storage.clone(),
            js,
            metrics.clone(),
            tenant_gate1.clone(),
            shutdown1.clone(),
            draining1.clone(),
        ));
        let effect_task = tokio::spawn(run_effect_worker(
            crash_settings.clone(),
            storage.clone(),
            metrics.clone(),
            tenant_gate1.clone(),
            None,
            Arc::new(tokio::sync::Notify::new()),
            shutdown1.clone(),
            draining1.clone(),
        ));
        let _ = tokio::time::timeout(Duration::from_secs(5), saga_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(5), outbox_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(5), effect_task).await;
        // Second run: restart with crash injection disabled; the event must still
        // produce exactly one successful effect result.
        let shutdown2 = Arc::new(tokio::sync::Notify::new());
        let draining2 = Arc::new(AtomicBool::new(false));
        let tenant_gate2 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let programs = Arc::new(SagaPrograms::load(&settings).unwrap());
        let saga_task = tokio::spawn(run_saga_worker(
            settings.clone(),
            storage.clone(),
            programs,
            runtime.clone(),
            metrics.clone(),
            tenant_gate2.clone(),
            None,
            shutdown2.clone(),
            draining2.clone(),
        ));
        let js = JetStreamClient::connect(&settings).await.unwrap();
        let outbox_task = tokio::spawn(OutboxRelay.run(
            settings.clone(),
            storage.clone(),
            js,
            metrics.clone(),
            tenant_gate2.clone(),
            shutdown2.clone(),
            draining2.clone(),
        ));
        let effect_task = tokio::spawn(run_effect_worker(
            settings.clone(),
            storage.clone(),
            metrics.clone(),
            tenant_gate2.clone(),
            None,
            Arc::new(tokio::sync::Notify::new()),
            shutdown2.clone(),
            draining2.clone(),
        ));
        let result = wait_for_effect_result(
            &nats_url,
            &settings,
            format!("e2e_result_{}", suffix),
            format!(
                "tenant.{}.effect_result.noop.{}",
                tenant.as_str(),
                command_id.as_str()
            ),
        )
        .await;
        assert_eq!(result.result_type, EffectResultType::Succeeded);
        draining2.store(true, std::sync::atomic::Ordering::Relaxed);
        shutdown2.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), saga_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(2), outbox_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(2), effect_task).await;
    });
}

#[test]
#[ignore]
fn integration_outbox_restart_does_not_duplicate_effect_command_publish() {
    let Some(nats_url) = nats_url_or_skip() else { return; };
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        let suffix = unique_suffix();
        let tenant = tenant_from_suffix(&suffix);
        let command_id = command_from_suffix(&suffix);
        let settings = Settings {
            nats_url: nats_url.clone(),
            consumer_durable_prefix: format!("runner_{}", suffix),
            test_outbox_crash_after_dispatch: true,
            effect_command_subject_filters: vec![format!("tenant.{}.effect.*.*", tenant.as_str())],
            ..Default::default()
        };
        ensure_base_streams(&nats_url).await;
        let metrics = Arc::new(Metrics::default());
        let tmp = tempfile::tempdir().unwrap();
        let storage = open_temp_storage(tmp.path());
        storage
            .put_outbox_item(
                &tenant,
                "effect",
                &runner::types::WorkId::new_v7(),
                &runner::types::WorkItem::EffectCommand(runner::types::EffectCommandEnvelope {
                    tenant_id: tenant.clone(),
                    command_id: command_id.clone(),
                    effect_name: EffectName::new("noop"),
                    payload: json!({"ok": true}),
                    metadata: MessageMetadata::default(),
                }),
            )
            .unwrap();
        let client = async_nats::connect(&nats_url).await.unwrap();
        let js_ctx = jetstream::new(client);
        let stream = js_ctx
            .get_stream(&settings.workflow_commands_stream)
            .await
            .unwrap();
        let consumer = stream
            .get_or_create_consumer(
                format!("cmd_count_{}", suffix).as_str(),
                PullConfig {
                    durable_name: Some(format!("cmd_count_{}", suffix)),
                    ack_policy: AckPolicy::Explicit,
                    filter_subject: format!(
                        "tenant.{}.effect.noop.{}",
                        tenant.as_str(),
                        command_id.as_str()
                    ),
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        let shutdown1 = Arc::new(tokio::sync::Notify::new());
        let draining1 = Arc::new(AtomicBool::new(false));
        let tenant_gate1 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let js = JetStreamClient::connect(&settings).await.unwrap();
        let outbox_task = tokio::spawn(OutboxRelay.run(
            settings.clone(),
            storage.clone(),
            js,
            metrics.clone(),
            tenant_gate1.clone(),
            shutdown1.clone(),
            draining1.clone(),
        ));
        let _ = tokio::time::timeout(Duration::from_secs(5), outbox_task).await;
        let mut restart_settings = settings.clone();
        restart_settings.test_outbox_crash_after_dispatch = false;
        let shutdown2 = Arc::new(tokio::sync::Notify::new());
        let draining2 = Arc::new(AtomicBool::new(false));
        let tenant_gate2 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let js = JetStreamClient::connect(&restart_settings).await.unwrap();
        let outbox_task = tokio::spawn(OutboxRelay.run(
            restart_settings,
            storage.clone(),
            js,
            metrics.clone(),
            tenant_gate2.clone(),
            shutdown2.clone(),
            draining2.clone(),
        ));
        let mut messages = consumer.messages().await.unwrap();
        let mut count = 0usize;
        let start = tokio::time::Instant::now();
        while start.elapsed() < Duration::from_secs(5) {
            if let Ok(Some(msg)) =
                tokio::time::timeout(Duration::from_millis(500), messages.next()).await
            {
                let msg = msg.unwrap();
                count += 1;
                let _ = msg.ack().await;
            }
        }
        draining2.store(true, std::sync::atomic::Ordering::Relaxed);
        shutdown2.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), outbox_task).await;
        assert_eq!(count, 1);
    });
}

#[test]
#[ignore]
fn integration_saga_redelivery_does_not_duplicate_effect_command_publish() {
    let Some(nats_url) = nats_url_or_skip() else { return; };
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        let tmp = tempfile::tempdir().unwrap();
        let suffix = unique_suffix();
        let tenant = tenant_from_suffix(&suffix);
        let command_id = command_from_suffix(&suffix);
        let correlation_id = format!("corr_{}", suffix);
        let (saga_manifest_path, _) =
            write_fixture_saga_files(tmp.path(), &tenant, &command_id, &correlation_id);
        let effects_manifest_path = write_fixture_effects_manifest(tmp.path());
        let settings = Settings {
            nats_url: nats_url.clone(),
            consumer_durable_prefix: format!("runner_{}", suffix),
            saga_manifest_path,
            effects_manifest_path,
            saga_trigger_subject_filters: vec![format!("tenant.{}.aggregate.*.*", tenant.as_str())],
            effect_command_subject_filters: vec![format!("tenant.{}.effect.*.*", tenant.as_str())],
            ..Default::default()
        };
        ensure_base_streams(&nats_url).await;
        let metrics = Arc::new(Metrics::default());
        let storage = open_temp_storage(tmp.path());
        let event = AggregateEventEnvelope {
            tenant_id: tenant.clone(),
            event_id: Uuid::now_v7(),
            aggregate_id: "a1".to_string(),
            aggregate_type: "Account".to_string(),
            version: 1,
            event_type: "Created".to_string(),
            payload: json!({"correlation_id": correlation_id}),
            command_id: Uuid::now_v7(),
            timestamp: chrono::Utc::now(),
        };
        let client = async_nats::connect(&nats_url).await.unwrap();
        let js_ctx = jetstream::new(client);
        let payload = serde_json::to_vec(&event).unwrap();
        js_ctx
            .publish(
                format!("tenant.{}.aggregate.Account.a1", tenant.as_str()),
                payload.into(),
            )
            .await
            .unwrap();
        let mut crash_settings = settings.clone();
        crash_settings.test_saga_crash_after_commit = true;
        let shutdown1 = Arc::new(tokio::sync::Notify::new());
        let draining1 = Arc::new(AtomicBool::new(false));
        let tenant_gate1 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let programs = Arc::new(SagaPrograms::load(&crash_settings).unwrap());
        let runtime = SagaRuntime::default();
        let saga_task = tokio::spawn(run_saga_worker(
            crash_settings.clone(),
            storage.clone(),
            programs,
            runtime.clone(),
            metrics.clone(),
            tenant_gate1.clone(),
            None,
            shutdown1.clone(),
            draining1.clone(),
        ));
        let _ = tokio::time::timeout(Duration::from_secs(5), saga_task).await;
        let shutdown2 = Arc::new(tokio::sync::Notify::new());
        let draining2 = Arc::new(AtomicBool::new(false));
        let tenant_gate2 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let programs = Arc::new(SagaPrograms::load(&settings).unwrap());
        let saga_task = tokio::spawn(run_saga_worker(
            settings.clone(),
            storage.clone(),
            programs,
            runtime.clone(),
            metrics.clone(),
            tenant_gate2.clone(),
            None,
            shutdown2.clone(),
            draining2.clone(),
        ));
        let js = JetStreamClient::connect(&settings).await.unwrap();
        let outbox_task = tokio::spawn(OutboxRelay.run(
            settings.clone(),
            storage.clone(),
            js,
            metrics.clone(),
            tenant_gate2.clone(),
            shutdown2.clone(),
            draining2.clone(),
        ));
        let count = count_messages(
            &nats_url,
            settings.workflow_commands_stream.clone(),
            format!("saga_cmd_count_{}", suffix),
            format!(
                "tenant.{}.effect.noop.{}",
                tenant.as_str(),
                command_id.as_str()
            ),
            10,
            Duration::from_secs(10),
        )
        .await;
        draining2.store(true, std::sync::atomic::Ordering::Relaxed);
        shutdown2.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), saga_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(2), outbox_task).await;
        assert_eq!(count, 1);
    });
}

#[test]
#[ignore]
fn integration_effect_redelivery_does_not_duplicate_result_publish() {
    let Some(nats_url) = nats_url_or_skip() else { return; };
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        let tmp = tempfile::tempdir().unwrap();
        let effects_manifest_path = write_fixture_effects_manifest(tmp.path());
        let suffix = unique_suffix();
        let tenant = tenant_from_suffix(&suffix);
        let command_id = command_from_suffix(&suffix);
        let settings = Settings {
            nats_url: nats_url.clone(),
            consumer_durable_prefix: format!("runner_{}", suffix),
            effects_manifest_path,
            effect_command_subject_filters: vec![format!("tenant.{}.effect.*.*", tenant.as_str())],
            ..Default::default()
        };
        ensure_base_streams(&nats_url).await;
        let storage = open_temp_storage(tmp.path());
        let metrics = Arc::new(Metrics::default());
        let cmd = runner::types::EffectCommandEnvelope {
            tenant_id: tenant.clone(),
            command_id: command_id.clone(),
            effect_name: EffectName::new("noop"),
            payload: json!({"ok": true}),
            metadata: MessageMetadata::default(),
        };
        let js = JetStreamClient::connect(&settings).await.unwrap();
        js.publish_effect_command(&cmd).await.unwrap();
        let mut crash_settings = settings.clone();
        crash_settings.test_effect_crash_after_dedupe_before_ack = true;
        let shutdown1 = Arc::new(tokio::sync::Notify::new());
        let draining1 = Arc::new(AtomicBool::new(false));
        let tenant_gate1 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let effect_task = tokio::spawn(run_effect_worker(
            crash_settings.clone(),
            storage.clone(),
            metrics.clone(),
            tenant_gate1.clone(),
            None,
            Arc::new(tokio::sync::Notify::new()),
            shutdown1.clone(),
            draining1.clone(),
        ));
        let _ = tokio::time::timeout(Duration::from_secs(10), effect_task).await;
        let shutdown2 = Arc::new(tokio::sync::Notify::new());
        let draining2 = Arc::new(AtomicBool::new(false));
        let tenant_gate2 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let effect_task = tokio::spawn(run_effect_worker(
            settings.clone(),
            storage.clone(),
            metrics.clone(),
            tenant_gate2.clone(),
            None,
            Arc::new(tokio::sync::Notify::new()),
            shutdown2.clone(),
            draining2.clone(),
        ));
        let count = count_messages(
            &nats_url,
            settings.workflow_events_stream.clone(),
            format!("effect_res_count_{}", suffix),
            format!(
                "tenant.{}.effect_result.noop.{}",
                tenant.as_str(),
                command_id.as_str()
            ),
            10,
            Duration::from_secs(10),
        )
        .await;
        draining2.store(true, std::sync::atomic::Ordering::Relaxed);
        shutdown2.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), effect_task).await;
        assert_eq!(count, 1);
    });
}

#[test]
#[ignore]
fn integration_scale_out_two_saga_replicas_no_duplicate_outbox() {
    let Some(nats_url) = nats_url_or_skip() else { return; };
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        let tmp = tempfile::tempdir().unwrap();
        let suffix = unique_suffix();
        let tenant = tenant_from_suffix(&suffix);
        let command_id = command_from_suffix(&suffix);
        let correlation_id = format!("corr_{}", suffix);
        let (saga_manifest_path, _) =
            write_fixture_saga_files(tmp.path(), &tenant, &command_id, &correlation_id);
        let settings = Settings {
            nats_url: nats_url.clone(),
            consumer_durable_prefix: format!("shared_{}", suffix),
            saga_manifest_path,
            saga_trigger_subject_filters: vec![format!("tenant.{}.aggregate.*.*", tenant.as_str())],
            ..Default::default()
        };
        ensure_base_streams(&nats_url).await;
        let metrics = Arc::new(Metrics::default());
        let storage = open_temp_storage(tmp.path());
        let shutdown = Arc::new(tokio::sync::Notify::new());
        let draining = Arc::new(AtomicBool::new(false));
        let tenant_gate = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let programs = Arc::new(SagaPrograms::load(&settings).unwrap());
        let runtime = SagaRuntime::default();
        let saga_a = tokio::spawn(run_saga_worker(
            settings.clone(),
            storage.clone(),
            programs.clone(),
            runtime.clone(),
            metrics.clone(),
            tenant_gate.clone(),
            None,
            shutdown.clone(),
            draining.clone(),
        ));
        let saga_b = tokio::spawn(run_saga_worker(
            settings.clone(),
            storage.clone(),
            programs.clone(),
            runtime.clone(),
            metrics.clone(),
            tenant_gate.clone(),
            None,
            shutdown.clone(),
            draining.clone(),
        ));
        let publish_count = 50usize;
        let client = async_nats::connect(&nats_url).await.unwrap();
        let js_ctx = jetstream::new(client);
        let mut max_seq = 0u64;
        for _ in 0..publish_count {
            let event = AggregateEventEnvelope {
                tenant_id: tenant.clone(),
                event_id: Uuid::now_v7(),
                aggregate_id: "a1".to_string(),
                aggregate_type: "Account".to_string(),
                version: 1,
                event_type: "Created".to_string(),
                payload: json!({"correlation_id": correlation_id}),
                command_id: Uuid::now_v7(),
                timestamp: chrono::Utc::now(),
            };
            let payload = serde_json::to_vec(&event).unwrap();
            let ack = js_ctx
                .publish(
                    format!("tenant.{}.aggregate.Account.a1", tenant.as_str()),
                    payload.into(),
                )
                .await
                .unwrap()
                .await
                .unwrap();
            max_seq = max_seq.max(ack.sequence);
        }
        let start = tokio::time::Instant::now();
        loop {
            let outbox = storage.list_outbox_all(10_000).unwrap();
            if outbox.len() > publish_count {
                panic!("outbox duplicates detected: {}", outbox.len());
            }
            if outbox.len() == publish_count {
                tokio::time::sleep(Duration::from_millis(500)).await;
                let outbox2 = storage.list_outbox_all(10_000).unwrap();
                assert_eq!(outbox2.len(), publish_count);
                break;
            }
            if start.elapsed() > Duration::from_secs(10) {
                panic!("timed out waiting for outbox items: {}", outbox.len());
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        let checkpoint_key = CheckpointKey::new(&tenant, &SagaName::new("noop"));
        let start = tokio::time::Instant::now();
        loop {
            let checkpoint = storage
                .get_checkpoint(&checkpoint_key)
                .unwrap()
                .unwrap_or(0);
            if checkpoint >= max_seq {
                break;
            }
            if start.elapsed() > Duration::from_secs(5) {
                panic!(
                    "checkpoint did not advance to max_seq (checkpoint={}, max_seq={})",
                    checkpoint, max_seq
                );
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        draining.store(true, std::sync::atomic::Ordering::Relaxed);
        shutdown.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), saga_a).await;
        let _ = tokio::time::timeout(Duration::from_secs(2), saga_b).await;
    });
}

#[test]
#[ignore]
fn integration_draining_pauses_processing_and_resume_completes() {
    let Some(nats_url) = nats_url_or_skip() else { return; };
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        let tmp = tempfile::tempdir().unwrap();
        let suffix = unique_suffix();
        let tenant = tenant_from_suffix(&suffix);
        let command_id = command_from_suffix(&suffix);
        let correlation_id = format!("corr_{}", suffix);
        let (saga_manifest_path, _) =
            write_fixture_saga_files(tmp.path(), &tenant, &command_id, &correlation_id);
        let settings = Settings {
            nats_url: nats_url.clone(),
            consumer_durable_prefix: format!("drain_{}", suffix),
            saga_manifest_path,
            saga_trigger_subject_filters: vec![format!("tenant.{}.aggregate.*.*", tenant.as_str())],
            ..Default::default()
        };
        ensure_base_streams(&nats_url).await;
        let metrics = Arc::new(Metrics::default());
        let storage = open_temp_storage(tmp.path());
        let shutdown = Arc::new(tokio::sync::Notify::new());
        let draining = Arc::new(AtomicBool::new(false));
        let tenant_gate = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let programs = Arc::new(SagaPrograms::load(&settings).unwrap());
        let runtime = SagaRuntime::default();
        let saga = tokio::spawn(run_saga_worker(
            settings.clone(),
            storage.clone(),
            programs,
            runtime,
            metrics.clone(),
            tenant_gate.clone(),
            None,
            shutdown.clone(),
            draining.clone(),
        ));
        let publish_count = 80usize;
        let client = async_nats::connect(&nats_url).await.unwrap();
        let js_ctx = jetstream::new(client);
        for _ in 0..publish_count {
            let event = AggregateEventEnvelope {
                tenant_id: tenant.clone(),
                event_id: Uuid::now_v7(),
                aggregate_id: "a1".to_string(),
                aggregate_type: "Account".to_string(),
                version: 1,
                event_type: "Created".to_string(),
                payload: json!({"correlation_id": correlation_id}),
                command_id: Uuid::now_v7(),
                timestamp: chrono::Utc::now(),
            };
            let payload = serde_json::to_vec(&event).unwrap();
            js_ctx
                .publish(
                    format!("tenant.{}.aggregate.Account.a1", tenant.as_str()),
                    payload.into(),
                )
                .await
                .unwrap();
        }
        let start = tokio::time::Instant::now();
        let paused_at;
        loop {
            let count = storage.list_outbox_all(10_000).unwrap().len();
            if count > 0 {
                draining.store(true, std::sync::atomic::Ordering::Relaxed);
                paused_at = count;
                break;
            }
            if start.elapsed() > Duration::from_secs(5) {
                panic!("no progress before drain");
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        tokio::time::sleep(Duration::from_millis(500)).await;
        let after = storage.list_outbox_all(10_000).unwrap().len();
        assert!(after <= paused_at + 2);
        tokio::time::sleep(Duration::from_millis(500)).await;
        let after2 = storage.list_outbox_all(10_000).unwrap().len();
        assert_eq!(after2, after);
        draining.store(false, std::sync::atomic::Ordering::Relaxed);
        let start = tokio::time::Instant::now();
        loop {
            let count = storage.list_outbox_all(10_000).unwrap().len();
            if count == publish_count {
                break;
            }
            if start.elapsed() > Duration::from_secs(10) {
                panic!("timed out waiting after resume: {}", count);
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        shutdown.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), saga).await;
    });
}