use aggregate::observability::Observability; use aggregate::runtime::RuntimeExecutor; #[cfg(feature = "runtime-v8")] use aggregate::runtime::{execute_apply_program, execute_decide_program}; use aggregate::server::{CommandRequest, HealthChecker}; use aggregate::storage::StorageClient; use aggregate::types::{ AggregateError, AggregateId, AggregateType, Command, Event, TenantId, Version, }; use serde_json::json; use std::time::Duration; use tempfile::TempDir; fn create_test_storage() -> (TempDir, StorageClient) { let dir = TempDir::new().expect("failed to create temp dir"); let path = dir.path().join("test.mdbx"); let storage = StorageClient::open(path.to_string_lossy().to_string()).expect("failed to open storage"); (dir, storage) } #[cfg(feature = "runtime-v8")] fn create_test_decide_program() -> &'static str { r#" function decide(state, command) { if (command.type === "deposit") { return [{ type: "deposited", amount: command.amount }]; } if (command.type === "withdraw") { if (state.balance < command.amount) { return [{ type: "error", message: "insufficient funds" }]; } return [{ type: "withdrawn", amount: command.amount }]; } if (command.type === "open_account") { return [{ type: "account_opened", initial_balance: command.initial_balance || 0 }]; } return []; } "# } #[cfg(feature = "runtime-v8")] fn create_test_apply_program() -> &'static str { r#" function apply(state, event) { if (event.type === "account_opened") { return { balance: event.initial_balance }; } if (event.type === "deposited") { return { balance: (state.balance || 0) + event.amount }; } if (event.type === "withdrawn") { return { balance: state.balance - event.amount }; } return state; } "# } #[test] fn storage_tenant_isolation() { let (_dir, storage) = create_test_storage(); let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { let tenant_a = TenantId::new("tenant-a"); let tenant_b = TenantId::new("tenant-b"); let aggregate_id = AggregateId::new_v7(); use aggregate::types::Snapshot; let snapshot_a = Snapshot::new( tenant_a.clone(), aggregate_id.clone(), AggregateType::from("Account"), Version::from(1), json!({"balance": 100}), ); storage.put_snapshot(&snapshot_a).await.unwrap(); let result_a = storage .get_snapshot(&tenant_a, &aggregate_id) .await .unwrap(); let result_b = storage .get_snapshot(&tenant_b, &aggregate_id) .await .unwrap(); assert!(result_a.is_some()); assert!(result_b.is_none()); }); } #[test] fn storage_version_conflict() { let (_dir, storage) = create_test_storage(); let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { let tenant_id = TenantId::new("tenant-a"); let aggregate_id = AggregateId::new_v7(); use aggregate::types::Snapshot; let snapshot_v1 = Snapshot::new( tenant_id.clone(), aggregate_id.clone(), AggregateType::from("Account"), Version::from(1), json!({"balance": 100}), ); storage.put_snapshot(&snapshot_v1).await.unwrap(); let result = storage.put_snapshot(&snapshot_v1).await; assert!(matches!( result, Err(AggregateError::VersionConflict { .. }) )); }); } #[test] fn storage_latest_version() { let (_dir, storage) = create_test_storage(); let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { let tenant_id = TenantId::new("tenant-a"); let aggregate_id = AggregateId::new_v7(); let version = storage .get_latest_version(&tenant_id, &aggregate_id) .await .unwrap(); assert!(version.is_none()); use aggregate::types::Snapshot; let snapshot_v1 = Snapshot::new( tenant_id.clone(), aggregate_id.clone(), AggregateType::from("Account"), Version::from(1), json!({"balance": 100}), ); storage.put_snapshot(&snapshot_v1).await.unwrap(); let version = storage .get_latest_version(&tenant_id, &aggregate_id) .await .unwrap(); assert_eq!(version, Some(Version::from(1))); let snapshot_v3 = Snapshot::new( tenant_id.clone(), aggregate_id.clone(), AggregateType::from("Account"), Version::from(3), json!({"balance": 300}), ); storage.put_snapshot(&snapshot_v3).await.unwrap(); let version = storage .get_latest_version(&tenant_id, &aggregate_id) .await .unwrap(); assert_eq!(version, Some(Version::from(3))); }); } #[test] fn storage_none_for_nonexistent_aggregate() { let (_dir, storage) = create_test_storage(); let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { let tenant_id = TenantId::new("tenant-a"); let aggregate_id = AggregateId::new_v7(); let snapshot = storage .get_snapshot(&tenant_id, &aggregate_id) .await .unwrap(); assert!(snapshot.is_none()); }); } #[cfg(feature = "runtime-v8")] #[test] fn runtime_decide_deposit() { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { let state = json!({"balance": 100}); let command = json!({"type": "deposit", "amount": 50}); let events = execute_decide_program( &state, &command, create_test_decide_program(), 1_000_000, Duration::from_secs(5), ) .await .unwrap(); assert_eq!(events.len(), 1); assert_eq!(events[0]["type"], "deposited"); assert_eq!(events[0]["amount"], 50); }); } #[cfg(feature = "runtime-v8")] #[test] fn runtime_decide_withdraw_insufficient() { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { let state = json!({"balance": 10}); let command = json!({"type": "withdraw", "amount": 100}); let events = execute_decide_program( &state, &command, create_test_decide_program(), 1_000_000, Duration::from_secs(5), ) .await .unwrap(); assert_eq!(events.len(), 1); assert_eq!(events[0]["type"], "error"); }); } #[cfg(feature = "runtime-v8")] #[test] fn runtime_apply_transitions_state() { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { let state = json!({"balance": 100}); let event = json!({"type": "deposited", "amount": 50}); let new_state = execute_apply_program( &state, &event, create_test_apply_program(), 1_000_000, Duration::from_secs(5), ) .await .unwrap(); assert_eq!(new_state["balance"], 150); }); } #[cfg(feature = "runtime-v8")] #[test] fn runtime_determinism() { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { let state = json!({"balance": 100}); let command = json!({"type": "deposit", "amount": 50}); let r1 = execute_decide_program( &state, &command, create_test_decide_program(), 1_000_000, Duration::from_secs(5), ) .await .unwrap(); let r2 = execute_decide_program( &state, &command, create_test_decide_program(), 1_000_000, Duration::from_secs(5), ) .await .unwrap(); assert_eq!(r1, r2); }); } #[test] fn command_request_tenant_extraction() { let tenant_id = TenantId::new("acme-corp"); let aggregate_id = AggregateId::new_v7(); let request = CommandRequest::new( tenant_id.clone(), aggregate_id.clone(), AggregateType::from("Account"), json!({"type": "deposit", "amount": 100}), ) .with_header("x-request-id", "req-123") .with_header("x-tenant-id", "override-tenant"); assert_eq!(request.tenant_id, tenant_id); assert_eq!( request.headers.get("x-request-id"), Some(&"req-123".to_string()) ); } #[test] fn health_checker_tracks_state() { let checker = HealthChecker::new(); let status = checker.check(); assert!(status.is_healthy()); assert!(checker.is_ready()); assert!(checker.is_live()); checker.set_storage_healthy(false); checker.set_stream_healthy(false); assert!(!checker.is_ready()); checker.set_storage_healthy(true); checker.set_stream_healthy(true); assert!(checker.is_ready()); } #[test] fn observability_metrics_export() { let obs = Observability::default(); let span = obs.start_command_span("agg-123", "Account", "tenant-a", "deposit", None, None); obs.record_command_success(&span, 2); let metrics = obs.export_metrics(); assert!(metrics.contains("commands_total")); assert!(metrics.contains("command_duration")); } #[test] fn version_increment_and_ordering() { let v0 = Version::initial(); assert_eq!(v0.as_u64(), 0); let v1 = v0.increment(); assert_eq!(v1.as_u64(), 1); assert_eq!(v0.as_u64(), 0); let v2 = v1.increment(); assert_eq!(v2.as_u64(), 2); assert!(v0 < v1); assert!(v1 < v2); } #[test] fn tenant_id_validation() { let valid_ids = vec!["acme-corp", "tenant_123", "my-tenant", "Tenant1"]; let invalid_ids = vec!["tenant@corp", "tenant name", "tenant/id"]; for id in valid_ids { let tenant_id = TenantId::new(id); let chars_valid = tenant_id .as_str() .chars() .all(|c| c.is_alphanumeric() || c == '-' || c == '_'); assert!(chars_valid, "Expected {} to be valid", id); } for id in invalid_ids { let tenant_id = TenantId::new(id); let chars_valid = tenant_id .as_str() .chars() .all(|c| c.is_alphanumeric() || c == '-' || c == '_'); assert!(!chars_valid, "Expected {} to be invalid", id); } } #[test] fn aggregate_id_generation() { let id1 = AggregateId::new_v7(); let id2 = AggregateId::new_v7(); assert_ne!(id1, id2); let display = format!("{}", id1); assert!(!display.is_empty()); } #[test] fn event_creation() { let tenant_id = TenantId::new("tenant-a"); let aggregate_id = AggregateId::new_v7(); let command_id = uuid::Uuid::now_v7(); let event = Event::new( tenant_id.clone(), aggregate_id.clone(), AggregateType::from("Account"), Version::from(1), "deposited".to_string(), json!({"amount": 100}), command_id, ); assert_eq!(event.tenant_id, tenant_id); assert_eq!(event.aggregate_id, aggregate_id); assert_eq!(event.version, Version::from(1)); assert_eq!(event.event_type, "deposited"); } #[test] fn command_creation() { let tenant_id = TenantId::new("tenant-a"); let aggregate_id = AggregateId::new_v7(); let command = Command::new( tenant_id.clone(), aggregate_id.clone(), AggregateType::from("Account"), json!({"type": "deposit", "amount": 100}), ); assert_eq!(command.tenant_id, tenant_id); assert_eq!(command.aggregate_id, aggregate_id); assert_eq!(command.payload["type"], "deposit"); } #[test] fn snapshot_creation() { let tenant_id = TenantId::new("tenant-a"); let aggregate_id = AggregateId::new_v7(); let snapshot = aggregate::types::Snapshot::new( tenant_id.clone(), aggregate_id.clone(), AggregateType::from("Account"), Version::from(5), json!({"balance": 500}), ); assert_eq!(snapshot.tenant_id, tenant_id); assert_eq!(snapshot.aggregate_id, aggregate_id); assert_eq!(snapshot.version, Version::from(5)); assert_eq!(snapshot.state["balance"], 500); } #[test] fn circuit_breaker_pattern() { use aggregate::storage::CircuitBreaker; let mut cb = CircuitBreaker::new() .with_failure_threshold(3) .with_reset_timeout(Duration::from_millis(50)); assert!(cb.is_closed()); cb.record_failure(); cb.record_failure(); cb.record_failure(); assert!(cb.is_open()); std::thread::sleep(Duration::from_millis(60)); assert!(!cb.is_closed()); assert!(!cb.is_open()); } #[test] fn error_types_are_send_sync() { fn assert_send_sync() {} assert_send_sync::(); assert_send_sync::(); } #[test] fn all_types_are_send_sync() { fn assert_send_sync() {} assert_send_sync::(); assert_send_sync::(); assert_send_sync::(); assert_send_sync::(); assert_send_sync::(); assert_send_sync::(); assert_send_sync::(); assert_send_sync::(); assert_send_sync::(); assert_send_sync::(); } #[test] fn concurrent_storage_operations() { let (_dir, storage) = create_test_storage(); let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { use aggregate::types::Snapshot; use std::sync::Arc; use tokio::task::JoinSet; let storage = Arc::new(storage); let mut tasks = JoinSet::new(); for i in 0..10 { let storage = storage.clone(); tasks.spawn(async move { let tenant_id = TenantId::new(format!("tenant-{}", i % 3)); let aggregate_id = AggregateId::new_v7(); let snapshot = Snapshot::new( tenant_id.clone(), aggregate_id.clone(), AggregateType::from("Account"), Version::from(1), json!({"balance": i * 100}), ); storage.put_snapshot(&snapshot).await.unwrap(); let loaded = storage .get_snapshot(&tenant_id, &aggregate_id) .await .unwrap(); assert!(loaded.is_some()); loaded.unwrap() }); } let mut results = Vec::new(); while let Some(result) = tasks.join_next().await { results.push(result.unwrap()); } assert_eq!(results.len(), 10); }); } #[test] fn tenant_isolation_e2e() { let (_dir, storage) = create_test_storage(); let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { use aggregate::types::Snapshot; let tenant_a = TenantId::new("tenant-a"); let tenant_b = TenantId::new("tenant-b"); let aggregate_id = AggregateId::new_v7(); let snapshot_a = Snapshot::new( tenant_a.clone(), aggregate_id.clone(), AggregateType::from("Account"), Version::from(1), json!({"balance": 1000, "owner": "Alice"}), ); let snapshot_b = Snapshot::new( tenant_b.clone(), aggregate_id.clone(), AggregateType::from("Account"), Version::from(1), json!({"balance": 500, "owner": "Bob"}), ); storage.put_snapshot(&snapshot_a).await.unwrap(); storage.put_snapshot(&snapshot_b).await.unwrap(); let loaded_a = storage .get_snapshot(&tenant_a, &aggregate_id) .await .unwrap() .unwrap(); let loaded_b = storage .get_snapshot(&tenant_b, &aggregate_id) .await .unwrap() .unwrap(); assert_eq!(loaded_a.state["owner"], "Alice"); assert_eq!(loaded_a.state["balance"], 1000); assert_eq!(loaded_b.state["owner"], "Bob"); assert_eq!(loaded_b.state["balance"], 500); }); } #[test] fn bank_account_full_scenario() { let (_dir, storage) = create_test_storage(); let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { use aggregate::types::Snapshot; let tenant_id = TenantId::new("bank-test"); let aggregate_id = AggregateId::new_v7(); let snapshot_v1 = Snapshot::new( tenant_id.clone(), aggregate_id.clone(), AggregateType::from("BankAccount"), Version::from(1), json!({"balance": 0}), ); storage.put_snapshot(&snapshot_v1).await.unwrap(); let snapshot_v2 = Snapshot::new( tenant_id.clone(), aggregate_id.clone(), AggregateType::from("BankAccount"), Version::from(2), json!({"balance": 100}), ); storage.put_snapshot(&snapshot_v2).await.unwrap(); let snapshot_v3 = Snapshot::new( tenant_id.clone(), aggregate_id.clone(), AggregateType::from("BankAccount"), Version::from(3), json!({"balance": 50}), ); storage.put_snapshot(&snapshot_v3).await.unwrap(); let loaded = storage .get_snapshot(&tenant_id, &aggregate_id) .await .unwrap() .unwrap(); assert_eq!(loaded.version, Version::from(3)); assert_eq!(loaded.state["balance"], 50); let version = storage .get_latest_version(&tenant_id, &aggregate_id) .await .unwrap(); assert_eq!(version, Some(Version::from(3))); }); } #[test] fn version_sequence_integrity() { let (_dir, storage) = create_test_storage(); let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { use aggregate::types::Snapshot; let tenant_id = TenantId::new("version-test"); let aggregate_id = AggregateId::new_v7(); for v in 1..=5 { let snapshot = Snapshot::new( tenant_id.clone(), aggregate_id.clone(), AggregateType::from("Counter"), Version::from(v), json!({"count": v}), ); storage.put_snapshot(&snapshot).await.unwrap(); } let loaded = storage .get_snapshot(&tenant_id, &aggregate_id) .await .unwrap() .unwrap(); assert_eq!(loaded.version, Version::from(5)); assert_eq!(loaded.state["count"], 5); let duplicate = Snapshot::new( tenant_id.clone(), aggregate_id.clone(), AggregateType::from("Counter"), Version::from(5), json!({"count": 999}), ); let result = storage.put_snapshot(&duplicate).await; assert!(matches!( result, Err(AggregateError::VersionConflict { .. }) )); }); }