Development Plan: Runner Node
Overview
This plan breaks down the Runner node implementation into milestones ordered by dependency. Each milestone includes:
- Tasks with clear deliverables
- Test Requirements (unit tests + tautological tests + integration tests where applicable)
- Dependencies on previous milestones
Development Approach:
- Complete one milestone at a time
- Write tests before implementation (TDD where applicable)
- All tests must pass before moving to the next milestone
- Mark tasks complete with `[x]` as you progress
Milestone 0: Repo Bootstrap (Dev Ergonomics + Guardrails)
Goal: Establish baseline development workflows and guardrails so later milestones can be executed and verified consistently.
Tasks
- 0.1 Define canonical local commands
  - `cargo test`
  - `cargo fmt --check`
  - `cargo clippy -- -D warnings`
  - `cargo run -- --help`
- 0.2 Add minimal CI entrypoints (repository-level)
- Run fmt + clippy + tests on every PR
- Ensure CI uses the same commands as 0.1
- 0.3 Define integration-test gating pattern (NATS-required)
  - Use ignored tests that run only when `RUNNER_TEST_NATS_URL` is set
  - Make the plan's "ignored by default" integration tests follow the same convention
- 0.4 Define baseline operational invariants (written as checklist items for later milestones)
- Never ack before durable commit
- Never delete outbox item before durable confirmation
- Never execute an effect twice for the same `(tenant_id, command_id)`
- Always propagate `tenant_id`, `correlation_id`, and `trace_id`
Tests
- T0.1 Tautological test: baseline test harness runs

  ```rust
  #[test]
  fn test_harness_runs() {
      assert!(true);
  }
  ```
Milestone 1: Project Foundation
Goal: Set up the Runner as a standalone Rust container with correct dependencies and a stable module layout.
Tasks
- 1.1 Initialize Cargo project
  - Create `src/lib.rs` and `src/main.rs`
  - Configure `Cargo.toml` with the madapes registry
- 1.2 Configure dependencies (aligned with Aggregate/Projection)
  - `edge-storage` (KvStore)
  - `runtime-function` (Saga `on_event`/`compensation`)
  - `edge-logger-client` (structured logs client)
  - `query-engine` (optional admin/debug queries)
  - `async-nats` (JetStream)
  - `tokio`, `serde`, `serde_json`, `thiserror`, `anyhow`, `tracing`, `tracing-subscriber`, `uuid`, `chrono`, `axum`
- 1.3 Establish initial module layout
  ```text
  src/
  ├── lib.rs
  ├── main.rs
  ├── types/
  │   ├── mod.rs
  │   ├── id.rs
  │   ├── keys.rs
  │   ├── envelope.rs
  │   └── error.rs
  ├── config/
  │   ├── mod.rs
  │   └── settings.rs
  ├── storage/
  │   ├── mod.rs
  │   └── kv.rs
  ├── stream/
  │   ├── mod.rs
  │   └── jetstream.rs
  ├── saga/
  │   ├── mod.rs
  │   ├── manifest.rs
  │   └── runtime.rs
  ├── outbox/
  │   ├── mod.rs
  │   └── relay.rs
  ├── effects/
  │   ├── mod.rs
  │   ├── manifest.rs
  │   ├── runtime.rs
  │   └── providers/
  │       └── mod.rs
  ├── schedule/
  │   ├── mod.rs
  │   └── scheduler.rs
  ├── http/
  │   └── mod.rs
  └── observability/
      └── mod.rs
  ```

- 1.4 Configure clippy and rustfmt (CI-friendly)
Tests
- T1.1 Project compiles successfully
- T1.2 Dependencies resolve from madapes registry
- T1.3 Clippy passes with no warnings
Milestone 2: Core Types + Keyspaces
Goal: Implement the foundational types that the Runner depends on: multi-tenancy, envelope shapes, stable key composition, and error model.
Dependencies
- Milestone 1 (project foundation)
Tasks
- 2.1 Implement `TenantId` type (same semantics as other nodes)
  - String wrapper, default empty for single-tenant
  - Display, FromStr, Serialize, Deserialize
- 2.2 Implement workflow identity types
  - `SagaName`, `EffectName` (string wrappers)
  - `CorrelationId` (string wrapper)
  - `WorkId` (UUID v7 or string wrapper depending on work kind)
- 2.3 Implement stable key composition helpers
  - `saga:{tenant_id}:{saga_name}:{correlation_id}`
  - `checkpoint:{tenant_id}:{saga_name}`
  - `outbox:{tenant_id}:{work_kind}:{work_id}`
  - `schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at}`
  - `dedupe:{tenant_id}:{saga_name}:{event_id}`
  - `dedupe:{tenant_id}:effect:{command_id}`
- 2.4 Define message envelopes used by the Runner (serde types)
  - Aggregate event envelope (consumed from `AGGREGATE_EVENTS`)
  - Effect command envelope (consumed from workflow command stream)
  - Effect result event envelope (produced to workflow event stream)
  - Gateway command submission shape (compatible with Aggregate gateway request fields)
  - Forward-compatible decoding (unknown fields ignored where practical)
- 2.5 Implement Runner error model
- Storage errors, stream errors, decode errors, runtime-function errors
- Tenant access errors
- Policy errors (poison message, quarantine, etc.)
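Task 2.1's string-wrapper semantics can be sketched in pure std Rust. This is a minimal illustration (the real type would also derive the serde traits listed above); the `is_default` helper is an assumption added here for readability:

```rust
use std::fmt;
use std::str::FromStr;

/// String wrapper; an empty value means single-tenant mode (2.1).
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct TenantId(String);

impl TenantId {
    /// True when the tenant is the default (empty) single-tenant value.
    /// Hypothetical helper, not part of the plan's stated API.
    pub fn is_default(&self) -> bool {
        self.0.is_empty()
    }
}

impl fmt::Display for TenantId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}

impl FromStr for TenantId {
    type Err = std::convert::Infallible;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(TenantId(s.to_string()))
    }
}

fn main() {
    let t: TenantId = "acme".parse().unwrap();
    assert_eq!(t.to_string(), "acme");
    assert!(TenantId::default().is_default());
}
```

Deriving `Default` on the wrapper gives the empty-string single-tenant value for free, which is why T2.1 can assert "defaults to empty".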
Tests
- T2.1 `TenantId` round-trips serialization and defaults to empty
- T2.2 Key composition produces stable strings across all keyspaces
- T2.3 Envelope decoding ignores unknown fields (forward compatibility)
- T2.4 Tautological test: core types are Send + Sync
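The key composition helpers of task 2.3 are simple enough to sketch directly; the function names here are illustrative, but the key shapes match the keyspaces listed above, and segment order is load-bearing because prefix scans rely on it:

```rust
/// saga:{tenant_id}:{saga_name}:{correlation_id}
fn saga_key(tenant: &str, saga: &str, correlation: &str) -> String {
    format!("saga:{tenant}:{saga}:{correlation}")
}

/// checkpoint:{tenant_id}:{saga_name}
fn checkpoint_key(tenant: &str, saga: &str) -> String {
    format!("checkpoint:{tenant}:{saga}")
}

/// dedupe:{tenant_id}:effect:{command_id}
fn effect_dedupe_key(tenant: &str, command_id: &str) -> String {
    format!("dedupe:{tenant}:effect:{command_id}")
}

fn main() {
    // T2.2: composition is stable and deterministic.
    assert_eq!(saga_key("t1", "billing", "c-9"), "saga:t1:billing:c-9");
    assert_eq!(checkpoint_key("t1", "billing"), "checkpoint:t1:billing");
    assert_eq!(effect_dedupe_key("t1", "cmd-1"), "dedupe:t1:effect:cmd-1");
}
```

In the real module these would likely take the typed wrappers (`TenantId`, `SagaName`, etc.) rather than `&str`, so mismatched arguments fail at compile time.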
Milestone 3: Configuration
Goal: Implement settings and startup validation for modes (saga/effect/combined), multi-tenancy, storage, and JetStream.
Dependencies
- Milestone 2 (core types)
Tasks
- 3.1 Define `Settings` struct
  - NATS URL
- Storage path
- Mode: saga/effect/combined
- Multi-tenancy enabled flag + default tenant behavior
  - Stream names:
    - `AGGREGATE_EVENTS` (existing)
    - `WORKFLOW_COMMANDS` (runner work distribution)
    - `WORKFLOW_EVENTS` (effect results + optional workflow facts)
- Subject filters for saga triggers and effect commands
- Consumer configuration: durable name strategy, deliver group, max in-flight, ack wait, max deliver
- Backpressure configuration: per-key concurrency, batching, relay polling intervals
- Manifest paths: sagas and effects (YAML/JSON)
- 3.2 Implement config loading from environment variables
- 3.3 Implement config loading from file (YAML/TOML/JSON)
- Environment overrides file
- 3.4 Implement config validation
- Required fields present
- Stream names and consumer naming rules valid
- Manifests load and validate at startup
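Tasks 3.1–3.4 can be sketched with a minimal subset of fields using only `std::env`. The `RUNNER_*` variable names and defaults shown here are assumptions for illustration, not the final contract:

```rust
use std::env;

#[derive(Debug, Clone)]
struct Settings {
    nats_url: String,
    storage_path: String,
    mode: String, // "saga" | "effect" | "combined"
}

/// 3.4: the mode must be one of the three supported values.
fn validate_mode(mode: &str) -> Result<(), String> {
    match mode {
        "saga" | "effect" | "combined" => Ok(()),
        other => Err(format!("invalid mode: {other}")),
    }
}

impl Settings {
    /// 3.2: load from environment variables with defaults.
    /// (3.3's file loading would populate the same struct, with env winning.)
    fn from_env() -> Result<Self, String> {
        let mode = env::var("RUNNER_MODE").unwrap_or_else(|_| "combined".into());
        validate_mode(&mode)?;
        Ok(Settings {
            nats_url: env::var("RUNNER_NATS_URL")
                .unwrap_or_else(|_| "nats://localhost:4222".into()),
            storage_path: env::var("RUNNER_STORAGE_PATH")
                .unwrap_or_else(|_| "./data".into()),
            mode,
        })
    }
}

fn main() {
    assert!(validate_mode("saga").is_ok());
    assert!(validate_mode("projection").is_err());
}
```

Validating at construction time (rather than at first use) is what lets T3.2 catch missing or invalid values before any consumer starts.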
Tests
- T3.1 Settings loads from environment variables
- T3.2 Settings validation catches missing/invalid values
- T3.3 Tautological test: Settings is Clone + Debug
Milestone 4: Storage Layer (KvStore Transactions)
Goal: Integrate edge-storage KvStore and provide transaction APIs that enforce the Runner’s atomicity requirements.
Dependencies
- Milestone 2 (core types)
- Milestone 3 (configuration)
Tasks
- 4.1 Create `KvClient` wrapper
  - Opens MDBX-backed KvStore at configured path
- Tenant-aware key helpers for all namespaces
- 4.2 Implement saga state primitives
  - `get_saga_state(key) -> Option<Value>`
  - `put_saga_state(key, value)`
- 4.3 Implement checkpoint primitives
  - `get_checkpoint(key) -> Option<u64>`
  - `put_checkpoint(key, u64)`
- 4.4 Implement outbox primitives
  - `put_outbox_item(key, item)`
  - `list_outbox_prefix(tenant_id, ...)` (for relay scanning)
  - `delete_outbox_item(key)`
- 4.5 Implement schedule primitives
  - `put_schedule_item(key, payload)`
  - `scan_due_schedule_items(now)` (prefix scan + due filtering)
  - `delete_schedule_item(key)`
- 4.6 Implement atomic commit API for saga processing
- One transaction: update saga state + write outbox items + advance checkpoint + record dedupe marker(s)
- Provide an API surface that makes partial updates difficult
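The shape of 4.6's all-or-nothing commit can be sketched with an in-memory `BTreeMap` standing in for the MDBX transaction (the real `edge-storage` API will differ): state update, outbox writes, checkpoint advance, and dedupe markers are staged together and reach the store only on `commit`, so a failure or drop leaves nothing partially committed:

```rust
use std::collections::BTreeMap;

/// Stand-in for a KV write transaction: writes are staged, then applied
/// as a single unit. Dropping the Txn without commit discards everything.
struct Txn<'a> {
    store: &'a mut BTreeMap<String, String>,
    staged: BTreeMap<String, String>,
}

impl<'a> Txn<'a> {
    fn new(store: &'a mut BTreeMap<String, String>) -> Self {
        Txn { store, staged: BTreeMap::new() }
    }

    fn put(&mut self, key: &str, value: &str) {
        self.staged.insert(key.to_string(), value.to_string());
    }

    /// Apply all staged writes atomically (in this sketch, in one extend).
    fn commit(self) -> Result<(), &'static str> {
        if self.staged.is_empty() {
            return Err("empty transaction");
        }
        let Txn { store, staged } = self;
        store.extend(staged);
        Ok(())
    }
}

fn main() {
    let mut store = BTreeMap::new();
    {
        // Aborted path: dropped without commit, nothing reaches the store.
        let mut aborted = Txn::new(&mut store);
        aborted.put("saga:t1:billing:c-9", "{\"step\":2}");
    }
    assert!(store.is_empty());

    // Happy path: state + outbox + checkpoint + dedupe commit together.
    let mut txn = Txn::new(&mut store);
    txn.put("saga:t1:billing:c-9", "{\"step\":2}");
    txn.put("outbox:t1:effect:w-1", "{\"effect\":\"charge\"}");
    txn.put("checkpoint:t1:billing", "42");
    txn.put("dedupe:t1:billing:evt-7", "1");
    txn.commit().unwrap();
    assert_eq!(store.len(), 4);
}
```

Taking `self` by value in `commit` is the "API surface that makes partial updates difficult": the type system prevents writing after commit or committing twice.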
Tests
- T4.1 Saga state round-trip: put/get returns identical JSON
- T4.2 Checkpoint round-trip: put/get returns identical value
- T4.3 Atomicity: if transaction fails, state/outbox/checkpoint are not partially committed
- T4.4 Outbox delete removes keys reliably
- T4.5 Schedule scan only returns due items and respects tenant scoping
Milestone 5: JetStream Integration (Worker Pool Semantics)
Goal: Consume and produce JetStream messages with correct delivery semantics, ack discipline, idempotency, and backpressure.
Dependencies
- Milestone 4 (storage layer)
Tasks
- 5.1 Implement JetStream client wrapper
- Connect to NATS and create JetStream context
- Bind to required streams (create-if-missing for workflow streams if enabled by config)
- 5.2 Implement saga trigger consumer
- Durable consumer filtered to aggregate event subjects (tenant-aware)
- Deliver group support for worker pool replicas
- Extract stream sequence from message metadata
- 5.3 Implement effect command consumer
- Durable consumer filtered to effect command subjects (tenant-aware)
- Deliver group support
- 5.4 Implement publish wrappers
- Publish effect result events with headers (tenant-id, command-id, effect-name, trace-id)
- Publish optional workflow facts if needed
- 5.5 Enforce ack discipline
- Ack only after the relevant storage transaction commits and/or downstream publish is confirmed
- 5.6 Implement poison-message policy wiring
- Consumer max-deliver configured
- After max attempts: quarantine record in KV + TERM ack
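The poison-message decision in 5.6 is a pure function of the delivery metadata, which keeps it easy to unit-test without a broker. A minimal sketch (the enum and function names are illustrative):

```rust
/// What to do with a delivery, based on how many times JetStream
/// has already delivered it versus the configured max-deliver.
#[derive(Debug, PartialEq)]
enum DeliveryAction {
    /// Normal path: run the saga/effect pipeline, ack after durable commit.
    Process,
    /// Poison path: write a quarantine record to KV, then TERM-ack so the
    /// server stops redelivering.
    QuarantineAndTerm,
}

fn classify(delivery_count: u64, max_deliver: u64) -> DeliveryAction {
    if delivery_count >= max_deliver {
        DeliveryAction::QuarantineAndTerm
    } else {
        DeliveryAction::Process
    }
}

fn main() {
    assert_eq!(classify(1, 5), DeliveryAction::Process);
    assert_eq!(classify(5, 5), DeliveryAction::QuarantineAndTerm);
}
```

Keeping the decision separate from the I/O means the quarantine write and the TERM ack can still be ordered correctly (KV record first, ack second) by the caller.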
Tests
- T5.1 Unit test: checkpoint/dedupe gates skip already-processed items
- T5.2 Unit test: ack is not performed when storage commit fails
- T5.3 Integration test: JetStream redelivery is idempotent (ignored by default; enabled with `RUNNER_TEST_NATS_URL=...`)
Milestone 6: Saga Runtime (Deterministic Execution)
Goal: Execute Saga logic as deterministic runtime-function DAG programs, producing work items that are persisted into the outbox.
Dependencies
- Milestone 5 (JetStream consumption)
Tasks
- 6.1 Define Saga program invocation contract
  - Input: `{ saga_state, event }`
  - Output: `{ new_saga_state, work_items[], schedules[] }`
- 6.2 Implement `runtime-function` execution wrapper
  - Gas limits and timeouts
- Deterministic inputs only (no I/O, no clock access)
- 6.3 Implement Saga manifest
  - Defines sagas, trigger filters, program references (`on_event`, `compensation`)
  - Validate referenced programs exist
- 6.4 Implement saga processing pipeline
- Load saga state
- Execute program
- Atomic commit (state + outbox + checkpoint + dedupe + schedule items)
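The 6.1 contract can be illustrated with plain types and a pure transition function; the field names mirror the contract above, while the transition logic itself is an arbitrary stand-in for a real `runtime-function` program. Because `on_event` does no I/O and reads no clock, replaying the same `(saga_state, event)` pair must always yield the same output:

```rust
#[derive(Debug, Clone, PartialEq)]
struct SagaState {
    step: u32,
}

#[derive(Debug, Clone, PartialEq)]
struct Output {
    new_state: SagaState,
    work_items: Vec<String>,
    schedules: Vec<String>,
}

/// Deterministic transition: output depends only on the inputs (6.2).
fn on_event(state: &SagaState, event: &str) -> Output {
    Output {
        new_state: SagaState { step: state.step + 1 },
        work_items: vec![format!("dispatch:{event}")],
        schedules: vec![],
    }
}

fn main() {
    let s = SagaState { step: 0 };
    // T6.1: same inputs produce same outputs.
    assert_eq!(on_event(&s, "order_placed"), on_event(&s, "order_placed"));
    assert_eq!(on_event(&s, "order_placed").new_state.step, 1);
}
```

Anything time-dependent (timeouts, reminders) goes into `schedules` rather than being computed from a clock inside the program, preserving determinism.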
Tests
- T6.1 Unit test: same inputs produce same outputs (determinism)
- T6.2 Unit test: pipeline writes checkpoint only if state/outbox commit succeeds
- T6.3 Unit test: dedupe prevents duplicate transitions for the same event_id
Milestone 7: Outbox Relay (Reliable Dispatch)
Goal: Reliably deliver outbox work items to their destinations without dual-write gaps, supporting retries and backpressure.
Dependencies
- Milestone 6 (saga runtime)
Tasks
- 7.1 Implement outbox relay loop
  - Poll the `outbox:` prefix in KV with bounded batch size
  - Emit metrics for outbox depth and dispatch latency
- 7.2 Implement dispatch targets
- Aggregate commands via Gateway submission (HTTP/gRPC, tenant-scoped)
  - Always send `x-tenant-id` and propagate `correlation_id`/`trace_id` metadata
  - Effect commands published to `WORKFLOW_COMMANDS`
- 7.3 Implement idempotency for relay dispatch
  - Safe retries: the dispatch operation is idempotent using `command_id` (and/or JetStream `Nats-Msg-Id`)
  - Only delete an outbox item after durable confirmation (publish ack / gateway response)
- 7.4 Implement backpressure controls
- Max in-flight dispatches per tenant
- Bounded retries with backoff
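The "bounded retries with backoff" in 7.4 typically means a capped exponential schedule. A pure-std sketch (parameters are illustrative; a production version would usually add jitter):

```rust
use std::time::Duration;

/// Capped exponential backoff: base * 2^attempt, never exceeding cap.
/// The shift is clamped so large attempt counts cannot overflow u64.
fn backoff(attempt: u32, base_ms: u64, cap_ms: u64) -> Duration {
    let exp = base_ms.saturating_mul(1u64 << attempt.min(16));
    Duration::from_millis(exp.min(cap_ms))
}

fn main() {
    assert_eq!(backoff(0, 100, 30_000), Duration::from_millis(100));
    assert_eq!(backoff(3, 100, 30_000), Duration::from_millis(800));
    // Past the cap, every retry waits the maximum.
    assert_eq!(backoff(20, 100, 30_000), Duration::from_millis(30_000));
}
```

The cap is what makes the retry loop "bounded" in the backpressure sense: a stuck downstream cannot push wait times toward infinity, and the max in-flight limit above bounds concurrency independently.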
Tests
- T7.1 Unit test: outbox item is not deleted if dispatch fails
- T7.2 Unit test: dispatch success deletes outbox item exactly once
- T7.3 Integration test: crash/restart simulation re-dispatches pending outbox items without duplicates (ignored by default)
Milestone 8: Effect Worker Runtime (Non-Deterministic Execution)
Goal: Consume effect commands, execute side effects with reliability controls, publish result events, and ensure idempotency.
Dependencies
- Milestone 5 (effect command consumer)
- Milestone 4 (dedupe storage)
Tasks
- 8.1 Define Effect command/result contract
  - Command input: `{ tenant_id, command_id, effect_name, payload, metadata }`
  - Result event output: `{ tenant_id, command_id, effect_name, result_type, payload, timestamp }`
- 8.2 Implement provider interface
- Provider receives decoded command and returns a typed outcome
- Support per-provider configuration via manifest
- Support secret references resolved from Swarm secrets/config at runtime (no secrets in git)
- 8.3 Implement reliability controls
- retries + exponential backoff
- timeouts
- circuit breakers per upstream
- 8.4 Implement idempotency gate
  - Check `dedupe:{tenant_id}:effect:{command_id}` before executing the external call
  - Record completion only after the result publish is acknowledged
- 8.5 Publish result events to `WORKFLOW_EVENTS`
  - Include headers for correlation/trace propagation when present
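The control flow of 8.4's idempotency gate can be sketched with a `HashSet` standing in for the `dedupe:{tenant_id}:effect:{command_id}` keyspace (the real implementation reads/writes KV and publishes to JetStream between the two steps):

```rust
use std::collections::HashSet;

/// Run the side effect at most once per dedupe key. Returns true if the
/// call was executed, false if it was skipped as a duplicate.
fn execute_once<F: FnMut()>(dedupe: &mut HashSet<String>, key: &str, mut call: F) -> bool {
    if dedupe.contains(key) {
        return false; // already completed: skip the external call
    }
    call(); // the non-deterministic side effect (HTTP call, email, etc.)
    // ...publish the result event; only after the publish is acknowledged
    // do we record completion (T8.2 depends on this ordering):
    dedupe.insert(key.to_string());
    true
}

fn main() {
    let mut dedupe = HashSet::new();
    let mut calls = 0;
    let key = "dedupe:t1:effect:cmd-1";
    assert!(execute_once(&mut dedupe, key, || calls += 1));
    // Simulated JetStream redelivery of the same command:
    assert!(!execute_once(&mut dedupe, key, || calls += 1));
    assert_eq!(calls, 1); // T8.1: external call ran exactly once
}
```

Note the window this leaves: a crash after `call()` but before the marker is recorded can re-execute the effect, which is why effects themselves should be designed to tolerate at-least-once delivery where possible.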
Tests
- T8.1 Unit test: idempotency gate prevents double execution for same command_id
- T8.2 Unit test: result publish failure does not mark command as completed
- T8.3 Integration test: simulated redelivery of effect command does not duplicate external call (ignored by default)
Milestone 9: Scheduling (Durable Timeouts/Reminders)
Goal: Implement durable scheduling for saga timeouts using KV-backed schedules rather than in-memory timers as the source of truth.
Dependencies
- Milestone 4 (schedule primitives)
- Milestone 6 (saga emits schedules)
Tasks
- 9.1 Implement scheduler loop
  - Periodically scan due `schedule:` items (tenant-aware)
  - Emit scheduling metrics (due count, scan time, lag)
- 9.2 Define reminder delivery mechanism
  - Option A: publish a workflow event to `WORKFLOW_EVENTS` that sagas consume
  - Option B: inject an internal event into the saga pipeline without JetStream (only if it preserves restart correctness)
- 9.3 Ensure idempotency for reminders
- Reminder keys encode due time and correlation_id, allowing safe retry without duplicates
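One way to make the 9.1 scan and the 9.3 key encoding concrete: if `due_at` is written as zero-padded epoch milliseconds, the final key segment parses back unambiguously for due filtering, and keys sharing a prefix sort chronologically. This is a sketch under that assumption, not the plan's mandated encoding:

```rust
/// schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at}
/// due_at is zero-padded to 20 digits so equal-prefix keys sort by time.
fn schedule_key(tenant: &str, saga: &str, correlation: &str, due_at_ms: u64) -> String {
    format!("schedule:{tenant}:{saga}:{correlation}:{due_at_ms:020}")
}

/// Due filtering (9.1): parse the trailing due_at segment and compare to now.
fn is_due(key: &str, now_ms: u64) -> bool {
    key.rsplit(':')
        .next()
        .and_then(|s| s.parse::<u64>().ok())
        .map_or(false, |due| due <= now_ms)
}

fn main() {
    let early = schedule_key("t1", "billing", "c-9", 1_000);
    let late = schedule_key("t1", "billing", "c-9", 2_000);
    assert!(early < late); // lexicographic order matches chronological order
    assert!(is_due(&early, 1_500));
    assert!(!is_due(&late, 1_500));
}
```

Because the due time is part of the key, re-delivering a reminder after a restart writes or deletes the same key, which is the safe-retry property 9.3 asks for.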
Tests
- T9.1 Unit test: due schedule item is delivered and then deleted
- T9.2 Unit test: scheduler is tenant-scoped
- T9.3 Integration test: restart re-scans and delivers still-due reminders exactly once (ignored by default)
Milestone 10: HTTP Endpoints + Operational Controls (Under Gateway)
Goal: Provide health/readiness/metrics/info and operational controls (drain/reload) aligned with other nodes.
Dependencies
- Milestone 3 (settings)
- Milestone 4 (storage)
- Milestone 5 (JetStream)
Tasks
- 10.1 Implement `/health`
  - Storage writable check + JetStream connectivity check
- 10.2 Implement `/ready`
  - Not draining + storage writable + JetStream reachable
  - Stop reporting ready before shutdown/drain to support safe rollouts
- 10.3 Implement `/metrics`
  - Worker lag, outbox depth, effect latency, schedule lag
- 10.4 Implement `/info`
  - Build info, mode, stream/consumer names, enabled saga/effect sets
- 10.5 Implement `/admin/drain`
  - Stop acquiring new work, finish in-flight, flush the relay, then report draining state
- 10.6 Implement `/admin/reload`
  - Hot-reload manifests and (optionally) tenant placement config where safe
Tests
- T10.1 Unit test: readiness toggles with draining flag
- T10.2 Unit test: health fails when storage is unwritable
Milestone 11: Container & Deployment
Goal: Package the Runner as a container and define entrypoint behavior consistent with Aggregate/Projection.
Dependencies
- Milestone 9 (scheduling)
- Milestone 10 (operational endpoints)
Tasks
- 11.1 Create `docker/Dockerfile.rust`
  - Multi-stage build
  - Minimal runtime image
  - Health check wiring (uses `/health`)
- 11.2 Create `docker-compose.yml` for local dev
  - Runner container (mode configurable)
- NATS server (JetStream enabled)
- Optional: Grafana, Victoria Metrics, Loki
- 11.3 Define container entrypoint behavior
- Config loading
- Graceful shutdown on SIGTERM
- For saga/effect consumers: stop pulling new messages before exit
- Flush outbox relay safely (do not delete outbox entries without confirmation)
- Timeout-based forced shutdown policy
- 11.4 Define environment variables and defaults
- NATS URL, stream names, subject filters
- Storage path
- Mode: saga/effect/combined
- Multi-tenancy enabled flag + default tenant behavior
- Consumer settings: durable name strategy, deliver group, max in-flight, ack wait, max deliver
- 11.5 Create release build optimization
- LTO, strip, single codegen unit
Tests
- T11.1 Container builds successfully

  ```shell
  docker build -f docker/Dockerfile.rust --build-arg PACKAGE=runner --build-arg BIN=runner -t cloudlysis/runner:local .
  docker run cloudlysis/runner:local --help
  ```

- T11.2 Container starts with valid config

  ```shell
  docker run -e RUNNER_NATS_URL=nats://nats:4222 runner:latest
  ```
Milestone 12: Provisioning, Scalability, and Docker Swarm Deployment
Goal: Support horizontal scaling and safe rollouts in Docker Swarm with clear worker-pool semantics for JetStream consumers and outbox relay.
Dependencies
- Milestone 11 (container & deployment)
Tasks
- 12.1 Define the scaling model for saga + effect workers
- Saga workers: durable consumer filtered to aggregate event subjects
- Effect workers: durable consumer filtered to effect command subjects
- Deliver group strategy so replicas share workload without duplication
- Consumer configuration requirements (ack policy, max in-flight, ack wait, max deliver)
- 12.2 Implement replica-safe processing invariants
- Ack only after storage transaction commits
- Outbox relay deletes only after durable confirmation (publish ack / gateway response)
- Optional per-key serialization for workflows that require strict ordering
- 12.3 Add tenant-aware provisioning option (sharding)
  - Optional tenant-range sharding by subject filters (e.g., `tenant.<range>.*`)
  - Placement constraints for Swarm nodes (e.g., `node.labels.tenant_range == <range>`)
  - Strategy for adding/removing shards and rebalancing tenants
- 12.4 Optional: NATS KV-backed tenant placement config
  - Define a `TENANT_PLACEMENT` bucket to store tenant → shard assignments
  - Watch for config changes and apply them without restart
- 12.5 Create Swarm stack definition (`swarm/stacks/platform.yml`)
  - Service definition(s) for saga/effect/combined modes
- Replicas configuration
- Resource limits (CPU, memory)
  - Health check integration (`/health`, `/ready`)
  - Storage volume mapping for the `edge-storage` data directory
  - Secrets/config wiring for provider credentials (no secrets in env vars)
- 12.6 Define rollout + drain strategy
- Rolling update parameters
- Drain behavior: stop acquiring work, finish in-flight, flush relay
- Safe rollback story (old replicas still valid due to idempotency + checkpointing)
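The optional per-key serialization in 12.2 is commonly implemented by hashing the ordering key onto a fixed set of worker lanes inside a replica, so messages for one saga instance are processed in order while different keys run in parallel. A pure-std sketch of that routing (note `DefaultHasher` is only stable within a process, which is sufficient for in-process lanes but not for cross-replica partitioning):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map an ordering key (e.g., saga:{tenant}:{saga}:{correlation}) to one of
/// `lanes` serial worker lanes. Same key -> same lane -> strict ordering.
fn lane_for_key(key: &str, lanes: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % lanes as u64) as usize
}

fn main() {
    let key = "saga:t1:billing:c-9";
    // Stable within a process: the same key always maps to the same lane.
    assert_eq!(lane_for_key(key, 8), lane_for_key(key, 8));
    assert!(lane_for_key(key, 8) < 8);
}
```

Across replicas, ordering still depends on the deliver-group and subject-filter strategy above; lane routing only serializes what a single replica has in flight.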
Tests
- T12.1 Stack file valid

  ```shell
  docker stack config -c swarm/stacks/platform.yml
  ```

- T12.2 Scale-out does not duplicate work (ignored by default; run with `RUNNER_TEST_NATS_URL=... cargo test -- --ignored`)
  - Start 2+ replicas pulling from the same durable consumer deliver group
  - Verify saga checkpoint monotonicity and outbox dispatch idempotency
- T12.3 Rolling restart preserves correctness (ignored by default; run with `RUNNER_TEST_NATS_URL=... cargo test -- --ignored`)
  - Restart replicas during active processing
- Verify no duplicate effects and no lost outbox items
Milestone 13: Observability + Safety Policies
Goal: Ensure production-grade logs, metrics, and policy controls consistent with Aggregate/Projection.
Dependencies
- Milestone 10 (operational endpoints) and prior runtime milestones
Tasks
- 13.1 Integrate the `edge-logger-client` logging pipeline
  - Ensure tenant_id, trace_id, and correlation_id are included in structured logs
- 13.2 Implement core metrics
- consumer lag, redeliveries, processing latency
- outbox depth and dispatch latency
- effect success/failure counts and duration histograms
- schedule lag and scan durations
- 13.3 Implement poison message quarantine records
- Store minimal decoded context and error reason in KV under a deadletter namespace
- 13.4 Standardize correlation and trace propagation to match Gateway/Control conventions
  - When submitting commands via the Gateway, include `x-correlation-id` and `traceparent` headers when available in workflow metadata
  - When publishing to JetStream, include `x-correlation-id` and `traceparent` message headers when available
  - Ensure `correlation_id` and `trace_id` appear in logs/spans for dispatch and effect execution paths
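The "when available" rule in 13.4 amounts to building the outbound header set from optional metadata, which is the behavior T13.3 checks. A pure-std sketch (a real implementation would target the Gateway client's or `async-nats`'s header types rather than a `HashMap`):

```rust
use std::collections::HashMap;

/// Build propagation headers: x-tenant-id is always sent (7.2), while
/// x-correlation-id and traceparent are included only when the workflow
/// metadata actually carries them.
fn propagation_headers(
    tenant_id: &str,
    correlation_id: Option<&str>,
    traceparent: Option<&str>,
) -> HashMap<String, String> {
    let mut h = HashMap::new();
    h.insert("x-tenant-id".into(), tenant_id.into());
    if let Some(c) = correlation_id {
        h.insert("x-correlation-id".into(), c.into());
    }
    if let Some(t) = traceparent {
        h.insert("traceparent".into(), t.into());
    }
    h
}

fn main() {
    let h = propagation_headers("t1", Some("c-9"), None);
    assert_eq!(h.get("x-correlation-id").map(String::as_str), Some("c-9"));
    assert!(!h.contains_key("traceparent")); // absent metadata -> no header
}
```

Never synthesizing a missing `traceparent` keeps trace graphs honest: a gap in the trace is more useful than a fabricated root span.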
Tests
- T13.1 Unit test: metrics exporter emits key metrics
- T13.2 Unit test: quarantine record is written on poison handling path
- T13.3 Integration test: Gateway-bound requests include `x-correlation-id` when workflow metadata supplies a correlation_id (ignored by default; requires NATS)
Milestone 14: Integration Hardening (Replay, Scale, Compatibility)
Goal: Validate end-to-end correctness across saga + outbox + effect execution, and implement operational workflows needed for safe scaling.
Dependencies
- Milestones 6–13
Tasks
- 14.1 Implement controlled replay for sagas
  - Reset `checkpoint:{tenant_id}:{saga_name}` with explicit operator intent
  - Safety checks to prevent accidental full replays
- 14.2 Validate worker-pool semantics
- deliver groups distribute work across replicas
- per-key ordering enforcement option for workflows that need it
- 14.3 Validate graceful drain behavior
- no new work acquired
- in-flight finishes
- outbox relay flush completes
- 14.4 End-to-end integration test suite
- Aggregate event → saga transition → outbox command → effect command → effect result → saga completes
Tests
- T14.1 Integration test: full happy-path workflow (ignored by default; requires NATS)
- T14.2 Integration test: crash/restart across boundaries preserves atomicity (ignored by default; requires NATS)
- restart after state commit but before dispatch
- restart after dispatch but before outbox delete
- restart during effect execution and redelivery