# Development Plan: Runner Node

## Overview

This plan breaks down the Runner node implementation into milestones ordered by dependency. Each milestone includes:

- **Tasks** with clear deliverables
- **Test Requirements** (unit tests + tautological tests + integration tests where applicable)
- **Dependencies** on previous milestones

**Development Approach:**

1. Complete one milestone at a time
2. Write tests before implementation (TDD where applicable)
3. All tests must pass before moving to the next milestone
4. Mark tasks complete with `[x]` as you progress

---

## Milestone 0: Repo Bootstrap (Dev Ergonomics + Guardrails)

**Goal:** Establish baseline development workflows and guardrails so later milestones can be executed and verified consistently.

### Tasks

- [x] **0.1** Define canonical local commands
  - `cargo test`
  - `cargo fmt --check`
  - `cargo clippy -- -D warnings`
  - `cargo run -- --help`
- [x] **0.2** Add minimal CI entrypoints (repository-level)
  - Run fmt + clippy + tests on every PR
  - Ensure CI uses the same commands as **0.1**
- [x] **0.3** Define integration-test gating pattern (NATS-required)
  - Mark NATS-dependent tests `#[ignore]` and run them with `cargo test -- --ignored` only when `RUNNER_TEST_NATS_URL` is set
  - Make the plan’s “ignored by default” integration tests follow the same convention
- [x] **0.4** Define baseline operational invariants (written as checklist items for later milestones)
  - Never ack before durable commit
  - Never delete an outbox item before durable confirmation
  - Never execute an effect twice for the same `(tenant_id, command_id)`
  - Always propagate `tenant_id`, `correlation_id`, and `trace_id`

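A minimal sketch of the 0.3 gating convention, assuming plain `#[ignore]` tests that read `RUNNER_TEST_NATS_URL` themselves. The helper names `nats_url_from` and `test_nats_url` and the example test are hypothetical; the lookup closure exists only so the parsing rule can be unit tested without mutating the process environment.

```rust
use std::env;

/// Returns the NATS URL for integration tests, or `None` when the variable is
/// unset or blank. Takes a lookup closure so the rule can be tested without
/// touching the real process environment.
pub fn nats_url_from(lookup: impl Fn(&str) -> Option<String>) -> Option<String> {
    lookup("RUNNER_TEST_NATS_URL")
        .map(|url| url.trim().to_string())
        .filter(|url| !url.is_empty())
}

/// Reads `RUNNER_TEST_NATS_URL` from the real process environment.
pub fn test_nats_url() -> Option<String> {
    nats_url_from(|key| env::var(key).ok())
}

/// Example NATS-dependent test: skipped by a plain `cargo test`, executed by
/// `RUNNER_TEST_NATS_URL=nats://127.0.0.1:4222 cargo test -- --ignored`.
#[test]
#[ignore = "requires NATS; set RUNNER_TEST_NATS_URL and run with --ignored"]
fn jetstream_redelivery_is_idempotent() {
    let Some(url) = test_nats_url() else {
        eprintln!("RUNNER_TEST_NATS_URL not set; skipping");
        return;
    };
    // Connect to `url` and exercise the real consumer here.
    let _ = url;
}
```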
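
### Tests

- [x] **T0.1** Tautological test: baseline test harness runs
  ```rust
  #[test]
  #[allow(clippy::assertions_on_constants)] // `assert!(true)` is intentional here
  fn test_harness_runs() {
      // Passes trivially; it only proves that `cargo test` builds and runs the harness.
      assert!(true);
  }
  ```

---
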
## Milestone 1: Project Foundation

**Goal:** Set up the Runner as a standalone Rust container with correct dependencies and a stable module layout.

### Tasks

- [x] **1.1** Initialize Cargo project
  - Create `src/lib.rs` and `src/main.rs`
  - Configure `Cargo.toml` with the madapes registry
- [x] **1.2** Configure dependencies (aligned with Aggregate/Projection)
  - `edge-storage` (KvStore)
  - `runtime-function` (Saga `on_event` / `compensation`)
  - `edge-logger-client` (structured logs client)
  - `query-engine` (optional admin/debug queries)
  - `async-nats` (JetStream)
  - `tokio`, `serde`, `serde_json`, `thiserror`, `anyhow`, `tracing`, `tracing-subscriber`, `uuid`, `chrono`, `axum`
- [x] **1.3** Establish initial module layout
  ```
  src/
  ├── lib.rs
  ├── main.rs
  ├── types/
  │   ├── mod.rs
  │   ├── id.rs
  │   ├── keys.rs
  │   ├── envelope.rs
  │   └── error.rs
  ├── config/
  │   ├── mod.rs
  │   └── settings.rs
  ├── storage/
  │   ├── mod.rs
  │   └── kv.rs
  ├── stream/
  │   ├── mod.rs
  │   └── jetstream.rs
  ├── saga/
  │   ├── mod.rs
  │   ├── manifest.rs
  │   └── runtime.rs
  ├── outbox/
  │   ├── mod.rs
  │   └── relay.rs
  ├── effects/
  │   ├── mod.rs
  │   ├── manifest.rs
  │   ├── runtime.rs
  │   └── providers/
  │       └── mod.rs
  ├── schedule/
  │   ├── mod.rs
  │   └── scheduler.rs
  ├── http/
  │   └── mod.rs
  └── observability/
      └── mod.rs
  ```
- [x] **1.4** Configure clippy and rustfmt (CI-friendly)

### Tests

- [x] **T1.1** Project compiles successfully
- [x] **T1.2** Dependencies resolve from madapes registry
- [x] **T1.3** Clippy passes with no warnings

---

## Milestone 2: Core Types + Keyspaces

**Goal:** Implement the foundational types the Runner depends on: multi-tenancy, envelope shapes, stable key composition, and the error model.

### Dependencies

- Milestone 1 (project foundation)

### Tasks

- [x] **2.1** Implement `TenantId` type (same semantics as other nodes)
  - String wrapper, default empty for single-tenant
  - `Display`, `FromStr`, `Serialize`, `Deserialize`
- [x] **2.2** Implement workflow identity types
  - `SagaName`, `EffectName` (string wrappers)
  - `CorrelationId` (string wrapper)
  - `WorkId` (UUID v7 or string wrapper depending on work kind)
- [x] **2.3** Implement stable key composition helpers
  - `saga:{tenant_id}:{saga_name}:{correlation_id}`
  - `checkpoint:{tenant_id}:{saga_name}`
  - `outbox:{tenant_id}:{work_kind}:{work_id}`
  - `schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at}`
  - `dedupe:{tenant_id}:{saga_name}:{event_id}`
  - `dedupe:{tenant_id}:effect:{command_id}`
- [x] **2.4** Define message envelopes used by the Runner (serde types)
  - Aggregate event envelope (consumed from `AGGREGATE_EVENTS`)
  - Effect command envelope (consumed from the workflow command stream, `WORKFLOW_COMMANDS`)
  - Effect result event envelope (produced to the workflow event stream, `WORKFLOW_EVENTS`)
  - Gateway command submission shape (compatible with Aggregate gateway request fields)
  - Forward-compatible decoding (unknown fields ignored where practical)
- [x] **2.5** Implement Runner error model
  - Storage errors, stream errors, decode errors, runtime-function errors
  - Tenant access errors
  - Policy errors (poison message, quarantine, etc.)

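The 2.5 error categories could be modelled roughly as below. This is a dependency-free sketch (the crate would more likely derive `Display` and `Error` with `thiserror`); the variant names and the `is_retryable` classification are assumptions, chosen so the poison-message policy in Milestone 5 has a single place to ask whether a redelivery can help.

```rust
use std::fmt;

/// Hypothetical Runner error type mirroring the categories listed in 2.5.
#[derive(Debug)]
pub enum RunnerError {
    Storage(String),
    Stream(String),
    Decode(String),
    RuntimeFunction(String),
    TenantAccess { tenant_id: String },
    Policy(PolicyError),
}

#[derive(Debug, PartialEq, Eq)]
pub enum PolicyError {
    PoisonMessage { deliveries: u64 },
    Quarantined,
}

impl fmt::Display for RunnerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RunnerError::Storage(msg) => write!(f, "storage error: {msg}"),
            RunnerError::Stream(msg) => write!(f, "stream error: {msg}"),
            RunnerError::Decode(msg) => write!(f, "decode error: {msg}"),
            RunnerError::RuntimeFunction(msg) => write!(f, "runtime-function error: {msg}"),
            RunnerError::TenantAccess { tenant_id } => {
                write!(f, "tenant access denied for '{tenant_id}'")
            }
            RunnerError::Policy(PolicyError::PoisonMessage { deliveries }) => {
                write!(f, "poison message after {deliveries} deliveries")
            }
            RunnerError::Policy(PolicyError::Quarantined) => write!(f, "message is quarantined"),
        }
    }
}

impl std::error::Error for RunnerError {}

impl RunnerError {
    /// Hypothetical classification: transient infrastructure failures may
    /// succeed on redelivery (NAK). Decode, tenant, policy, and deterministic
    /// runtime-function failures will not fix themselves and belong on the
    /// poison/quarantine path instead.
    pub fn is_retryable(&self) -> bool {
        matches!(self, RunnerError::Storage(_) | RunnerError::Stream(_))
    }
}
```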

### Tests

- [x] **T2.1** `TenantId` round-trips serialization and defaults to empty
- [x] **T2.2** Key composition produces stable strings across all keyspaces
- [x] **T2.3** Envelope decoding ignores unknown fields (forward compatibility)
- [x] **T2.4** Tautological test: core types are `Send + Sync`

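A sketch of `TenantId` and the 2.3 key helpers that would satisfy T2.1, T2.2, and T2.4. The serde derives are left out to keep it dependency-free, and the function names plus the 20-digit zero-padded Unix-millisecond encoding of `{due_at}` are assumptions (padding keeps lexicographic order equal to numeric order for one correlation's reminders). One thing worth checking in the real helpers: a saga literally named `effect` would share the `dedupe:{tenant_id}:effect:` prefix with effect dedupe keys, so reserving that name during manifest validation may be prudent.

```rust
use std::convert::Infallible;
use std::fmt;
use std::str::FromStr;

/// Tenant identifier; the empty string denotes single-tenant mode (per 2.1).
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct TenantId(String);

impl fmt::Display for TenantId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}

impl FromStr for TenantId {
    type Err = Infallible;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(TenantId(s.to_string()))
    }
}

// One helper per keyspace listed in 2.3 (names hypothetical).
pub fn saga_key(t: &TenantId, saga: &str, correlation_id: &str) -> String {
    format!("saga:{t}:{saga}:{correlation_id}")
}

pub fn checkpoint_key(t: &TenantId, saga: &str) -> String {
    format!("checkpoint:{t}:{saga}")
}

pub fn outbox_key(t: &TenantId, work_kind: &str, work_id: &str) -> String {
    format!("outbox:{t}:{work_kind}:{work_id}")
}

/// `due_at` is assumed to be Unix milliseconds, zero-padded to 20 digits.
pub fn schedule_key(t: &TenantId, saga: &str, correlation_id: &str, due_at_ms: u64) -> String {
    format!("schedule:{t}:{saga}:{correlation_id}:{due_at_ms:020}")
}

pub fn saga_dedupe_key(t: &TenantId, saga: &str, event_id: &str) -> String {
    format!("dedupe:{t}:{saga}:{event_id}")
}

pub fn effect_dedupe_key(t: &TenantId, command_id: &str) -> String {
    format!("dedupe:{t}:effect:{command_id}")
}
```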

---

## Milestone 3: Configuration

**Goal:** Implement settings and startup validation for modes (saga/effect/combined), multi-tenancy, storage, and JetStream.

### Dependencies

- Milestone 2 (core types)

### Tasks

- [x] **3.1** Define `Settings` struct
  - NATS URL
  - Storage path
  - Mode: saga/effect/combined
  - Multi-tenancy enabled flag + default tenant behavior
  - Stream names:
    - `AGGREGATE_EVENTS` (existing)
    - `WORKFLOW_COMMANDS` (runner work distribution)
    - `WORKFLOW_EVENTS` (effect results + optional workflow facts)
  - Subject filters for saga triggers and effect commands
  - Consumer configuration: durable name strategy, deliver group, max in-flight, ack wait, max deliver
  - Backpressure configuration: per-key concurrency, batching, relay polling intervals
  - Manifest paths: sagas and effects (YAML/JSON)
- [x] **3.2** Implement config loading from environment variables
- [x] **3.3** Implement config loading from file (YAML/TOML/JSON)
  - Environment overrides file
- [x] **3.4** Implement config validation
  - Required fields present
  - Stream names and consumer naming rules valid
  - Manifests load and validate at startup

### Tests

- [x] **T3.1** Settings loads from environment variables
- [x] **T3.2** Settings validation catches missing/invalid values
- [x] **T3.3** Tautological test: `Settings` is `Clone + Debug`

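A cut-down sketch of 3.1–3.4 covering only four fields. Loading goes through a lookup closure, so env-over-file precedence (3.3) can be expressed as `|k| env(k).or_else(|| file(k))`, and T3.1/T3.2 can use a `HashMap` instead of the real environment. Apart from `RUNNER_NATS_URL` (used in T11.2), the variable names and defaults are hypothetical, and the stream-name rule reflects NATS guidance that stream names avoid whitespace, `.`, `*`, `>`, and path separators.

```rust
use std::str::FromStr;

/// Runner mode from 3.1.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    Saga,
    Effect,
    Combined,
}

impl FromStr for Mode {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.trim().to_ascii_lowercase().as_str() {
            "saga" => Ok(Mode::Saga),
            "effect" => Ok(Mode::Effect),
            "combined" => Ok(Mode::Combined),
            other => Err(format!("invalid mode '{other}' (expected saga/effect/combined)")),
        }
    }
}

#[derive(Debug, Clone)]
pub struct Settings {
    pub nats_url: String,
    pub storage_path: String,
    pub mode: Mode,
    pub commands_stream: String,
}

impl Settings {
    /// Builds settings from any key lookup, then validates them.
    /// Variable names other than RUNNER_NATS_URL, and all defaults, are hypothetical.
    pub fn from_lookup(get: impl Fn(&str) -> Option<String>) -> Result<Self, String> {
        let settings = Settings {
            nats_url: get("RUNNER_NATS_URL").ok_or("RUNNER_NATS_URL is required")?,
            storage_path: get("RUNNER_STORAGE_PATH").unwrap_or_else(|| "/data/runner".into()),
            mode: get("RUNNER_MODE").as_deref().unwrap_or("combined").parse()?,
            commands_stream: get("RUNNER_WORKFLOW_COMMANDS_STREAM")
                .unwrap_or_else(|| "WORKFLOW_COMMANDS".into()),
        };
        settings.validate()?;
        Ok(settings)
    }

    /// Startup validation (3.4): required values present, stream names legal.
    pub fn validate(&self) -> Result<(), String> {
        if self.nats_url.trim().is_empty() {
            return Err("nats_url must not be empty".into());
        }
        if self.storage_path.trim().is_empty() {
            return Err("storage_path must not be empty".into());
        }
        validate_stream_name(&self.commands_stream)
    }
}

/// Rejects characters NATS does not allow in stream names.
fn validate_stream_name(name: &str) -> Result<(), String> {
    let illegal =
        |c: char| c.is_whitespace() || c.is_control() || matches!(c, '.' | '*' | '>' | '/' | '\\');
    if name.is_empty() || name.chars().any(illegal) {
        Err(format!("invalid stream name '{name}'"))
    } else {
        Ok(())
    }
}
```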

---

## Milestone 4: Storage Layer (KvStore Transactions)

**Goal:** Integrate `edge-storage` `KvStore` and provide transaction APIs that enforce the Runner’s atomicity requirements.

### Dependencies

- Milestone 2 (core types)
- Milestone 3 (configuration)

### Tasks

- [x] **4.1** Create `KvClient` wrapper
  - Opens MDBX-backed KvStore at configured path
  - Tenant-aware key helpers for all namespaces
- [x] **4.2** Implement saga state primitives
  - `get_saga_state(key) -> Option<Value>`
  - `put_saga_state(key, value)`
- [x] **4.3** Implement checkpoint primitives
  - `get_checkpoint(key) -> Option<u64>`
  - `put_checkpoint(key, u64)`
- [x] **4.4** Implement outbox primitives
  - `put_outbox_item(key, item)`
  - `list_outbox_prefix(tenant_id, ...)` (for relay scanning)
  - `delete_outbox_item(key)`
- [x] **4.5** Implement schedule primitives
  - `put_schedule_item(key, payload)`
  - `scan_due_schedule_items(now)` (prefix scan + due filtering)
  - `delete_schedule_item(key)`
- [x] **4.6** Implement atomic commit API for saga processing
  - One transaction: update saga state + write outbox items + advance checkpoint + record dedupe marker(s)
  - Provide an API surface that makes partial updates difficult

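The intent of 4.6 can be illustrated with an in-memory stand-in for `KvStore`. The real `edge-storage` transaction API is not described in this plan, so `MemKv`, `SagaCommit`, and the clone-then-swap staging are assumptions made only to show the shape (cloning the map is purely illustrative). Bundling every write into one value consumed by a single `commit` is what makes partial updates hard to express, and the test exercises T4.3 by forcing a failure partway through the writes.

```rust
use std::collections::BTreeMap;

/// Everything one saga step must persist, bundled so callers cannot write
/// state without also writing outbox items, checkpoint, and dedupe markers.
pub struct SagaCommit {
    pub state_key: String,
    pub new_state: String,
    pub outbox: Vec<(String, String)>,
    pub checkpoint_key: String,
    pub stream_seq: u64,
    pub dedupe_keys: Vec<String>,
}

/// In-memory stand-in for the MDBX-backed KvStore.
#[derive(Default, Debug)]
pub struct MemKv {
    data: BTreeMap<String, String>,
}

impl MemKv {
    pub fn get(&self, key: &str) -> Option<&str> {
        self.data.get(key).map(String::as_str)
    }

    /// Missing checkpoint reads as 0 (JetStream sequences start at 1).
    pub fn checkpoint(&self, key: &str) -> u64 {
        self.get(key).and_then(|v| v.parse().ok()).unwrap_or(0)
    }

    /// All-or-nothing: writes go to a staged copy that replaces the live map
    /// only after every write has succeeded.
    pub fn commit(&mut self, c: SagaCommit) -> Result<(), String> {
        let current = self.checkpoint(&c.checkpoint_key);
        if c.stream_seq <= current {
            return Err(format!("checkpoint would not advance: {} <= {current}", c.stream_seq));
        }
        let mut staged = self.data.clone();
        staged.insert(c.state_key, c.new_state);
        for (key, item) in c.outbox {
            if staged.contains_key(&key) {
                // Live map is untouched; the staged copy is simply dropped.
                return Err(format!("duplicate outbox key {key}"));
            }
            staged.insert(key, item);
        }
        for key in c.dedupe_keys {
            staged.insert(key, "1".into());
        }
        staged.insert(c.checkpoint_key, c.stream_seq.to_string());
        self.data = staged; // single commit point
        Ok(())
    }
}
```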

### Tests

- [x] **T4.1** Saga state round-trip: put/get returns identical JSON
- [x] **T4.2** Checkpoint round-trip: put/get returns identical value
- [x] **T4.3** Atomicity: if a transaction fails, state/outbox/checkpoint are not partially committed
- [x] **T4.4** Outbox delete removes keys reliably
- [x] **T4.5** Schedule scan only returns due items and respects tenant scoping

---

## Milestone 5: JetStream Integration (Worker Pool Semantics)

**Goal:** Consume and produce JetStream messages with correct delivery semantics, ack discipline, idempotency, and backpressure.

### Dependencies

- Milestone 4 (storage layer)

### Tasks

- [x] **5.1** Implement JetStream client wrapper
  - Connect to NATS and create JetStream context
  - Bind to required streams (create-if-missing for workflow streams if enabled by config)
- [x] **5.2** Implement saga trigger consumer
  - Durable consumer filtered to aggregate event subjects (tenant-aware)
  - Deliver group support for worker pool replicas
  - Extract stream sequence from message metadata
- [x] **5.3** Implement effect command consumer
  - Durable consumer filtered to effect command subjects (tenant-aware)
  - Deliver group support
- [x] **5.4** Implement publish wrappers
  - Publish effect result events with headers (tenant-id, command-id, effect-name, trace-id)
  - Publish optional workflow facts if needed
- [x] **5.5** Enforce ack discipline
  - Ack only after the relevant storage transaction commits and/or downstream publish is confirmed
- [x] **5.6** Implement poison-message policy wiring
  - Consumer max-deliver configured
  - After max attempts: quarantine record in KV + `TERM` ack

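A sketch of how 5.5 and 5.6 combine into one decision per delivery, independent of the NATS client. `Committer`, `AckAction`, and `handle_delivery` are hypothetical names, and `delivered` stands for the delivery count the client exposes in message metadata. The ordering is the point: nothing is acked until the commit returns, and `Term` is returned only after the quarantine record has been written.

```rust
/// Hypothetical ack outcome, mirroring JetStream's Ack / Nak / Term replies.
#[derive(Debug, PartialEq, Eq)]
pub enum AckAction {
    Ack,
    Nak,
    Term,
}

/// Abstraction over the durable commit so the ordering can be unit tested.
pub trait Committer {
    fn checkpoint(&self) -> u64;
    fn commit(&mut self, stream_seq: u64) -> Result<(), String>;
    fn quarantine(&mut self, stream_seq: u64, reason: &str);
}

/// Decides how to answer one delivery. `delivered` is the 1-based delivery
/// count; `max_deliver` comes from the consumer configuration.
pub fn handle_delivery<C: Committer>(
    store: &mut C,
    stream_seq: u64,
    delivered: u64,
    max_deliver: u64,
) -> AckAction {
    // Checkpoint gate: already durably processed, so acking again is safe.
    if stream_seq <= store.checkpoint() {
        return AckAction::Ack;
    }
    match store.commit(stream_seq) {
        // Ack only after the storage transaction has committed.
        Ok(()) => AckAction::Ack,
        // Last allowed attempt failed: record it, then stop redelivery.
        Err(reason) if delivered >= max_deliver => {
            store.quarantine(stream_seq, &reason);
            AckAction::Term
        }
        // Otherwise let JetStream redeliver.
        Err(_) => AckAction::Nak,
    }
}
```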

### Tests

- [x] **T5.1** Unit test: checkpoint/dedupe gates skip already-processed items
- [x] **T5.2** Unit test: ack is not performed when storage commit fails
- [x] **T5.3** Integration test: JetStream redelivery is idempotent (ignored by default; enabled with `RUNNER_TEST_NATS_URL=...`)

---

## Milestone 6: Saga Runtime (Deterministic Execution)

**Goal:** Execute Saga logic as deterministic `runtime-function` DAG programs, producing work items that are persisted into the outbox.

### Dependencies

- Milestone 5 (JetStream consumption)

### Tasks

- [x] **6.1** Define Saga program invocation contract
  - Input: `{ saga_state, event }`
  - Output: `{ new_saga_state, work_items[], schedules[] }`
- [x] **6.2** Implement `runtime-function` execution wrapper
  - Gas limits and timeouts
  - Deterministic inputs only (no I/O, no clock access)
- [x] **6.3** Implement Saga manifest
  - Defines sagas, trigger filters, program references (`on_event`, `compensation`)
  - Validate referenced programs exist
- [x] **6.4** Implement saga processing pipeline
  - Load saga state
  - Execute program
  - Atomic commit (state + outbox + checkpoint + dedupe + schedule items)

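The 6.1 contract can be illustrated with a plain Rust function standing in for a `runtime-function` program (whose invocation API is not shown in this plan). The order-saga states, event names, work-item strings, and the 15-minute timeout are hypothetical; the detail worth copying is that the timeout is derived from the event's own timestamp, because reading the clock would break the determinism that T6.1 checks.

```rust
/// Input to a saga program (6.1). Timestamps come from the event, never the clock.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SagaInput {
    pub saga_state: Option<String>,
    pub event_type: String,
    pub event_time_ms: u64,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Schedule {
    pub due_at_ms: u64,
    pub reminder: String,
}

/// Output of a saga program (6.1).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SagaOutput {
    pub new_saga_state: String,
    pub work_items: Vec<String>,
    pub schedules: Vec<Schedule>,
}

/// Stand-in for an `on_event` program: a pure function with no I/O and no
/// clock access, so identical inputs always yield identical outputs.
pub fn order_saga_on_event(input: &SagaInput) -> SagaOutput {
    let state = input.saga_state.as_deref().unwrap_or("new");
    match (state, input.event_type.as_str()) {
        ("new", "OrderPlaced") => SagaOutput {
            new_saga_state: "awaiting_payment".into(),
            work_items: vec!["effect:charge_card".into()],
            // Payment timeout 15 minutes after the event, computed from event time.
            schedules: vec![Schedule {
                due_at_ms: input.event_time_ms + 15 * 60 * 1000,
                reminder: "payment_timeout".into(),
            }],
        },
        ("awaiting_payment", "PaymentSucceeded") => SagaOutput {
            new_saga_state: "completed".into(),
            work_items: vec!["command:ShipOrder".into()],
            schedules: vec![],
        },
        // Unknown transitions keep the state and emit nothing.
        (other, _) => SagaOutput {
            new_saga_state: other.to_string(),
            work_items: vec![],
            schedules: vec![],
        },
    }
}
```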

### Tests

- [x] **T6.1** Unit test: same inputs produce same outputs (determinism)
- [x] **T6.2** Unit test: pipeline writes checkpoint only if state/outbox commit succeeds
- [x] **T6.3** Unit test: dedupe prevents duplicate transitions for the same `event_id`

---

## Milestone 7: Outbox Relay (Reliable Dispatch)

**Goal:** Reliably deliver outbox work items to their destinations without dual-write gaps, supporting retries and backpressure.

### Dependencies

- Milestone 6 (saga runtime)

### Tasks

- [x] **7.1** Implement outbox relay loop
  - Poll `outbox:` prefix in KV with bounded batch size
  - Emit metrics for outbox depth and dispatch latency
- [x] **7.2** Implement dispatch targets
  - Aggregate commands via Gateway submission (HTTP/gRPC, tenant-scoped)
  - Always send `x-tenant-id` and propagate `correlation_id`/`trace_id` metadata
  - Effect commands published to `WORKFLOW_COMMANDS`
- [x] **7.3** Implement idempotency for relay dispatch
  - Safe retries: dispatch operation is idempotent using `command_id` (and/or JetStream `Nats-Msg-Id`)
  - Only delete outbox item after durable confirmation (publish ack / gateway response)
- [x] **7.4** Implement backpressure controls
  - Max in-flight dispatches per tenant
  - Bounded retries with backoff

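A single relay pass, sketched over a `BTreeMap` standing in for the KV prefix scan. `Dispatcher` and `relay_once` are hypothetical, and using the outbox key as the idempotency id assumes the key embeds a stable `command_id`. One caveat on the `Nats-Msg-Id` option in 7.3: JetStream only deduplicates within the stream's duplicate window (two minutes by default), so a retry after a long outage still relies on `command_id` dedupe downstream.

```rust
use std::collections::BTreeMap;

/// Hypothetical dispatch target (Gateway client or JetStream publisher).
/// `Ok(())` must mean durable confirmation: a publish ack or a gateway response.
pub trait Dispatcher {
    fn dispatch(&mut self, msg_id: &str, payload: &str) -> Result<(), String>;
}

/// One relay pass over up to `batch` items under `prefix`. Failed items stay
/// in the outbox for the next pass; an item is deleted only after its
/// dispatch is confirmed. Returns the number of items delivered.
pub fn relay_once<D: Dispatcher>(
    outbox: &mut BTreeMap<String, String>,
    prefix: &str,
    batch: usize,
    dispatcher: &mut D,
) -> usize {
    let pending: Vec<(String, String)> = outbox
        .range(prefix.to_string()..)
        .take_while(|(k, _)| k.starts_with(prefix))
        .take(batch)
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect();

    let mut delivered = 0;
    for (key, payload) in pending {
        // The key doubles as a stable idempotency id (e.g. Nats-Msg-Id), so a
        // crash between dispatch and delete only causes a deduplicated resend.
        if dispatcher.dispatch(&key, &payload).is_ok() {
            outbox.remove(&key);
            delivered += 1;
        }
    }
    delivered
}
```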

### Tests

- [x] **T7.1** Unit test: outbox item is not deleted if dispatch fails
- [x] **T7.2** Unit test: dispatch success deletes outbox item exactly once
- [x] **T7.3** Integration test: crash/restart simulation re-dispatches pending outbox items without duplicates (ignored by default)

---

## Milestone 8: Effect Worker Runtime (Non-Deterministic Execution)

**Goal:** Consume effect commands, execute side effects with reliability controls, publish result events, and ensure idempotency.

### Dependencies

- Milestone 5 (effect command consumer)
- Milestone 4 (dedupe storage)

### Tasks

- [x] **8.1** Define Effect command/result contract
  - Command input: `{ tenant_id, command_id, effect_name, payload, metadata }`
  - Result event output: `{ tenant_id, command_id, effect_name, result_type, payload, timestamp }`
- [x] **8.2** Implement provider interface
  - Provider receives decoded command and returns a typed outcome
  - Support per-provider configuration via manifest
  - Support secret references resolved from Swarm secrets/config at runtime (no secrets in git)
- [x] **8.3** Implement reliability controls
  - Retries + exponential backoff
  - Timeouts
  - Circuit breakers per upstream
- [x] **8.4** Implement idempotency gate
  - Check `dedupe:{tenant_id}:effect:{command_id}` before executing external call
  - Record completion only after result publish is acknowledged
- [x] **8.5** Publish result events to `WORKFLOW_EVENTS`
  - Include headers for correlation/trace propagation when present

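For 8.3, capped exponential backoff and a consecutive-failure circuit breaker need only the standard library. `backoff_delay` and `CircuitBreaker` are hypothetical names, jitter is omitted (adding it helps keep replicas from retrying in lockstep), and the breaker takes `now` as a parameter so it can be tested without sleeping.

```rust
use std::time::{Duration, Instant};

/// Exponential backoff: base * 2^attempt (attempt starts at 0), capped at `max`.
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // checked_shl returns None once the shift reaches 32 bits; treat that as "huge".
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}

/// Minimal per-upstream circuit breaker: opens after `threshold` consecutive
/// failures and allows a trial call once `cooldown` has elapsed (half-open).
pub struct CircuitBreaker {
    threshold: u32,
    cooldown: Duration,
    consecutive_failures: u32,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    pub fn new(threshold: u32, cooldown: Duration) -> Self {
        Self { threshold, cooldown, consecutive_failures: 0, opened_at: None }
    }

    /// Whether a call to the upstream may be attempted at `now`.
    pub fn allow(&self, now: Instant) -> bool {
        match self.opened_at {
            None => true,
            Some(opened) => now.duration_since(opened) >= self.cooldown,
        }
    }

    pub fn record_success(&mut self) {
        self.consecutive_failures = 0;
        self.opened_at = None;
    }

    pub fn record_failure(&mut self, now: Instant) {
        self.consecutive_failures += 1;
        if self.consecutive_failures >= self.threshold {
            // (Re)open; a failed half-open trial restarts the cooldown.
            self.opened_at = Some(now);
        }
    }
}
```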

### Tests

- [x] **T8.1** Unit test: idempotency gate prevents double execution for the same `command_id`
- [x] **T8.2** Unit test: result publish failure does not mark command as completed
- [x] **T8.3** Integration test: simulated redelivery of an effect command does not duplicate the external call (ignored by default)

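The idempotency gate from 8.4 that T8.1 and T8.2 describe might look like this, with a `HashSet` standing in for the `dedupe:` keyspace; `run_effect` and `EffectOutcome` are hypothetical. Note the window it leaves: if the worker crashes after the external call but before the publish ack, redelivery repeats the call, so providers whose upstream accepts an idempotency key should also forward `command_id`.

```rust
use std::collections::HashSet;

/// Hypothetical outcome of processing one effect command delivery.
#[derive(Debug, PartialEq, Eq)]
pub enum EffectOutcome {
    AlreadyDone,
    Completed,
    PublishFailed,
}

/// Idempotency gate around one effect. `execute` performs the external call;
/// `publish` must return Ok only once the result event has been acknowledged.
pub fn run_effect(
    dedupe: &mut HashSet<String>,
    tenant_id: &str,
    command_id: &str,
    execute: impl FnOnce() -> String,
    publish: impl FnOnce(&str) -> Result<(), String>,
) -> EffectOutcome {
    let key = format!("dedupe:{tenant_id}:effect:{command_id}");
    if dedupe.contains(&key) {
        // Ack without calling the provider again.
        return EffectOutcome::AlreadyDone;
    }
    let result = execute();
    match publish(&result) {
        Ok(()) => {
            // Completion is recorded only after the publish ack.
            dedupe.insert(key);
            EffectOutcome::Completed
        }
        // Not marked complete: the command will be redelivered and retried.
        Err(_) => EffectOutcome::PublishFailed,
    }
}
```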

---

## Milestone 9: Scheduling (Durable Timeouts/Reminders)

**Goal:** Implement durable scheduling for saga timeouts, using KV-backed schedules rather than in-memory timers as the source of truth.

### Dependencies

- Milestone 4 (schedule primitives)
- Milestone 6 (saga emits schedules)

### Tasks

- [x] **9.1** Implement scheduler loop
  - Periodically scan due `schedule:` items (tenant-aware)
  - Emit scheduling metrics (due count, scan time, lag)
- [x] **9.2** Define reminder delivery mechanism
  - Option A: publish a workflow event to `WORKFLOW_EVENTS` that sagas consume
  - Option B: inject an internal event into the saga pipeline without JetStream (only if it preserves restart correctness)
- [x] **9.3** Ensure idempotency for reminders
  - Reminder keys encode due time and `correlation_id`, allowing safe retry without duplicates

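A scheduler tick under the assumptions that `{due_at}` is Unix milliseconds and that delivery follows Option A (publishing to `WORKFLOW_EVENTS`); `tick` and the `deliver` callback are hypothetical. Each item is removed only after delivery is confirmed, and the schedule key itself serves as the delivery id, which is what makes a retry after a crash safe per 9.3.

```rust
use std::collections::BTreeMap;

/// Parses the trailing `{due_at}` segment of
/// `schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at}`.
fn due_at(key: &str) -> Option<u64> {
    key.rsplit(':').next()?.parse().ok()
}

/// One scheduler tick for one tenant: deliver every due item and delete it
/// only after `deliver` confirms. Returns the keys that were delivered.
pub fn tick(
    schedules: &mut BTreeMap<String, String>,
    tenant_id: &str,
    now_ms: u64,
    mut deliver: impl FnMut(&str, &str) -> Result<(), String>,
) -> Vec<String> {
    // The trailing ':' keeps tenant "acme" from matching "acme2".
    let prefix = format!("schedule:{tenant_id}:");
    let due: Vec<(String, String)> = schedules
        .range(prefix.clone()..)
        .take_while(|(k, _)| k.starts_with(&prefix))
        .filter(|(k, _)| matches!(due_at(k), Some(t) if t <= now_ms))
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect();

    let mut delivered = Vec::new();
    for (key, payload) in due {
        // The key doubles as the delivery id for downstream dedupe.
        if deliver(&key, &payload).is_ok() {
            schedules.remove(&key);
            delivered.push(key);
        }
    }
    delivered
}
```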

### Tests

- [x] **T9.1** Unit test: due schedule item is delivered and then deleted
- [x] **T9.2** Unit test: scheduler is tenant-scoped
- [x] **T9.3** Integration test: restart re-scans and delivers still-due reminders exactly once (ignored by default)

---

## Milestone 10: HTTP Endpoints + Operational Controls (Under Gateway)

**Goal:** Provide health/readiness/metrics/info and operational controls (drain/reload) aligned with other nodes.

### Dependencies

- Milestone 3 (settings)
- Milestone 4 (storage)
- Milestone 5 (JetStream)

### Tasks

- [x] **10.1** Implement `/health`
  - Storage writable check + JetStream connectivity check
- [x] **10.2** Implement `/ready`
  - Not draining + storage writable + JetStream reachable
  - Stop reporting ready before shutdown/drain to support safe rollouts
- [x] **10.3** Implement `/metrics`
  - Worker lag, outbox depth, effect latency, schedule lag
- [x] **10.4** Implement `/info`
  - Build info, mode, stream/consumer names, enabled saga/effect sets
- [x] **10.5** Implement `/admin/drain`
  - Stop acquiring new work, finish in-flight, flush relay, then report draining state
- [x] **10.6** Implement `/admin/reload`
  - Hot-reload manifests and (optionally) tenant placement config where safe

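The `/health` versus `/ready` split can be captured in a small shared state, sketched here without the HTTP layer (an axum handler could map `true`/`false` to 200/503). `OpsState` is a hypothetical name and the dependency probes are passed in as booleans; setting the draining flag first is what lets readiness drop before in-flight work finishes, as 10.2 requires.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Shared operational flags read by the `/health` and `/ready` handlers.
/// Probe results are plain booleans here; real checks would write a sentinel
/// key to storage and ping NATS.
#[derive(Default)]
pub struct OpsState {
    draining: AtomicBool,
}

impl OpsState {
    /// Called by `/admin/drain` (and on SIGTERM) before any other drain step,
    /// so the orchestrator stops routing to this replica first.
    pub fn start_drain(&self) {
        self.draining.store(true, Ordering::SeqCst);
    }

    pub fn is_draining(&self) -> bool {
        self.draining.load(Ordering::SeqCst)
    }

    /// `/health`: dependency checks only; stays healthy while draining.
    pub fn healthy(&self, storage_writable: bool, jetstream_ok: bool) -> bool {
        storage_writable && jetstream_ok
    }

    /// `/ready`: healthy and not draining.
    pub fn ready(&self, storage_writable: bool, jetstream_ok: bool) -> bool {
        !self.is_draining() && self.healthy(storage_writable, jetstream_ok)
    }
}
```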

### Tests

- [x] **T10.1** Unit test: readiness toggles with draining flag
- [x] **T10.2** Unit test: health fails when storage is unwritable

---

## Milestone 11: Container & Deployment

**Goal:** Package the Runner as a container and define entrypoint behavior consistent with Aggregate/Projection.

### Dependencies

- Milestone 9 (scheduling)
- Milestone 10 (operational endpoints)

### Tasks

- [x] **11.1** Create `docker/Dockerfile.rust`
  - Multi-stage build
  - Minimal runtime image
  - Health check wiring (uses `/health`)
- [x] **11.2** Create `docker-compose.yml` for local dev
  - Runner container (mode configurable)
  - NATS server (JetStream enabled)
  - Optional: Grafana, VictoriaMetrics, Loki
- [x] **11.3** Define container entrypoint behavior
  - Config loading
  - Graceful shutdown on SIGTERM
  - For saga/effect consumers: stop pulling new messages before exit
  - Flush outbox relay safely (do not delete outbox entries without confirmation)
  - Timeout-based forced shutdown policy
- [x] **11.4** Define environment variables and defaults
  - NATS URL, stream names, subject filters
  - Storage path
  - Mode: saga/effect/combined
  - Multi-tenancy enabled flag + default tenant behavior
  - Consumer settings: durable name strategy, deliver group, max in-flight, ack wait, max deliver
- [x] **11.5** Create release build optimization
  - LTO, strip, single codegen unit

### Tests

- [x] **T11.1** Container builds successfully
  ```bash
  docker build -f docker/Dockerfile.rust --build-arg PACKAGE=runner --build-arg BIN=runner -t cloudlysis/runner:local .
  docker run --rm cloudlysis/runner:local --help
  ```
- [x] **T11.2** Container starts with valid config
  ```bash
  # Uses the image tag built in T11.1; `nats` must resolve on the container's network
  # (for example, the local docker-compose network from 11.2).
  docker run --rm -e RUNNER_NATS_URL=nats://nats:4222 cloudlysis/runner:local
  ```

---

## Milestone 12: Provisioning, Scalability, and Docker Swarm Deployment

**Goal:** Support horizontal scaling and safe rollouts in Docker Swarm with clear worker-pool semantics for JetStream consumers and outbox relay.

### Dependencies

- Milestone 11 (container & deployment)

### Tasks

- [x] **12.1** Define the scaling model for saga + effect workers
  - Saga workers: durable consumer filtered to aggregate event subjects
  - Effect workers: durable consumer filtered to effect command subjects
  - Deliver group strategy so replicas share workload without duplication
  - Consumer configuration requirements (ack policy, max in-flight, ack wait, max deliver)
- [x] **12.2** Implement replica-safe processing invariants
  - Ack only after storage transaction commits
  - Outbox relay deletes only after durable confirmation (publish ack / gateway response)
  - Optional per-key serialization for workflows that require strict ordering
- [x] **12.3** Add tenant-aware provisioning option (sharding)
  - Optional tenant-range sharding by subject filters (e.g., `tenant.<range>.*`)
  - Placement constraints for Swarm nodes (e.g., `node.labels.tenant_range==<range>`)
  - Strategy for adding/removing shards and rebalancing tenants
- [x] **12.4** Optional: NATS KV-backed tenant placement config
  - Define a `TENANT_PLACEMENT` bucket to store tenant → shard assignments
  - Watch for config changes and apply without restart
- [x] **12.5** Create Swarm stack definition (`swarm/stacks/platform.yml`)
  - Service definition(s) for saga/effect/combined modes
  - Replicas configuration
  - Resource limits (CPU, memory)
  - Health check integration (`/health`, `/ready`)
  - Storage volume mapping for `edge-storage` data directory
  - Secrets/config wiring for provider credentials (no secrets in env vars)
- [x] **12.6** Define rollout + drain strategy
  - Rolling update parameters
  - Drain behavior: stop acquiring work, finish in-flight, flush relay
  - Safe rollback story (old replicas still valid due to idempotency + checkpointing)

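One possible shape for the tenant placement in 12.3–12.4, assuming assignments are mirrored from the `TENANT_PLACEMENT` bucket into memory by a watcher that calls `assign`/`unassign` on each update. `Placement`, the default-shard fallback, and `range_subject_filter` are hypothetical; the filter string simply follows the plan's `tenant.<range>.*` example.

```rust
use std::collections::HashMap;

/// In-memory tenant -> shard assignments; unlisted tenants use `default_shard`.
pub struct Placement {
    assignments: HashMap<String, String>,
    default_shard: String,
}

impl Placement {
    pub fn new(default_shard: &str) -> Self {
        Self { assignments: HashMap::new(), default_shard: default_shard.to_string() }
    }

    /// Applies one watched put without a restart.
    pub fn assign(&mut self, tenant: &str, shard: &str) {
        self.assignments.insert(tenant.to_string(), shard.to_string());
    }

    /// Applies one watched delete: the tenant falls back to the default shard.
    pub fn unassign(&mut self, tenant: &str) {
        self.assignments.remove(tenant);
    }

    pub fn shard_for(&self, tenant: &str) -> &str {
        self.assignments
            .get(tenant)
            .map(String::as_str)
            .unwrap_or(self.default_shard.as_str())
    }

    /// Explicitly assigned tenants on `shard`, sorted for stable filter generation.
    pub fn tenants_on(&self, shard: &str) -> Vec<&str> {
        let mut tenants: Vec<&str> = self
            .assignments
            .iter()
            .filter(|(_, s)| s.as_str() == shard)
            .map(|(t, _)| t.as_str())
            .collect();
        tenants.sort_unstable();
        tenants
    }
}

/// Subject filter for a tenant range, mirroring the plan's `tenant.<range>.*` example.
pub fn range_subject_filter(range: &str) -> String {
    format!("tenant.{range}.*")
}
```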

### Tests

- [x] **T12.1** Stack file valid
  ```bash
  docker stack config -c swarm/stacks/platform.yml
  ```
- [x] **T12.2** Scale-out does not duplicate work (ignored by default; run with `RUNNER_TEST_NATS_URL=... cargo test -- --ignored`)
  - Start 2+ replicas pulling from the same durable consumer deliver group
  - Verify saga checkpoint monotonicity and outbox dispatch idempotency
- [x] **T12.3** Rolling restart preserves correctness (ignored by default; run with `RUNNER_TEST_NATS_URL=... cargo test -- --ignored`)
  - Restart replicas during active processing
  - Verify no duplicate effects and no lost outbox items

---

## Milestone 13: Observability + Safety Policies

**Goal:** Ensure production-grade logs, metrics, and policy controls consistent with Aggregate/Projection.

### Dependencies

- Milestone 10 (operational endpoints) and prior runtime milestones

### Tasks

- [x] **13.1** Integrate `edge-logger-client` logging pipeline
  - Ensure `tenant_id`, `trace_id`, and `correlation_id` are included in structured logs
- [x] **13.2** Implement core metrics
  - Consumer lag, redeliveries, processing latency
  - Outbox depth and dispatch latency
  - Effect success/failure counts and duration histograms
  - Schedule lag and scan durations
- [x] **13.3** Implement poison message quarantine records
  - Store minimal decoded context and error reason in KV under a deadletter namespace
- [ ] **13.4** Standardize correlation and trace propagation to match Gateway/Control conventions
  - When submitting commands via the Gateway, include `x-correlation-id` and `traceparent` headers when available in workflow metadata
  - When publishing to JetStream, include `x-correlation-id` and `traceparent` message headers when available
  - Ensure `correlation_id` and `trace_id` appear in logs/spans for dispatch and effect execution paths

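A sketch of the header set for 13.4: `x-tenant-id` always (per 7.2), and `x-correlation-id` and `traceparent` only when the workflow metadata carries them. `WorkflowMeta` and both function names are hypothetical; the parser follows the W3C Trace Context version-00 layout (`version-traceid-parentid-flags`, lowercase hex, all-zero trace-id invalid) and drops malformed values rather than forwarding them.

```rust
/// Workflow metadata as carried in an outbox item (field names hypothetical).
pub struct WorkflowMeta {
    pub tenant_id: String,
    pub correlation_id: Option<String>,
    pub traceparent: Option<String>,
}

/// Extracts the 32-hex-digit trace-id from a version-00 `traceparent`,
/// rejecting malformed values and the all-zero trace-id.
pub fn trace_id_from_traceparent(tp: &str) -> Option<&str> {
    let parts: Vec<&str> = tp.split('-').collect();
    let is_hex = |s: &str, len: usize| {
        s.len() == len && s.bytes().all(|b| b.is_ascii_digit() || (b'a'..=b'f').contains(&b))
    };
    if parts.len() != 4
        || !is_hex(parts[0], 2)
        || !is_hex(parts[1], 32)
        || !is_hex(parts[2], 16)
        || !is_hex(parts[3], 2)
    {
        return None;
    }
    if parts[1].bytes().all(|b| b == b'0') {
        return None;
    }
    Some(parts[1])
}

/// Headers for a Gateway request or JetStream publish.
pub fn propagation_headers(meta: &WorkflowMeta) -> Vec<(&'static str, String)> {
    let mut headers = vec![("x-tenant-id", meta.tenant_id.clone())];
    if let Some(cid) = &meta.correlation_id {
        headers.push(("x-correlation-id", cid.clone()));
    }
    // Forward traceparent only when it parses; malformed values are dropped.
    if let Some(tp) = meta
        .traceparent
        .as_deref()
        .filter(|tp| trace_id_from_traceparent(tp).is_some())
    {
        headers.push(("traceparent", tp.to_string()));
    }
    headers
}
```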

### Tests

- [x] **T13.1** Unit test: metrics exporter emits key metrics
- [x] **T13.2** Unit test: quarantine record is written on poison handling path
- [ ] **T13.3** Integration test: Gateway-bound requests include `x-correlation-id` when workflow metadata supplies a `correlation_id` (ignored by default; requires NATS)

---

## Milestone 14: Integration Hardening (Replay, Scale, Compatibility)

**Goal:** Validate end-to-end correctness across saga + outbox + effect execution, and implement the operational workflows needed for safe scaling.

### Dependencies

- Milestones 6–13

### Tasks

- [x] **14.1** Implement controlled replay for sagas
  - Reset `checkpoint:{tenant_id}:{saga_name}` with explicit operator intent
  - Safety checks to prevent accidental full replays
- [x] **14.2** Validate worker-pool semantics
  - Deliver groups distribute work across replicas
  - Per-key ordering enforcement option for workflows that need it
- [x] **14.3** Validate graceful drain behavior
  - No new work acquired
  - In-flight work finishes
  - Outbox relay flush completes
- [x] **14.4** End-to-end integration test suite
  - Aggregate event → saga transition → outbox command → effect command → effect result → saga completes

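The safety checks in 14.1 could be a pure validation step run before any checkpoint write. `ReplayRequest`, `plan_replay`, and the confirm-by-retyping rule are hypothetical. Note also that the per-event `dedupe:` markers from 2.3 would turn replayed events into no-ops, so a real replay probably needs a policy for clearing or bypassing them.

```rust
/// Operator request to rewind one saga's checkpoint (field names hypothetical).
pub struct ReplayRequest<'a> {
    pub tenant_id: &'a str,
    pub saga_name: &'a str,
    pub to_seq: u64,
    /// Must equal the saga name, forcing the operator to restate the target.
    pub confirm: &'a str,
    /// Required for a rewind to 0, i.e. a full replay from the stream start.
    pub allow_full_replay: bool,
}

/// Validates the request against the current checkpoint and returns the
/// (key, new value) to write, or the reason for refusing.
pub fn plan_replay(req: &ReplayRequest, current: u64) -> Result<(String, u64), String> {
    if req.confirm != req.saga_name {
        return Err(format!(
            "confirmation '{}' does not match saga '{}'",
            req.confirm, req.saga_name
        ));
    }
    if req.to_seq >= current {
        return Err(format!("target {} is not behind current checkpoint {current}", req.to_seq));
    }
    if req.to_seq == 0 && !req.allow_full_replay {
        return Err("full replay requires allow_full_replay".into());
    }
    let key = format!("checkpoint:{}:{}", req.tenant_id, req.saga_name);
    Ok((key, req.to_seq))
}
```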

### Tests

- [x] **T14.1** Integration test: full happy-path workflow (ignored by default; requires NATS)
- [x] **T14.2** Integration test: crash/restart across boundaries preserves atomicity (ignored by default; requires NATS)
  - Restart after state commit but before dispatch
  - Restart after dispatch but before outbox delete
  - Restart during effect execution and redelivery