// Provenance (page metadata captured with the file; commented out so the file compiles):
// File: cloudlysis/runner/tests/e2e_integration.rs
// Commit: 1298d9a3df by Vlad Durnea, 2026-03-30 11:40:42 +03:00
//   "Monorepo consolidation: workspace, shared types, transport plans, docker/swam assets"
// CI at capture time: ci / ui (push) failing after 30s; ci / rust (push) failing after 2m34s
// Size at capture time: 1120 lines, 38 KiB, Rust
use async_nats::jetstream::{
self, consumer::pull::Config as PullConfig, consumer::AckPolicy, stream::Config as StreamConfig,
};
use futures::StreamExt;
use runner::config::Settings;
use runner::effects::run_effect_worker;
use runner::observability::Metrics;
use runner::outbox::OutboxRelay;
use runner::saga::{run_saga_worker, SagaPrograms, SagaRuntime};
use runner::storage::KvClient;
use runner::stream::JetStreamClient;
use runner::types::{
AggregateEventEnvelope, CheckpointKey, CommandId, EffectName, EffectResultEnvelope,
EffectResultType, MessageMetadata, SagaName, TenantId,
};
use serde_json::json;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
/// Returns the NATS server URL for integration tests, or `None` when the
/// `RUNNER_TEST_NATS_URL` environment variable is unset — callers use the
/// `None` case to skip the test as a no-op.
fn nats_url_or_skip() -> Option<String> {
    match std::env::var("RUNNER_TEST_NATS_URL") {
        Ok(url) => Some(url),
        Err(_) => None,
    }
}
/// Generates a short, unique suffix (simple-format UUIDv7) used to namespace
/// per-test tenants, commands, and durable consumer names.
fn unique_suffix() -> String {
    let id = Uuid::now_v7();
    id.simple().to_string()
}
/// Opens the MDBX-backed key/value store at `dir`/runner.mdbx.
/// Test-only helper: panics on failure instead of returning an error.
fn open_temp_storage(dir: &std::path::Path) -> KvClient {
    let db_path = dir.join("runner.mdbx");
    KvClient::open(db_path.to_string_lossy().to_string()).unwrap()
}
/// Derives the per-test tenant id ("t" + suffix) so parallel runs stay isolated.
fn tenant_from_suffix(suffix: &str) -> TenantId {
    let id = format!("t{suffix}");
    TenantId::new(id)
}
/// Derives the per-test command id ("c" + suffix), matching `tenant_from_suffix`.
fn command_from_suffix(suffix: &str) -> CommandId {
    let id = format!("c{suffix}");
    CommandId::new(id)
}
/// Writes the two saga fixtures into `dir`:
/// - `saga_on_event.json`: a minimal saga program whose Const node emits one
///   `noop` effect-command work item, with tenant/command/correlation ids
///   substituted into the template placeholders;
/// - `sagas.yaml`: a manifest registering that program under the `noop` saga,
///   triggered by any aggregate-event subject.
///
/// Returns `(manifest_path, program_path)` as strings.
fn write_fixture_saga_files(
    dir: &std::path::Path,
    tenant_id: &TenantId,
    command_id: &CommandId,
    correlation_id: &str,
) -> (String, String) {
    let program_file = dir.join("saga_on_event.json");
    let manifest_file = dir.join("sagas.yaml");
    let template = r#"
{
"specVersion": "1.1",
"id": "e2e_saga",
"name": "e2e_saga",
"inputs": [
{ "name": "saga_state", "type": "Any" },
{ "name": "event", "type": "Any" }
],
"nodes": [
{
"id": "const",
"type": "Const",
"data": {
"value": {
"new_saga_state": {},
"work_items": [
{
"kind": "effect_command",
"tenant_id": "__TENANT_ID__",
"command_id": "__COMMAND_ID__",
"effect_name": "noop",
"payload": { "ok": true },
"metadata": { "correlation_id": "__CORRELATION_ID__" }
}
],
"schedules": []
}
}
},
{ "id": "output", "type": "Output", "data": {} }
],
"edges": [
{ "id": "e1", "source": "const", "sourceHandle": "out", "target": "output", "targetHandle": "value" }
],
"outputNodeId": "output"
}
"#;
    // Table-driven placeholder substitution keeps the three ids in one place.
    let substitutions = [
        ("__TENANT_ID__", tenant_id.as_str()),
        ("__COMMAND_ID__", command_id.as_str()),
        ("__CORRELATION_ID__", correlation_id),
    ];
    let mut program = template.to_string();
    for (placeholder, value) in substitutions {
        program = program.replace(placeholder, value);
    }
    std::fs::write(&program_file, program).unwrap();
    let manifest = format!(
        r#"
sagas:
- name: noop
trigger_subjects: ["tenant.*.aggregate.*.*"]
on_event: "{}"
"#,
        program_file.to_string_lossy()
    );
    std::fs::write(&manifest_file, manifest).unwrap();
    let manifest_path = manifest_file.to_string_lossy().to_string();
    let program_path = program_file.to_string_lossy().to_string();
    (manifest_path, program_path)
}
/// Writes the minimal effects manifest — a single `noop` effect backed by the
/// `noop` provider — into `dir`/effects.yaml and returns the path as a string.
fn write_fixture_effects_manifest(dir: &std::path::Path) -> String {
    let manifest = r#"
effects:
- name: noop
provider: noop
config: {}
"#;
    let path = dir.join("effects.yaml");
    std::fs::write(&path, manifest).unwrap();
    path.to_string_lossy().to_string()
}
/// Ensures the three base JetStream streams used by the pipeline exist.
/// Idempotent: `get_or_create_stream` returns any existing stream unchanged,
/// so concurrent tests can all call this safely.
async fn ensure_base_streams(nats_url: &str) {
    let conn = async_nats::connect(nats_url).await.unwrap();
    let context = jetstream::new(conn);
    // (stream name, subjects) table — one entry per base stream.
    let streams: [(&str, Vec<String>); 3] = [
        (
            "AGGREGATE_EVENTS",
            vec!["tenant.*.aggregate.*.*".to_string()],
        ),
        (
            "WORKFLOW_COMMANDS",
            vec![
                "tenant.*.effect.*.*".to_string(),
                "tenant.*.workflow.*.*".to_string(),
            ],
        ),
        (
            "WORKFLOW_EVENTS",
            vec![
                "tenant.*.effect_result.*.*".to_string(),
                "tenant.*.workflow_event.*.*".to_string(),
            ],
        ),
    ];
    for (name, subjects) in streams {
        context
            .get_or_create_stream(StreamConfig {
                name: name.to_string(),
                subjects,
                ..Default::default()
            })
            .await
            .unwrap();
    }
}
/// Waits (up to 10s) for the next message matching `subject_filter` on the
/// workflow-events stream, acks it, and decodes it as an `EffectResultEnvelope`.
/// Creates (or reuses) a durable pull consumer named `consumer_name`.
/// Panics on timeout, stream errors, or a malformed payload — acceptable for tests.
async fn wait_for_effect_result(
    nats_url: &str,
    settings: &Settings,
    consumer_name: String,
    subject_filter: String,
) -> EffectResultEnvelope {
    let conn = async_nats::connect(nats_url).await.unwrap();
    let context = jetstream::new(conn);
    let stream = context
        .get_stream(&settings.workflow_events_stream)
        .await
        .unwrap();
    let config = PullConfig {
        durable_name: Some(consumer_name.clone()),
        ack_policy: AckPolicy::Explicit,
        filter_subject: subject_filter,
        ..Default::default()
    };
    let consumer = stream
        .get_or_create_consumer(consumer_name.as_str(), config)
        .await
        .unwrap();
    let mut inbox = consumer.messages().await.unwrap();
    // Three unwraps: timeout elapsed, message stream ended, message errored —
    // each is fatal for the test.
    let message = tokio::time::timeout(Duration::from_secs(10), inbox.next())
        .await
        .unwrap()
        .unwrap()
        .unwrap();
    let _ = message.ack().await;
    serde_json::from_slice(&message.payload).unwrap()
}
/// Counts messages matching `subject_filter` on `stream_name`, acking each,
/// until either `max` messages have been seen or `duration` has elapsed.
/// Individual receive errors and 500ms poll timeouts are ignored (the outer
/// deadline bounds the total wait); a closed message stream ends the count early.
async fn count_messages(
    nats_url: &str,
    stream_name: String,
    consumer_name: String,
    subject_filter: String,
    max: usize,
    duration: Duration,
) -> usize {
    let conn = async_nats::connect(nats_url).await.unwrap();
    let context = jetstream::new(conn);
    let stream = context.get_stream(&stream_name).await.unwrap();
    let consumer = stream
        .get_or_create_consumer(
            consumer_name.as_str(),
            PullConfig {
                durable_name: Some(consumer_name.clone()),
                ack_policy: AckPolicy::Explicit,
                filter_subject: subject_filter,
                ..Default::default()
            },
        )
        .await
        .unwrap();
    let mut inbox = consumer.messages().await.unwrap();
    let deadline = tokio::time::Instant::now() + duration;
    let mut seen = 0usize;
    while seen < max && tokio::time::Instant::now() < deadline {
        match tokio::time::timeout(Duration::from_millis(500), inbox.next()).await {
            Ok(Some(Ok(msg))) => {
                seen += 1;
                let _ = msg.ack().await;
            }
            Ok(None) => break,
            // Receive error or poll timeout: keep looping until the deadline.
            Ok(Some(Err(_))) | Err(_) => {}
        }
    }
    seen
}
/// End-to-end happy path: an aggregate event triggers the fixture saga, which
/// enqueues a `noop` effect command (via the outbox relay); the effect worker
/// then publishes a successful effect result, whose envelope fields we assert.
/// Requires `RUNNER_TEST_NATS_URL`; otherwise the test returns immediately.
#[test]
#[ignore]
fn integration_full_happy_path_workflow() {
    let Some(nats_url) = nats_url_or_skip() else {
        return;
    };
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        // Per-test isolation: fresh temp dir, unique tenant/command ids,
        // and a unique durable-consumer prefix.
        let tmp = tempfile::tempdir().unwrap();
        let suffix = unique_suffix();
        let tenant = tenant_from_suffix(&suffix);
        let command_id = command_from_suffix(&suffix);
        let correlation_id = format!("corr_{}", suffix);
        let (saga_manifest_path, _) =
            write_fixture_saga_files(tmp.path(), &tenant, &command_id, &correlation_id);
        let effects_manifest_path = write_fixture_effects_manifest(tmp.path());
        let settings = Settings {
            nats_url: nats_url.clone(),
            consumer_durable_prefix: format!("runner_{}", suffix),
            saga_manifest_path,
            effects_manifest_path,
            // Scope subscriptions to this test's tenant so parallel test runs
            // sharing the NATS server don't interfere with each other.
            saga_trigger_subject_filters: vec![format!("tenant.{}.aggregate.*.*", tenant.as_str())],
            effect_command_subject_filters: vec![format!("tenant.{}.effect.*.*", tenant.as_str())],
            ..Default::default()
        };
        ensure_base_streams(&nats_url).await;
        let metrics = Arc::new(Metrics::default());
        let storage = open_temp_storage(tmp.path());
        let shutdown = Arc::new(tokio::sync::Notify::new());
        let draining = Arc::new(AtomicBool::new(false));
        // TenantGate::new(None) — presumably "no placement restriction"; confirm.
        let tenant_gate = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let programs = Arc::new(SagaPrograms::load(&settings).unwrap());
        let runtime = SagaRuntime::default();
        // Spawn the three pipeline workers: saga (event -> outbox work items),
        // outbox relay (outbox -> NATS), effect (effect command -> result).
        let saga_task = tokio::spawn(run_saga_worker(
            settings.clone(),
            storage.clone(),
            programs,
            runtime.clone(),
            metrics.clone(),
            tenant_gate.clone(),
            None,
            shutdown.clone(),
            draining.clone(),
        ));
        let js = JetStreamClient::connect(&settings).await.unwrap();
        let outbox_task = tokio::spawn(OutboxRelay.run(
            settings.clone(),
            storage.clone(),
            js,
            metrics.clone(),
            tenant_gate.clone(),
            shutdown.clone(),
            draining.clone(),
        ));
        let effect_task = tokio::spawn(run_effect_worker(
            settings.clone(),
            storage.clone(),
            metrics.clone(),
            tenant_gate.clone(),
            None,
            Arc::new(tokio::sync::Notify::new()),
            shutdown.clone(),
            draining.clone(),
        ));
        // Publish one aggregate event; the fixture saga converts it into a
        // `noop` effect command addressed to `command_id`.
        let event = AggregateEventEnvelope {
            tenant_id: tenant.clone(),
            event_id: Uuid::now_v7(),
            aggregate_id: "a1".to_string(),
            aggregate_type: "Account".to_string(),
            version: 1,
            event_type: "Created".to_string(),
            payload: json!({"correlation_id": correlation_id}),
            command_id: Uuid::now_v7(),
            timestamp: chrono::Utc::now(),
        };
        let client = async_nats::connect(&nats_url).await.unwrap();
        let js_ctx = jetstream::new(client);
        let payload = serde_json::to_vec(&event).unwrap();
        js_ctx
            .publish(
                format!("tenant.{}.aggregate.Account.a1", tenant.as_str()),
                payload.into(),
            )
            .await
            .unwrap();
        // Block until the effect result for our command arrives (10s timeout
        // inside the helper), then assert the full envelope round-trip.
        let result = wait_for_effect_result(
            &nats_url,
            &settings,
            format!("e2e_result_{}", suffix),
            format!(
                "tenant.{}.effect_result.noop.{}",
                tenant.as_str(),
                command_id.as_str()
            ),
        )
        .await;
        assert_eq!(result.tenant_id, tenant);
        assert_eq!(result.command_id, command_id);
        assert_eq!(result.effect_name, EffectName::new("noop"));
        assert_eq!(result.result_type, EffectResultType::Succeeded);
        // The correlation id set in the saga fixture's metadata must survive
        // the whole saga -> outbox -> effect -> result chain.
        assert_eq!(
            result.metadata.correlation_id.as_ref().map(|v| v.as_str()),
            Some(correlation_id.as_str())
        );
        // Graceful teardown: flag draining, wake the workers, bound the joins.
        draining.store(true, std::sync::atomic::Ordering::Relaxed);
        shutdown.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), saga_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(2), outbox_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(2), effect_task).await;
    });
}
/// Crash/restart atomicity: run all three workers once with crash injection
/// enabled (saga after commit, outbox after dispatch, effect after dedupe but
/// before ack), then restart them clean against the same storage. Recovery
/// must finish the in-flight work so the effect result is still `Succeeded`.
#[test]
#[ignore]
fn integration_crash_restart_preserves_atomicity() {
    let Some(nats_url) = nats_url_or_skip() else {
        return;
    };
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        let tmp = tempfile::tempdir().unwrap();
        let suffix = unique_suffix();
        let tenant = tenant_from_suffix(&suffix);
        let command_id = command_from_suffix(&suffix);
        let correlation_id = format!("corr_{}", suffix);
        let (saga_manifest_path, _) =
            write_fixture_saga_files(tmp.path(), &tenant, &command_id, &correlation_id);
        let effects_manifest_path = write_fixture_effects_manifest(tmp.path());
        let settings = Settings {
            nats_url: nats_url.clone(),
            consumer_durable_prefix: format!("runner_{}", suffix),
            saga_manifest_path,
            effects_manifest_path,
            saga_trigger_subject_filters: vec![format!("tenant.{}.aggregate.*.*", tenant.as_str())],
            effect_command_subject_filters: vec![format!("tenant.{}.effect.*.*", tenant.as_str())],
            ..Default::default()
        };
        ensure_base_streams(&nats_url).await;
        let metrics = Arc::new(Metrics::default());
        let storage = open_temp_storage(tmp.path());
        // Publish the triggering event before any worker runs; JetStream
        // retains it for the durable consumers created afterwards.
        let event = AggregateEventEnvelope {
            tenant_id: tenant.clone(),
            event_id: Uuid::now_v7(),
            aggregate_id: "a1".to_string(),
            aggregate_type: "Account".to_string(),
            version: 1,
            event_type: "Created".to_string(),
            payload: json!({"correlation_id": correlation_id}),
            command_id: Uuid::now_v7(),
            timestamp: chrono::Utc::now(),
        };
        let client = async_nats::connect(&nats_url).await.unwrap();
        let js_ctx = jetstream::new(client);
        let payload = serde_json::to_vec(&event).unwrap();
        js_ctx
            .publish(
                format!("tenant.{}.aggregate.Account.a1", tenant.as_str()),
                payload.into(),
            )
            .await
            .unwrap();
        // First run: every worker has a crash hook armed. These test_* flags
        // presumably make each worker stop right after its critical
        // persistence step — confirm against the worker implementations.
        let mut crash_settings = settings.clone();
        crash_settings.test_saga_crash_after_commit = true;
        crash_settings.test_outbox_crash_after_dispatch = true;
        crash_settings.test_effect_crash_after_dedupe_before_ack = true;
        let shutdown1 = Arc::new(tokio::sync::Notify::new());
        let draining1 = Arc::new(AtomicBool::new(false));
        let tenant_gate1 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let programs = Arc::new(SagaPrograms::load(&crash_settings).unwrap());
        let runtime = SagaRuntime::default();
        let saga_task = tokio::spawn(run_saga_worker(
            crash_settings.clone(),
            storage.clone(),
            programs,
            runtime.clone(),
            metrics.clone(),
            tenant_gate1.clone(),
            None,
            shutdown1.clone(),
            draining1.clone(),
        ));
        let js = JetStreamClient::connect(&crash_settings).await.unwrap();
        let outbox_task = tokio::spawn(OutboxRelay.run(
            crash_settings.clone(),
            storage.clone(),
            js,
            metrics.clone(),
            tenant_gate1.clone(),
            shutdown1.clone(),
            draining1.clone(),
        ));
        let effect_task = tokio::spawn(run_effect_worker(
            crash_settings.clone(),
            storage.clone(),
            metrics.clone(),
            tenant_gate1.clone(),
            None,
            Arc::new(tokio::sync::Notify::new()),
            shutdown1.clone(),
            draining1.clone(),
        ));
        // The crash-injected tasks are expected to terminate on their own;
        // the timeouts only bound how long we wait for them.
        let _ = tokio::time::timeout(Duration::from_secs(5), saga_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(5), outbox_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(5), effect_task).await;
        // Second run: same storage, clean settings — simulates a process
        // restart that must recover and complete without duplicating work.
        let shutdown2 = Arc::new(tokio::sync::Notify::new());
        let draining2 = Arc::new(AtomicBool::new(false));
        let tenant_gate2 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let programs = Arc::new(SagaPrograms::load(&settings).unwrap());
        let saga_task = tokio::spawn(run_saga_worker(
            settings.clone(),
            storage.clone(),
            programs,
            runtime.clone(),
            metrics.clone(),
            tenant_gate2.clone(),
            None,
            shutdown2.clone(),
            draining2.clone(),
        ));
        let js = JetStreamClient::connect(&settings).await.unwrap();
        let outbox_task = tokio::spawn(OutboxRelay.run(
            settings.clone(),
            storage.clone(),
            js,
            metrics.clone(),
            tenant_gate2.clone(),
            shutdown2.clone(),
            draining2.clone(),
        ));
        let effect_task = tokio::spawn(run_effect_worker(
            settings.clone(),
            storage.clone(),
            metrics.clone(),
            tenant_gate2.clone(),
            None,
            Arc::new(tokio::sync::Notify::new()),
            shutdown2.clone(),
            draining2.clone(),
        ));
        // Despite the mid-flight crashes, the pipeline must still converge to
        // exactly one successful effect result for our command.
        let result = wait_for_effect_result(
            &nats_url,
            &settings,
            format!("e2e_result_{}", suffix),
            format!(
                "tenant.{}.effect_result.noop.{}",
                tenant.as_str(),
                command_id.as_str()
            ),
        )
        .await;
        assert_eq!(result.result_type, EffectResultType::Succeeded);
        draining2.store(true, std::sync::atomic::Ordering::Relaxed);
        shutdown2.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), saga_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(2), outbox_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(2), effect_task).await;
    });
}
/// Outbox exactly-once on restart: seed one effect command directly into the
/// outbox, run the relay with crash-after-dispatch enabled, then restart it
/// clean. The command subject must receive exactly one publish in total.
#[test]
#[ignore]
fn integration_outbox_restart_does_not_duplicate_effect_command_publish() {
    let Some(nats_url) = nats_url_or_skip() else {
        return;
    };
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        let suffix = unique_suffix();
        let tenant = tenant_from_suffix(&suffix);
        let command_id = command_from_suffix(&suffix);
        let settings = Settings {
            nats_url: nats_url.clone(),
            consumer_durable_prefix: format!("runner_{}", suffix),
            // First relay run crashes right after dispatching to NATS.
            test_outbox_crash_after_dispatch: true,
            effect_command_subject_filters: vec![format!("tenant.{}.effect.*.*", tenant.as_str())],
            ..Default::default()
        };
        ensure_base_streams(&nats_url).await;
        let metrics = Arc::new(Metrics::default());
        let tmp = tempfile::tempdir().unwrap();
        let storage = open_temp_storage(tmp.path());
        // Seed the outbox directly — no saga worker in this test.
        storage
            .put_outbox_item(
                &tenant,
                "effect",
                &runner::types::WorkId::new_v7(),
                &runner::types::WorkItem::EffectCommand(runner::types::EffectCommandEnvelope {
                    tenant_id: tenant.clone(),
                    command_id: command_id.clone(),
                    effect_name: EffectName::new("noop"),
                    payload: json!({"ok": true}),
                    metadata: MessageMetadata::default(),
                }),
            )
            .unwrap();
        let client = async_nats::connect(&nats_url).await.unwrap();
        let js_ctx = jetstream::new(client);
        let stream = js_ctx
            .get_stream(&settings.workflow_commands_stream)
            .await
            .unwrap();
        // Create the counting consumer BEFORE the relay runs so no publish can
        // be missed between the two relay runs.
        let consumer = stream
            .get_or_create_consumer(
                format!("cmd_count_{}", suffix).as_str(),
                PullConfig {
                    durable_name: Some(format!("cmd_count_{}", suffix)),
                    ack_policy: AckPolicy::Explicit,
                    filter_subject: format!(
                        "tenant.{}.effect.noop.{}",
                        tenant.as_str(),
                        command_id.as_str()
                    ),
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        // First relay run: dispatches the item, then the injected crash stops it.
        let shutdown1 = Arc::new(tokio::sync::Notify::new());
        let draining1 = Arc::new(AtomicBool::new(false));
        let tenant_gate1 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let js = JetStreamClient::connect(&settings).await.unwrap();
        let outbox_task = tokio::spawn(OutboxRelay.run(
            settings.clone(),
            storage.clone(),
            js,
            metrics.clone(),
            tenant_gate1.clone(),
            shutdown1.clone(),
            draining1.clone(),
        ));
        let _ = tokio::time::timeout(Duration::from_secs(5), outbox_task).await;
        // Restart with the crash flag cleared; a buggy relay would re-dispatch
        // the already-published item here.
        let mut restart_settings = settings.clone();
        restart_settings.test_outbox_crash_after_dispatch = false;
        let shutdown2 = Arc::new(tokio::sync::Notify::new());
        let draining2 = Arc::new(AtomicBool::new(false));
        let tenant_gate2 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let js = JetStreamClient::connect(&restart_settings).await.unwrap();
        let outbox_task = tokio::spawn(OutboxRelay.run(
            restart_settings,
            storage.clone(),
            js,
            metrics.clone(),
            tenant_gate2.clone(),
            shutdown2.clone(),
            draining2.clone(),
        ));
        // Count for the FULL 5s window (no early exit): a duplicate publish
        // arriving late must still be observed.
        let mut messages = consumer.messages().await.unwrap();
        let mut count = 0usize;
        let start = tokio::time::Instant::now();
        while start.elapsed() < Duration::from_secs(5) {
            if let Ok(Some(msg)) =
                tokio::time::timeout(Duration::from_millis(500), messages.next()).await
            {
                let msg = msg.unwrap();
                count += 1;
                let _ = msg.ack().await;
            }
        }
        draining2.store(true, std::sync::atomic::Ordering::Relaxed);
        shutdown2.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), outbox_task).await;
        assert_eq!(count, 1);
    });
}
/// Saga redelivery exactly-once: the saga worker crashes right after
/// committing its outbox write, so JetStream will redeliver the (unacked)
/// triggering event to the restarted worker. The redelivery must be deduped —
/// exactly one effect command may reach the commands stream.
#[test]
#[ignore]
fn integration_saga_redelivery_does_not_duplicate_effect_command_publish() {
    let Some(nats_url) = nats_url_or_skip() else {
        return;
    };
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        let tmp = tempfile::tempdir().unwrap();
        let suffix = unique_suffix();
        let tenant = tenant_from_suffix(&suffix);
        let command_id = command_from_suffix(&suffix);
        let correlation_id = format!("corr_{}", suffix);
        let (saga_manifest_path, _) =
            write_fixture_saga_files(tmp.path(), &tenant, &command_id, &correlation_id);
        let effects_manifest_path = write_fixture_effects_manifest(tmp.path());
        let settings = Settings {
            nats_url: nats_url.clone(),
            consumer_durable_prefix: format!("runner_{}", suffix),
            saga_manifest_path,
            effects_manifest_path,
            saga_trigger_subject_filters: vec![format!("tenant.{}.aggregate.*.*", tenant.as_str())],
            effect_command_subject_filters: vec![format!("tenant.{}.effect.*.*", tenant.as_str())],
            ..Default::default()
        };
        ensure_base_streams(&nats_url).await;
        let metrics = Arc::new(Metrics::default());
        let storage = open_temp_storage(tmp.path());
        // Publish the triggering event up front; the durable consumer created
        // by the saga worker will pick it up (and later get it redelivered).
        let event = AggregateEventEnvelope {
            tenant_id: tenant.clone(),
            event_id: Uuid::now_v7(),
            aggregate_id: "a1".to_string(),
            aggregate_type: "Account".to_string(),
            version: 1,
            event_type: "Created".to_string(),
            payload: json!({"correlation_id": correlation_id}),
            command_id: Uuid::now_v7(),
            timestamp: chrono::Utc::now(),
        };
        let client = async_nats::connect(&nats_url).await.unwrap();
        let js_ctx = jetstream::new(client);
        let payload = serde_json::to_vec(&event).unwrap();
        js_ctx
            .publish(
                format!("tenant.{}.aggregate.Account.a1", tenant.as_str()),
                payload.into(),
            )
            .await
            .unwrap();
        // First saga run: commits the outbox write, then the injected crash
        // stops the worker before it can ack the event.
        let mut crash_settings = settings.clone();
        crash_settings.test_saga_crash_after_commit = true;
        let shutdown1 = Arc::new(tokio::sync::Notify::new());
        let draining1 = Arc::new(AtomicBool::new(false));
        let tenant_gate1 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let programs = Arc::new(SagaPrograms::load(&crash_settings).unwrap());
        let runtime = SagaRuntime::default();
        let saga_task = tokio::spawn(run_saga_worker(
            crash_settings.clone(),
            storage.clone(),
            programs,
            runtime.clone(),
            metrics.clone(),
            tenant_gate1.clone(),
            None,
            shutdown1.clone(),
            draining1.clone(),
        ));
        let _ = tokio::time::timeout(Duration::from_secs(5), saga_task).await;
        // Restart clean: the redelivered event must be recognized as already
        // processed, and the outbox relay publishes the single pending command.
        let shutdown2 = Arc::new(tokio::sync::Notify::new());
        let draining2 = Arc::new(AtomicBool::new(false));
        let tenant_gate2 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let programs = Arc::new(SagaPrograms::load(&settings).unwrap());
        let saga_task = tokio::spawn(run_saga_worker(
            settings.clone(),
            storage.clone(),
            programs,
            runtime.clone(),
            metrics.clone(),
            tenant_gate2.clone(),
            None,
            shutdown2.clone(),
            draining2.clone(),
        ));
        let js = JetStreamClient::connect(&settings).await.unwrap();
        let outbox_task = tokio::spawn(OutboxRelay.run(
            settings.clone(),
            storage.clone(),
            js,
            metrics.clone(),
            tenant_gate2.clone(),
            shutdown2.clone(),
            draining2.clone(),
        ));
        // Count up to 10 commands over 10s; any value above 1 is a duplicate.
        let count = count_messages(
            &nats_url,
            settings.workflow_commands_stream.clone(),
            format!("saga_cmd_count_{}", suffix),
            format!(
                "tenant.{}.effect.noop.{}",
                tenant.as_str(),
                command_id.as_str()
            ),
            10,
            Duration::from_secs(10),
        )
        .await;
        draining2.store(true, std::sync::atomic::Ordering::Relaxed);
        shutdown2.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), saga_task).await;
        let _ = tokio::time::timeout(Duration::from_secs(2), outbox_task).await;
        assert_eq!(count, 1);
    });
}
/// Effect redelivery exactly-once: the effect worker crashes after recording
/// its dedupe entry but before acking the command, so JetStream redelivers the
/// command to the restarted worker. The dedupe record must prevent a second
/// effect-result publish — exactly one result may appear.
#[test]
#[ignore]
fn integration_effect_redelivery_does_not_duplicate_result_publish() {
    let Some(nats_url) = nats_url_or_skip() else {
        return;
    };
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        let tmp = tempfile::tempdir().unwrap();
        let effects_manifest_path = write_fixture_effects_manifest(tmp.path());
        let suffix = unique_suffix();
        let tenant = tenant_from_suffix(&suffix);
        let command_id = command_from_suffix(&suffix);
        let settings = Settings {
            nats_url: nats_url.clone(),
            consumer_durable_prefix: format!("runner_{}", suffix),
            effects_manifest_path,
            effect_command_subject_filters: vec![format!("tenant.{}.effect.*.*", tenant.as_str())],
            ..Default::default()
        };
        ensure_base_streams(&nats_url).await;
        let storage = open_temp_storage(tmp.path());
        let metrics = Arc::new(Metrics::default());
        // Publish the effect command directly — no saga/outbox in this test.
        let cmd = runner::types::EffectCommandEnvelope {
            tenant_id: tenant.clone(),
            command_id: command_id.clone(),
            effect_name: EffectName::new("noop"),
            payload: json!({"ok": true}),
            metadata: MessageMetadata::default(),
        };
        let js = JetStreamClient::connect(&settings).await.unwrap();
        js.publish_effect_command(&cmd).await.unwrap();
        // First run: crash after dedupe, before ack -> command stays unacked
        // and will be redelivered to the second run.
        let mut crash_settings = settings.clone();
        crash_settings.test_effect_crash_after_dedupe_before_ack = true;
        let shutdown1 = Arc::new(tokio::sync::Notify::new());
        let draining1 = Arc::new(AtomicBool::new(false));
        let tenant_gate1 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let effect_task = tokio::spawn(run_effect_worker(
            crash_settings.clone(),
            storage.clone(),
            metrics.clone(),
            tenant_gate1.clone(),
            None,
            Arc::new(tokio::sync::Notify::new()),
            shutdown1.clone(),
            draining1.clone(),
        ));
        let _ = tokio::time::timeout(Duration::from_secs(10), effect_task).await;
        // Restart clean against the same storage (which holds the dedupe record).
        let shutdown2 = Arc::new(tokio::sync::Notify::new());
        let draining2 = Arc::new(AtomicBool::new(false));
        let tenant_gate2 = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let effect_task = tokio::spawn(run_effect_worker(
            settings.clone(),
            storage.clone(),
            metrics.clone(),
            tenant_gate2.clone(),
            None,
            Arc::new(tokio::sync::Notify::new()),
            shutdown2.clone(),
            draining2.clone(),
        ));
        // Count results for up to 10s; more than one means the dedupe failed.
        let count = count_messages(
            &nats_url,
            settings.workflow_events_stream.clone(),
            format!("effect_res_count_{}", suffix),
            format!(
                "tenant.{}.effect_result.noop.{}",
                tenant.as_str(),
                command_id.as_str()
            ),
            10,
            Duration::from_secs(10),
        )
        .await;
        draining2.store(true, std::sync::atomic::Ordering::Relaxed);
        shutdown2.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), effect_task).await;
        assert_eq!(count, 1);
    });
}
/// Scale-out safety: two saga workers share the same durable-consumer prefix
/// (so they compete for the same JetStream consumer). After publishing 50
/// events, the outbox must contain exactly 50 items (no duplicates from the
/// competing replicas) and the shared checkpoint must reach the highest
/// published stream sequence.
#[test]
#[ignore]
fn integration_scale_out_two_saga_replicas_no_duplicate_outbox() {
    let Some(nats_url) = nats_url_or_skip() else {
        return;
    };
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        let tmp = tempfile::tempdir().unwrap();
        let suffix = unique_suffix();
        let tenant = tenant_from_suffix(&suffix);
        let command_id = command_from_suffix(&suffix);
        let correlation_id = format!("corr_{}", suffix);
        let (saga_manifest_path, _) =
            write_fixture_saga_files(tmp.path(), &tenant, &command_id, &correlation_id);
        let settings = Settings {
            nats_url: nats_url.clone(),
            // "shared_" prefix: both replicas bind to the same durable consumer.
            consumer_durable_prefix: format!("shared_{}", suffix),
            saga_manifest_path,
            saga_trigger_subject_filters: vec![format!("tenant.{}.aggregate.*.*", tenant.as_str())],
            ..Default::default()
        };
        ensure_base_streams(&nats_url).await;
        let metrics = Arc::new(Metrics::default());
        let storage = open_temp_storage(tmp.path());
        let shutdown = Arc::new(tokio::sync::Notify::new());
        let draining = Arc::new(AtomicBool::new(false));
        let tenant_gate = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let programs = Arc::new(SagaPrograms::load(&settings).unwrap());
        let runtime = SagaRuntime::default();
        // Two identical replicas sharing settings, storage, and programs.
        let saga_a = tokio::spawn(run_saga_worker(
            settings.clone(),
            storage.clone(),
            programs.clone(),
            runtime.clone(),
            metrics.clone(),
            tenant_gate.clone(),
            None,
            shutdown.clone(),
            draining.clone(),
        ));
        let saga_b = tokio::spawn(run_saga_worker(
            settings.clone(),
            storage.clone(),
            programs.clone(),
            runtime.clone(),
            metrics.clone(),
            tenant_gate.clone(),
            None,
            shutdown.clone(),
            draining.clone(),
        ));
        let publish_count = 50usize;
        let client = async_nats::connect(&nats_url).await.unwrap();
        let js_ctx = jetstream::new(client);
        // Track the highest JetStream sequence from the publish acks so we can
        // later verify the checkpoint caught up to it.
        let mut max_seq = 0u64;
        for _ in 0..publish_count {
            let event = AggregateEventEnvelope {
                tenant_id: tenant.clone(),
                event_id: Uuid::now_v7(),
                aggregate_id: "a1".to_string(),
                aggregate_type: "Account".to_string(),
                version: 1,
                event_type: "Created".to_string(),
                payload: json!({"correlation_id": correlation_id}),
                command_id: Uuid::now_v7(),
                timestamp: chrono::Utc::now(),
            };
            let payload = serde_json::to_vec(&event).unwrap();
            // Second `.await` resolves the publish ack (carries the sequence).
            let ack = js_ctx
                .publish(
                    format!("tenant.{}.aggregate.Account.a1", tenant.as_str()),
                    payload.into(),
                )
                .await
                .unwrap()
                .await
                .unwrap();
            max_seq = max_seq.max(ack.sequence);
        }
        // Poll until the outbox holds exactly `publish_count` items. Overshoot
        // is an immediate failure (duplicates); on reaching the exact count,
        // re-check after 500ms to ensure no late duplicates trickle in.
        let start = tokio::time::Instant::now();
        loop {
            let outbox = storage.list_outbox_all(10_000).unwrap();
            if outbox.len() > publish_count {
                panic!("outbox duplicates detected: {}", outbox.len());
            }
            if outbox.len() == publish_count {
                tokio::time::sleep(Duration::from_millis(500)).await;
                let outbox2 = storage.list_outbox_all(10_000).unwrap();
                assert_eq!(outbox2.len(), publish_count);
                break;
            }
            if start.elapsed() > Duration::from_secs(10) {
                panic!("timed out waiting for outbox items: {}", outbox.len());
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        // The saga checkpoint must advance at least to the last published
        // sequence, proving both replicas' progress is recorded coherently.
        let checkpoint_key = CheckpointKey::new(&tenant, &SagaName::new("noop"));
        let start = tokio::time::Instant::now();
        loop {
            let checkpoint = storage
                .get_checkpoint(&checkpoint_key)
                .unwrap()
                .unwrap_or(0);
            if checkpoint >= max_seq {
                break;
            }
            if start.elapsed() > Duration::from_secs(5) {
                panic!(
                    "checkpoint did not advance to max_seq (checkpoint={}, max_seq={})",
                    checkpoint, max_seq
                );
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        draining.store(true, std::sync::atomic::Ordering::Relaxed);
        shutdown.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), saga_a).await;
        let _ = tokio::time::timeout(Duration::from_secs(2), saga_b).await;
    });
}
/// Draining semantics: once some events have been processed, setting the
/// `draining` flag must pause further outbox growth (modulo a small in-flight
/// allowance), and clearing it must let the worker resume and process all
/// remaining events to completion.
#[test]
#[ignore]
fn integration_draining_pauses_processing_and_resume_completes() {
    let Some(nats_url) = nats_url_or_skip() else {
        return;
    };
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async move {
        let tmp = tempfile::tempdir().unwrap();
        let suffix = unique_suffix();
        let tenant = tenant_from_suffix(&suffix);
        let command_id = command_from_suffix(&suffix);
        let correlation_id = format!("corr_{}", suffix);
        let (saga_manifest_path, _) =
            write_fixture_saga_files(tmp.path(), &tenant, &command_id, &correlation_id);
        let settings = Settings {
            nats_url: nats_url.clone(),
            consumer_durable_prefix: format!("drain_{}", suffix),
            saga_manifest_path,
            saga_trigger_subject_filters: vec![format!("tenant.{}.aggregate.*.*", tenant.as_str())],
            ..Default::default()
        };
        ensure_base_streams(&nats_url).await;
        let metrics = Arc::new(Metrics::default());
        let storage = open_temp_storage(tmp.path());
        let shutdown = Arc::new(tokio::sync::Notify::new());
        let draining = Arc::new(AtomicBool::new(false));
        let tenant_gate = Arc::new(runner::tenant_placement::TenantGate::new(None));
        let programs = Arc::new(SagaPrograms::load(&settings).unwrap());
        let runtime = SagaRuntime::default();
        // Only the saga worker runs here — outbox size is the progress gauge.
        let saga = tokio::spawn(run_saga_worker(
            settings.clone(),
            storage.clone(),
            programs,
            runtime,
            metrics.clone(),
            tenant_gate.clone(),
            None,
            shutdown.clone(),
            draining.clone(),
        ));
        let publish_count = 80usize;
        let client = async_nats::connect(&nats_url).await.unwrap();
        let js_ctx = jetstream::new(client);
        for _ in 0..publish_count {
            let event = AggregateEventEnvelope {
                tenant_id: tenant.clone(),
                event_id: Uuid::now_v7(),
                aggregate_id: "a1".to_string(),
                aggregate_type: "Account".to_string(),
                version: 1,
                event_type: "Created".to_string(),
                payload: json!({"correlation_id": correlation_id}),
                command_id: Uuid::now_v7(),
                timestamp: chrono::Utc::now(),
            };
            let payload = serde_json::to_vec(&event).unwrap();
            js_ctx
                .publish(
                    format!("tenant.{}.aggregate.Account.a1", tenant.as_str()),
                    payload.into(),
                )
                .await
                .unwrap();
        }
        // As soon as the worker makes any progress, flip `draining` on and
        // remember the outbox size at that instant.
        let start = tokio::time::Instant::now();
        let paused_at;
        loop {
            let count = storage.list_outbox_all(10_000).unwrap().len();
            if count > 0 {
                draining.store(true, std::sync::atomic::Ordering::Relaxed);
                paused_at = count;
                break;
            }
            if start.elapsed() > Duration::from_secs(5) {
                panic!("no progress before drain");
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        // While draining: allow up to 2 extra items (work that was already
        // in flight when the flag flipped), then require a complete stall.
        tokio::time::sleep(Duration::from_millis(500)).await;
        let after = storage.list_outbox_all(10_000).unwrap().len();
        assert!(after <= paused_at + 2);
        tokio::time::sleep(Duration::from_millis(500)).await;
        let after2 = storage.list_outbox_all(10_000).unwrap().len();
        assert_eq!(after2, after);
        // Clear the flag: the worker must resume and finish all 80 events.
        draining.store(false, std::sync::atomic::Ordering::Relaxed);
        let start = tokio::time::Instant::now();
        loop {
            let count = storage.list_outbox_all(10_000).unwrap().len();
            if count == publish_count {
                break;
            }
            if start.elapsed() > Duration::from_secs(10) {
                panic!("timed out waiting after resume: {}", count);
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        shutdown.notify_waiters();
        let _ = tokio::time::timeout(Duration::from_secs(2), saga).await;
    });
}