cloudlysis/runner/DEVELOPMENT_PLAN.md
Vlad Durnea 1298d9a3df
Monorepo consolidation: workspace, shared types, transport plans, docker/swam assets
2026-03-30 11:40:42 +03:00

# Development Plan: Runner Node
## Overview
This plan breaks down the Runner node implementation into milestones ordered by dependency. Each milestone includes:
- **Tasks** with clear deliverables
- **Test Requirements** (unit tests + tautological tests + integration tests where applicable)
- **Dependencies** on previous milestones
**Development Approach:**
1. Complete one milestone at a time
2. Write tests before implementation (TDD where applicable)
3. All tests must pass before moving to the next milestone
4. Mark tasks complete with `[x]` as you progress
---
## Milestone 0: Repo Bootstrap (Dev Ergonomics + Guardrails)
**Goal:** Establish baseline development workflows and guardrails so later milestones can be executed and verified consistently.
### Tasks
- [x] **0.1** Define canonical local commands
- `cargo test`
- `cargo fmt --check`
- `cargo clippy -- -D warnings`
- `cargo run -- --help`
- [x] **0.2** Add minimal CI entrypoints (repository-level)
- Run fmt + clippy + tests on every PR
- Ensure CI uses the same commands as **0.1**
- [x] **0.3** Define integration-test gating pattern (NATS-required)
- Use ignored tests that run only when `RUNNER_TEST_NATS_URL` is set
- Make the plan's “ignored by default” integration tests follow the same convention
- [x] **0.4** Define baseline operational invariants (written as checklist items for later milestones)
- Never ack before durable commit
- Never delete outbox item before durable confirmation
- Never execute effect twice for the same `(tenant_id, command_id)`
- Always propagate `tenant_id`, `correlation_id`, and `trace_id`
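
The gating convention from task 0.3 could look roughly like the sketch below. The helper name `integration_nats_url` and the test name are hypothetical; only the `RUNNER_TEST_NATS_URL` variable comes from this plan. The helper takes the raw value as an argument so it can be unit-tested without touching the process environment.

```rust
/// Hypothetical helper: returns the NATS URL to test against, or `None`
/// when the variable is unset or blank (meaning "skip the test").
pub fn integration_nats_url(raw: Option<String>) -> Option<String> {
    raw.map(|v| v.trim().to_string()).filter(|v| !v.is_empty())
}

#[test]
#[ignore = "requires NATS; set RUNNER_TEST_NATS_URL and pass --ignored"]
fn jetstream_round_trip() {
    // Even when run with `--ignored`, bail out quietly if no URL is configured.
    let Some(url) = integration_nats_url(std::env::var("RUNNER_TEST_NATS_URL").ok()) else {
        eprintln!("RUNNER_TEST_NATS_URL not set; skipping");
        return;
    };
    // A real test would connect to `url` and exercise JetStream here.
    assert!(!url.is_empty());
}
```

With this shape, plain `cargo test` never touches NATS, and `RUNNER_TEST_NATS_URL=... cargo test -- --ignored` opts in.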
### Tests
- [x] **T0.1** Tautological test: baseline test harness runs
```rust
#[test]
fn test_harness_runs() {
assert!(true);
}
```
---
## Milestone 1: Project Foundation
**Goal:** Set up the Runner as a standalone Rust container with correct dependencies and a stable module layout.
### Tasks
- [x] **1.1** Initialize Cargo project
- Create `src/lib.rs` and `src/main.rs`
- Configure `Cargo.toml` with the madapes registry
- [x] **1.2** Configure dependencies (aligned with Aggregate/Projection)
- `edge-storage` (KvStore)
- `runtime-function` (Saga `on_event` / `compensation`)
- `edge-logger-client` (structured logs client)
- `query-engine` (optional admin/debug queries)
- `async-nats` (JetStream)
- `tokio`, `serde`, `serde_json`, `thiserror`, `anyhow`, `tracing`, `tracing-subscriber`, `uuid`, `chrono`, `axum`
- [x] **1.3** Establish initial module layout
```
src/
├── lib.rs
├── main.rs
├── types/
│ ├── mod.rs
│ ├── id.rs
│ ├── keys.rs
│ ├── envelope.rs
│ └── error.rs
├── config/
│ ├── mod.rs
│ └── settings.rs
├── storage/
│ ├── mod.rs
│ └── kv.rs
├── stream/
│ ├── mod.rs
│ └── jetstream.rs
├── saga/
│ ├── mod.rs
│ ├── manifest.rs
│ └── runtime.rs
├── outbox/
│ ├── mod.rs
│ └── relay.rs
├── effects/
│ ├── mod.rs
│ ├── manifest.rs
│ ├── runtime.rs
│ └── providers/
│ └── mod.rs
├── schedule/
│ ├── mod.rs
│ └── scheduler.rs
├── http/
│ └── mod.rs
└── observability/
└── mod.rs
```
- [x] **1.4** Configure clippy and rustfmt (CI-friendly)
### Tests
- [x] **T1.1** Project compiles successfully
- [x] **T1.2** Dependencies resolve from madapes registry
- [x] **T1.3** Clippy passes with no warnings
---
## Milestone 2: Core Types + Keyspaces
**Goal:** Implement the foundational types that the Runner depends on: multi-tenancy, envelope shapes, stable key composition, and error model.
### Dependencies
- Milestone 1 (project foundation)
### Tasks
- [x] **2.1** Implement `TenantId` type (same semantics as other nodes)
- String wrapper, default empty for single-tenant
- Display, FromStr, Serialize, Deserialize
- [x] **2.2** Implement workflow identity types
- `SagaName`, `EffectName` (string wrappers)
- `CorrelationId` (string wrapper)
- `WorkId` (UUID v7 or string wrapper depending on work kind)
- [x] **2.3** Implement stable key composition helpers
- `saga:{tenant_id}:{saga_name}:{correlation_id}`
- `checkpoint:{tenant_id}:{saga_name}`
- `outbox:{tenant_id}:{work_kind}:{work_id}`
- `schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at}`
- `dedupe:{tenant_id}:{saga_name}:{event_id}`
- `dedupe:{tenant_id}:effect:{command_id}`
- [x] **2.4** Define message envelopes used by the Runner (serde types)
- Aggregate event envelope (consumed from `AGGREGATE_EVENTS`)
- Effect command envelope (consumed from workflow command stream)
- Effect result event envelope (produced to workflow event stream)
- Gateway command submission shape (compatible with Aggregate gateway request fields)
- Forward-compatible decoding (unknown fields ignored where practical)
- [x] **2.5** Implement Runner error model
- Storage errors, stream errors, decode errors, runtime-function errors
- Tenant access errors
- Policy errors (poison message, quarantine, etc.)
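
As an illustration of tasks 2.1 and 2.3, the key layouts above can be produced by small formatting helpers. This is a minimal sketch: the function names are hypothetical, serde derives are omitted to keep it std-only, and the zero-padding of `due_at` (taken here as milliseconds) is an assumption that keeps schedule keys in due-time order within one saga/correlation prefix.

```rust
use std::fmt;

/// String wrapper; the default (empty) value represents single-tenant mode.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct TenantId(String);

impl TenantId {
    pub fn new(s: impl Into<String>) -> Self {
        TenantId(s.into())
    }
}

impl fmt::Display for TenantId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}

pub fn saga_key(t: &TenantId, saga: &str, corr: &str) -> String {
    format!("saga:{t}:{saga}:{corr}")
}

pub fn checkpoint_key(t: &TenantId, saga: &str) -> String {
    format!("checkpoint:{t}:{saga}")
}

pub fn outbox_key(t: &TenantId, work_kind: &str, work_id: &str) -> String {
    format!("outbox:{t}:{work_kind}:{work_id}")
}

/// Assumption: `due_at` is zero-padded to 20 digits so lexicographic order
/// matches numeric order for keys sharing the same prefix.
pub fn schedule_key(t: &TenantId, saga: &str, corr: &str, due_at_ms: u64) -> String {
    format!("schedule:{t}:{saga}:{corr}:{due_at_ms:020}")
}

pub fn saga_dedupe_key(t: &TenantId, saga: &str, event_id: &str) -> String {
    format!("dedupe:{t}:{saga}:{event_id}")
}

pub fn effect_dedupe_key(t: &TenantId, command_id: &str) -> String {
    format!("dedupe:{t}:effect:{command_id}")
}
```

Keeping every key behind a helper like these means a keyspace change touches one function rather than every call site, which is what T2.2 is meant to pin down.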
### Tests
- [x] **T2.1** `TenantId` round-trips serialization and defaults to empty
- [x] **T2.2** Key composition produces stable strings across all keyspaces
- [x] **T2.3** Envelope decoding ignores unknown fields (forward compatibility)
- [x] **T2.4** Tautological test: core types are Send + Sync
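
A tautological Send + Sync check such as T2.4 is usually a compile-time assertion. The sketch below shows one common shape; the `TenantId` stand-in and the helper name `assert_send_sync` are illustrative, not taken from the codebase.

```rust
/// Stand-in for a core type; the real `TenantId` would also derive serde traits.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TenantId(String);

/// Compiles only if `T` is both `Send` and `Sync`; does nothing at runtime.
pub fn assert_send_sync<T: Send + Sync>() {}

#[test]
fn core_types_are_send_sync() {
    assert_send_sync::<TenantId>();
}
```

If a later change adds a non-thread-safe field (an `Rc`, for example), this test stops compiling rather than failing at runtime.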
---
## Milestone 3: Configuration
**Goal:** Implement settings and startup validation for modes (saga/effect/combined), multi-tenancy, storage, and JetStream.
### Dependencies
- Milestone 2 (core types)
### Tasks
- [x] **3.1** Define `Settings` struct
- NATS URL
- Storage path
- Mode: saga/effect/combined
- Multi-tenancy enabled flag + default tenant behavior
- Stream names:
- `AGGREGATE_EVENTS` (existing)
- `WORKFLOW_COMMANDS` (runner work distribution)
- `WORKFLOW_EVENTS` (effect results + optional workflow facts)
- Subject filters for saga triggers and effect commands
- Consumer configuration: durable name strategy, deliver group, max in-flight, ack wait, max deliver
- Backpressure configuration: per-key concurrency, batching, relay polling intervals
- Manifest paths: sagas and effects (YAML/JSON)
- [x] **3.2** Implement config loading from environment variables
- [x] **3.3** Implement config loading from file (YAML/TOML/JSON)
- Environment overrides file
- [x] **3.4** Implement config validation
- Required fields present
- Stream names and consumer naming rules valid
- Manifests load and validate at startup
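
The precedence rule in 3.3 (environment overrides file) and the validation step in 3.4 might be sketched as follows. Both sources are modelled as plain maps so the example stays std-only. `RUNNER_NATS_URL` appears elsewhere in this plan; `RUNNER_STORAGE_PATH`, `RUNNER_MODE`, and the choice of `combined` as the default mode are assumptions made for illustration.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Mode {
    Saga,
    Effect,
    Combined,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Settings {
    pub nats_url: String,
    pub storage_path: String,
    pub mode: Mode,
}

impl Settings {
    /// Builds settings from two key/value sources; `env` wins over `file`.
    pub fn from_sources(
        file: &HashMap<String, String>,
        env: &HashMap<String, String>,
    ) -> Result<Self, String> {
        let pick = |key: &str| env.get(key).or_else(|| file.get(key)).cloned();

        let nats_url = pick("RUNNER_NATS_URL").ok_or("missing RUNNER_NATS_URL")?;
        let storage_path = pick("RUNNER_STORAGE_PATH").ok_or("missing RUNNER_STORAGE_PATH")?;
        // Assumption: an unset mode falls back to `combined`.
        let mode = match pick("RUNNER_MODE").as_deref() {
            Some("saga") => Mode::Saga,
            Some("effect") => Mode::Effect,
            Some("combined") | None => Mode::Combined,
            Some(other) => return Err(format!("invalid RUNNER_MODE: {other}")),
        };

        let settings = Settings { nats_url, storage_path, mode };
        settings.validate()?;
        Ok(settings)
    }

    /// Startup validation: required fields must be non-blank.
    pub fn validate(&self) -> Result<(), String> {
        if self.nats_url.trim().is_empty() {
            return Err("nats_url must not be empty".into());
        }
        if self.storage_path.trim().is_empty() {
            return Err("storage_path must not be empty".into());
        }
        Ok(())
    }
}
```

Failing fast in `from_sources` keeps a misconfigured Runner from connecting to NATS or opening storage at all.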
### Tests
- [x] **T3.1** Settings loads from environment variables
- [x] **T3.2** Settings validation catches missing/invalid values
- [x] **T3.3** Tautological test: Settings is Clone + Debug
---
## Milestone 4: Storage Layer (KvStore Transactions)
**Goal:** Integrate `edge-storage` `KvStore` and provide transaction APIs that enforce the Runner's atomicity requirements.
### Dependencies
- Milestone 2 (core types)
- Milestone 3 (configuration)
### Tasks
- [x] **4.1** Create `KvClient` wrapper
- Opens MDBX-backed KvStore at configured path
- Tenant-aware key helpers for all namespaces
- [x] **4.2** Implement saga state primitives
- `get_saga_state(key) -> Option<Value>`
- `put_saga_state(key, value)`
- [x] **4.3** Implement checkpoint primitives
- `get_checkpoint(key) -> Option<u64>`
- `put_checkpoint(key, u64)`
- [x] **4.4** Implement outbox primitives
- `put_outbox_item(key, item)`
- `list_outbox_prefix(tenant_id, ...)` (for relay scanning)
- `delete_outbox_item(key)`
- [x] **4.5** Implement schedule primitives
- `put_schedule_item(key, payload)`
- `scan_due_schedule_items(now)` (prefix scan + due filtering)
- `delete_schedule_item(key)`
- [x] **4.6** Implement atomic commit API for saga processing
- One transaction: update saga state + write outbox items + advance checkpoint + record dedupe marker(s)
- Provide an API surface that makes partial updates difficult
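
One way to make partial updates difficult (task 4.6) is to stage every write in a batch value and expose a single `commit` entry point. The sketch below uses an in-memory map as a stand-in; `MemStore` and `SagaCommit` are hypothetical names, and nothing here reflects the actual `edge-storage` API.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for the KV store (not the `edge-storage` API).
#[derive(Debug, Default)]
pub struct MemStore {
    data: BTreeMap<String, Vec<u8>>,
}

/// Staged writes for one saga event; nothing is visible until `commit`.
#[derive(Debug, Default)]
pub struct SagaCommit {
    writes: Vec<(String, Vec<u8>)>,
}

impl SagaCommit {
    pub fn new() -> Self {
        Self::default()
    }
    fn stage(mut self, key: &str, value: Vec<u8>) -> Self {
        self.writes.push((key.to_string(), value));
        self
    }
    pub fn saga_state(self, key: &str, value: Vec<u8>) -> Self {
        self.stage(key, value)
    }
    pub fn outbox_item(self, key: &str, value: Vec<u8>) -> Self {
        self.stage(key, value)
    }
    pub fn checkpoint(self, key: &str, seq: u64) -> Self {
        self.stage(key, seq.to_be_bytes().to_vec())
    }
    pub fn dedupe_marker(self, key: &str) -> Self {
        self.stage(key, Vec::new())
    }
}

impl MemStore {
    pub fn get(&self, key: &str) -> Option<&[u8]> {
        self.data.get(key).map(|v| v.as_slice())
    }

    /// Applies every staged write or none of them.
    pub fn commit(&mut self, batch: SagaCommit) -> Result<(), String> {
        // Validate the whole batch first so a rejected batch leaves the store untouched.
        if batch.writes.iter().any(|(k, _)| k.is_empty()) {
            return Err("empty key in batch".into());
        }
        for (k, v) in batch.writes {
            self.data.insert(k, v);
        }
        Ok(())
    }
}
```

Because callers never receive individual `put_*` handles during saga processing, there is no code path that advances the checkpoint without also writing state and outbox items.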
### Tests
- [x] **T4.1** Saga state round-trip: put/get returns identical JSON
- [x] **T4.2** Checkpoint round-trip: put/get returns identical value
- [x] **T4.3** Atomicity: if transaction fails, state/outbox/checkpoint are not partially committed
- [x] **T4.4** Outbox delete removes keys reliably
- [x] **T4.5** Schedule scan only returns due items and respects tenant scoping
---
## Milestone 5: JetStream Integration (Worker Pool Semantics)
**Goal:** Consume and produce JetStream messages with correct delivery semantics, ack discipline, idempotency, and backpressure.
### Dependencies
- Milestone 4 (storage layer)
### Tasks
- [x] **5.1** Implement JetStream client wrapper
- Connect to NATS and create JetStream context
- Bind to required streams (create-if-missing for workflow streams if enabled by config)
- [x] **5.2** Implement saga trigger consumer
- Durable consumer filtered to aggregate event subjects (tenant-aware)
- Deliver group support for worker pool replicas
- Extract stream sequence from message metadata
- [x] **5.3** Implement effect command consumer
- Durable consumer filtered to effect command subjects (tenant-aware)
- Deliver group support
- [x] **5.4** Implement publish wrappers
- Publish effect result events with headers (tenant-id, command-id, effect-name, trace-id)
- Publish optional workflow facts if needed
- [x] **5.5** Enforce ack discipline
- Ack only after the relevant storage transaction commits and/or downstream publish is confirmed
- [x] **5.6** Implement poison-message policy wiring
- Consumer max-deliver configured
- After max attempts: quarantine record in KV + TERM ack
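
The ack discipline in 5.5 and the poison policy in 5.6 reduce to a small decision function. The sketch below is transport-agnostic; `AckAction` and `decide_ack` are hypothetical names, and mapping each variant onto the NATS client's ack kinds is left to the consumer loop.

```rust
/// What the consumer loop should tell JetStream about one delivery.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckAction {
    /// Storage transaction committed: safe to ack.
    Ack,
    /// Commit failed and attempts remain: ask for redelivery.
    Nak,
    /// Attempts exhausted: write a quarantine record, then terminate delivery.
    QuarantineThenTerm,
}

/// `delivery_count` is the 1-based attempt number from message metadata;
/// `max_deliver` mirrors the consumer's configured limit.
pub fn decide_ack(commit_ok: bool, delivery_count: u64, max_deliver: u64) -> AckAction {
    if commit_ok {
        AckAction::Ack
    } else if delivery_count >= max_deliver {
        AckAction::QuarantineThenTerm
    } else {
        AckAction::Nak
    }
}
```

Keeping the decision pure makes T5.2 (no ack when the commit fails) testable without a running NATS server.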
### Tests
- [x] **T5.1** Unit test: checkpoint/dedupe gates skip already-processed items
- [x] **T5.2** Unit test: ack is not performed when storage commit fails
- [x] **T5.3** Integration test: JetStream redelivery is idempotent (ignored by default; enabled with `RUNNER_TEST_NATS_URL=...`)
---
## Milestone 6: Saga Runtime (Deterministic Execution)
**Goal:** Execute Saga logic as deterministic `runtime-function` DAG programs, producing work items that are persisted into the outbox.
### Dependencies
- Milestone 5 (JetStream consumption)
### Tasks
- [x] **6.1** Define Saga program invocation contract
- Input: `{ saga_state, event }`
- Output: `{ new_saga_state, work_items[], schedules[] }`
- [x] **6.2** Implement `runtime-function` execution wrapper
- Gas limits and timeouts
- Deterministic inputs only (no I/O, no clock access)
- [x] **6.3** Implement Saga manifest
- Defines sagas, trigger filters, program references (`on_event`, `compensation`)
- Validate referenced programs exist
- [x] **6.4** Implement saga processing pipeline
- Load saga state
- Execute program
- Atomic commit (state + outbox + checkpoint + dedupe + schedule items)
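
The pipeline in 6.4, together with the checkpoint and dedupe gates, can be sketched with a pure transition function and an in-memory pipeline. All types and the toy "sum the amounts" logic are illustrative assumptions; a real saga would run a `runtime-function` program, return `schedules[]` as well, and commit through a single KV transaction.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event {
    pub event_id: String,
    pub stream_seq: u64,
    pub amount: i64,
}

#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SagaState {
    pub total: i64,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Output {
    pub new_state: SagaState,
    pub work_items: Vec<String>,
}

/// Pure transition: no I/O and no clock, so equal inputs give equal outputs.
pub fn on_event(state: &SagaState, event: &Event) -> Output {
    let new_state = SagaState { total: state.total + event.amount };
    let work_items = vec![format!("charge:{}", new_state.total)];
    Output { new_state, work_items }
}

#[derive(Debug, Default)]
pub struct Pipeline {
    pub state: SagaState,
    pub checkpoint: u64,
    pub seen: HashSet<String>,
    pub outbox: Vec<String>,
}

impl Pipeline {
    /// Returns `true` if the event caused a transition, `false` if it was skipped.
    pub fn handle(&mut self, event: &Event) -> bool {
        // Gates: skip anything at or below the checkpoint, or already deduped.
        if event.stream_seq <= self.checkpoint || self.seen.contains(&event.event_id) {
            return false;
        }
        let out = on_event(&self.state, event);
        // In the real Runner these four updates would be one KV transaction.
        self.state = out.new_state;
        self.outbox.extend(out.work_items);
        self.checkpoint = event.stream_seq;
        self.seen.insert(event.event_id.clone());
        true
    }
}
```

Redelivering the same event leaves state, outbox, and checkpoint unchanged, which is the behaviour T6.3 checks.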
### Tests
- [x] **T6.1** Unit test: same inputs produce same outputs (determinism)
- [x] **T6.2** Unit test: pipeline writes checkpoint only if state/outbox commit succeeds
- [x] **T6.3** Unit test: dedupe prevents duplicate transitions for the same event_id
---
## Milestone 7: Outbox Relay (Reliable Dispatch)
**Goal:** Reliably deliver outbox work items to their destinations without dual-write gaps, supporting retries and backpressure.
### Dependencies
- Milestone 6 (saga runtime)
### Tasks
- [x] **7.1** Implement outbox relay loop
- Poll `outbox:` prefix in KV with bounded batch size
- Emit metrics for outbox depth and dispatch latency
- [x] **7.2** Implement dispatch targets
- Aggregate commands via Gateway submission (HTTP/gRPC, tenant-scoped)
- Always send `x-tenant-id` and propagate `correlation_id`/`trace_id` metadata
- Effect commands published to `WORKFLOW_COMMANDS`
- [x] **7.3** Implement idempotency for relay dispatch
- Safe retries: dispatch operation is idempotent using `command_id` (and/or JetStream `Nats-Msg-Id`)
- Only delete outbox item after durable confirmation (publish ack / gateway response)
- [x] **7.4** Implement backpressure controls
- Max in-flight dispatches per tenant
- Bounded retries with backoff
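
A single pass of the relay loop described in 7.1 and 7.3 might look like the sketch below. The outbox is modelled as an ordered in-memory map and the dispatch target as a closure; `relay_once` is a hypothetical name, and retries, backoff, and per-tenant in-flight limits are left out.

```rust
use std::collections::BTreeMap;

/// Dispatches up to `batch_size` outbox items and deletes only those whose
/// dispatch was confirmed. Returns the number of items delivered.
pub fn relay_once<F>(
    outbox: &mut BTreeMap<String, String>,
    batch_size: usize,
    mut dispatch: F,
) -> usize
where
    F: FnMut(&str, &str) -> Result<(), String>,
{
    // Snapshot a bounded batch so the map can be mutated while iterating.
    let batch: Vec<(String, String)> = outbox
        .iter()
        .take(batch_size)
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect();

    let mut delivered = 0;
    for (key, item) in batch {
        if dispatch(&key, &item).is_ok() {
            // Delete only after the target confirmed the dispatch.
            outbox.remove(&key);
            delivered += 1;
        }
        // On error the item stays in the outbox and is retried on a later pass.
    }
    delivered
}
```

If the process crashes between a confirmed dispatch and the delete, the item is sent again on restart, which is why the dispatch itself has to be idempotent on `command_id`.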
### Tests
- [x] **T7.1** Unit test: outbox item is not deleted if dispatch fails
- [x] **T7.2** Unit test: dispatch success deletes outbox item exactly once
- [x] **T7.3** Integration test: crash/restart simulation re-dispatches pending outbox items without duplicates (ignored by default)
---
## Milestone 8: Effect Worker Runtime (Non-Deterministic Execution)
**Goal:** Consume effect commands, execute side effects with reliability controls, publish result events, and ensure idempotency.
### Dependencies
- Milestone 5 (effect command consumer)
- Milestone 4 (dedupe storage)
### Tasks
- [x] **8.1** Define Effect command/result contract
- Command input: `{ tenant_id, command_id, effect_name, payload, metadata }`
- Result event output: `{ tenant_id, command_id, effect_name, result_type, payload, timestamp }`
- [x] **8.2** Implement provider interface
- Provider receives decoded command and returns a typed outcome
- Support per-provider configuration via manifest
- Support secret references resolved from Swarm secrets/config at runtime (no secrets in git)
- [x] **8.3** Implement reliability controls
- retries + exponential backoff
- timeouts
- circuit breakers per upstream
- [x] **8.4** Implement idempotency gate
- Check `dedupe:{tenant_id}:effect:{command_id}` before executing external call
- Record completion only after result publish is acknowledged
- [x] **8.5** Publish result events to `WORKFLOW_EVENTS`
- Include headers for correlation/trace propagation when present
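
The idempotency gate in 8.4 could be sketched as below, with the external call and the result publish passed in as closures. `EffectWorker` and `Outcome` are hypothetical names, and the dedupe set is an in-memory stand-in for the `dedupe:{tenant_id}:effect:{command_id}` keyspace.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    Executed,
    SkippedDuplicate,
    PublishFailed,
}

#[derive(Debug, Default)]
pub struct EffectWorker {
    completed: HashSet<String>,
}

impl EffectWorker {
    pub fn new() -> Self {
        Self::default()
    }

    /// Runs `execute` at most once per completed `(tenant_id, command_id)` and
    /// marks completion only after `publish` reports success.
    pub fn handle<E, P>(
        &mut self,
        tenant_id: &str,
        command_id: &str,
        mut execute: E,
        mut publish: P,
    ) -> Outcome
    where
        E: FnMut() -> String,
        P: FnMut(&str) -> Result<(), String>,
    {
        let key = format!("dedupe:{tenant_id}:effect:{command_id}");
        if self.completed.contains(&key) {
            return Outcome::SkippedDuplicate;
        }
        let result = execute();
        if publish(&result).is_err() {
            // Not marked complete: the command will be redelivered and retried.
            return Outcome::PublishFailed;
        }
        self.completed.insert(key);
        Outcome::Executed
    }
}
```

Note that in this sketch a publish failure leads to the external call being made again on redelivery, so the provider call itself would presumably need to be idempotent on `command_id` for the "never execute twice" invariant to hold end to end.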
### Tests
- [x] **T8.1** Unit test: idempotency gate prevents double execution for same command_id
- [x] **T8.2** Unit test: result publish failure does not mark command as completed
- [x] **T8.3** Integration test: simulated redelivery of effect command does not duplicate external call (ignored by default)
---
## Milestone 9: Scheduling (Durable Timeouts/Reminders)
**Goal:** Implement durable scheduling for saga timeouts using KV-backed schedules rather than in-memory timers as the source of truth.
### Dependencies
- Milestone 4 (schedule primitives)
- Milestone 6 (saga emits schedules)
### Tasks
- [x] **9.1** Implement scheduler loop
- Periodically scan due `schedule:` items (tenant-aware)
- Emit scheduling metrics (due count, scan time, lag)
- [x] **9.2** Define reminder delivery mechanism
- Option A: publish a workflow event to `WORKFLOW_EVENTS` that sagas consume
- Option B: inject an internal event into the saga pipeline without JetStream (only if it preserves restart correctness)
- [x] **9.3** Ensure idempotency for reminders
- Reminder keys encode due time and correlation_id, allowing safe retry without duplicates
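
A tenant-scoped scan plus deliver-then-delete (tasks 9.1 and 9.3) can be sketched over an ordered map. The `Reminder` type, the function names, and the millisecond, zero-padded `due_at` encoding are assumptions for illustration; the real store would be the KV `schedule:` keyspace.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Reminder {
    pub tenant_id: String,
    pub saga_name: String,
    pub correlation_id: String,
    pub due_at_ms: u64,
}

impl Reminder {
    /// Key follows `schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at}`.
    pub fn key(&self) -> String {
        format!(
            "schedule:{}:{}:{}:{:020}",
            self.tenant_id, self.saga_name, self.correlation_id, self.due_at_ms
        )
    }
}

/// Prefix scan for one tenant, filtered to items due at or before `now_ms`.
pub fn scan_due(store: &BTreeMap<String, Reminder>, tenant_id: &str, now_ms: u64) -> Vec<String> {
    let prefix = format!("schedule:{tenant_id}:");
    store
        .range(prefix.clone()..)
        .take_while(|(k, _)| k.starts_with(&prefix))
        .filter(|(_, r)| r.due_at_ms <= now_ms)
        .map(|(k, _)| k.clone())
        .collect()
}

/// Delivers each due reminder and deletes it only after delivery succeeds.
pub fn deliver_due<F>(
    store: &mut BTreeMap<String, Reminder>,
    tenant_id: &str,
    now_ms: u64,
    mut deliver: F,
) -> usize
where
    F: FnMut(&Reminder) -> Result<(), String>,
{
    let mut delivered = 0;
    for key in scan_due(store, tenant_id, now_ms) {
        let ok = store.get(&key).map(|r| deliver(r).is_ok()).unwrap_or(false);
        if ok {
            store.remove(&key);
            delivered += 1;
        }
    }
    delivered
}
```

Because the key already encodes the due time and correlation ID, a reminder redelivered after a crash carries the same identity and can be deduplicated downstream.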
### Tests
- [x] **T9.1** Unit test: due schedule item is delivered and then deleted
- [x] **T9.2** Unit test: scheduler is tenant-scoped
- [x] **T9.3** Integration test: restart re-scans and delivers still-due reminders exactly once (ignored by default)
---
## Milestone 10: HTTP Endpoints + Operational Controls (Under Gateway)
**Goal:** Provide health/readiness/metrics/info and operational controls (drain/reload) aligned with other nodes.
### Dependencies
- Milestone 3 (settings)
- Milestone 4 (storage)
- Milestone 5 (JetStream)
### Tasks
- [x] **10.1** Implement `/health`
- Storage writable check + JetStream connectivity check
- [x] **10.2** Implement `/ready`
- Not draining + storage writable + JetStream reachable
- Stop reporting ready before shutdown/drain to support safe rollouts
- [x] **10.3** Implement `/metrics`
- Worker lag, outbox depth, effect latency, schedule lag
- [x] **10.4** Implement `/info`
- Build info, mode, stream/consumer names, enabled saga/effect sets
- [x] **10.5** Implement `/admin/drain`
- Stop acquiring new work, finish in-flight, flush relay, then report draining state
- [x] **10.6** Implement `/admin/reload`
- Hot-reload manifests and (optionally) tenant placement config where safe
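
The difference between `/health` and `/ready` in 10.1, 10.2, and 10.5 comes down to the draining flag. A minimal sketch of the shared state, with hypothetical names and without the `axum` handlers that would read it:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Shared flags that HTTP handlers and background checks would read and write.
#[derive(Debug, Default)]
pub struct Health {
    draining: AtomicBool,
    storage_writable: AtomicBool,
    jetstream_reachable: AtomicBool,
}

impl Health {
    pub fn set_storage_writable(&self, ok: bool) {
        self.storage_writable.store(ok, Ordering::SeqCst);
    }

    pub fn set_jetstream_reachable(&self, ok: bool) {
        self.jetstream_reachable.store(ok, Ordering::SeqCst);
    }

    /// Called by `/admin/drain`: readiness drops before shutdown begins.
    pub fn start_drain(&self) {
        self.draining.store(true, Ordering::SeqCst);
    }

    /// `/health`: storage writable and JetStream connected.
    pub fn is_healthy(&self) -> bool {
        self.storage_writable.load(Ordering::SeqCst)
            && self.jetstream_reachable.load(Ordering::SeqCst)
    }

    /// `/ready`: healthy and not draining.
    pub fn is_ready(&self) -> bool {
        !self.draining.load(Ordering::SeqCst) && self.is_healthy()
    }
}
```

A draining replica stays healthy (so it is not killed mid-flush) but stops being ready (so the orchestrator routes no new work to it).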
### Tests
- [x] **T10.1** Unit test: readiness toggles with draining flag
- [x] **T10.2** Unit test: health fails when storage is unwritable
---
## Milestone 11: Container & Deployment
**Goal:** Package the Runner as a container and define entrypoint behavior consistent with Aggregate/Projection.
### Dependencies
- Milestone 9 (scheduling)
- Milestone 10 (operational endpoints)
### Tasks
- [x] **11.1** Create `docker/Dockerfile.rust`
- Multi-stage build
- Minimal runtime image
- Health check wiring (uses `/health`)
- [x] **11.2** Create `docker-compose.yml` for local dev
- Runner container (mode configurable)
- NATS server (JetStream enabled)
- Optional: Grafana, VictoriaMetrics, Loki
- [x] **11.3** Define container entrypoint behavior
- Config loading
- Graceful shutdown on SIGTERM
- For saga/effect consumers: stop pulling new messages before exit
- Flush outbox relay safely (do not delete outbox entries without confirmation)
- Timeout-based forced shutdown policy
- [x] **11.4** Define environment variables and defaults
- NATS URL, stream names, subject filters
- Storage path
- Mode: saga/effect/combined
- Multi-tenancy enabled flag + default tenant behavior
- Consumer settings: durable name strategy, deliver group, max in-flight, ack wait, max deliver
- [x] **11.5** Create release build optimization
- LTO, strip, single codegen unit
### Tests
- [x] **T11.1** Container builds successfully
```bash
docker build -f docker/Dockerfile.rust --build-arg PACKAGE=runner --build-arg BIN=runner -t cloudlysis/runner:local .
docker run cloudlysis/runner:local --help
```
- [x] **T11.2** Container starts with valid config
```bash
docker run -e RUNNER_NATS_URL=nats://nats:4222 cloudlysis/runner:local
```
---
## Milestone 12: Provisioning, Scalability, and Docker Swarm Deployment
**Goal:** Support horizontal scaling and safe rollouts in Docker Swarm with clear worker-pool semantics for JetStream consumers and outbox relay.
### Dependencies
- Milestone 11 (container & deployment)
### Tasks
- [x] **12.1** Define the scaling model for saga + effect workers
- Saga workers: durable consumer filtered to aggregate event subjects
- Effect workers: durable consumer filtered to effect command subjects
- Deliver group strategy so replicas share workload without duplication
- Consumer configuration requirements (ack policy, max in-flight, ack wait, max deliver)
- [x] **12.2** Implement replica-safe processing invariants
- Ack only after storage transaction commits
- Outbox relay deletes only after durable confirmation (publish ack / gateway response)
- Optional per-key serialization for workflows that require strict ordering
- [x] **12.3** Add tenant-aware provisioning option (sharding)
- Optional tenant-range sharding by subject filters (e.g., `tenant.<range>.*`)
- Placement constraints for Swarm nodes (e.g., `node.labels.tenant_range==<range>`)
- Strategy for adding/removing shards and rebalancing tenants
- [x] **12.4** Optional: NATS KV-backed tenant placement config
- Define a `TENANT_PLACEMENT` bucket to store tenant → shard assignments
- Watch for config changes and apply without restart
- [x] **12.5** Create Swarm stack definition (`swarm/stacks/platform.yml`)
- Service definition(s) for saga/effect/combined modes
- Replicas configuration
- Resource limits (CPU, memory)
- Health check integration (`/health`, `/ready`)
- Storage volume mapping for `edge-storage` data directory
- Secrets/config wiring for provider credentials (no secrets in env vars)
- [x] **12.6** Define rollout + drain strategy
- Rolling update parameters
- Drain behavior: stop acquiring work, finish in-flight, flush relay
- Safe rollback story (old replicas still valid due to idempotency + checkpointing)
### Tests
- [x] **T12.1** Stack file valid
```bash
docker stack config -c swarm/stacks/platform.yml
```
- [x] **T12.2** Scale-out does not duplicate work (ignored by default; run with `RUNNER_TEST_NATS_URL=... cargo test -- --ignored`)
- Start 2+ replicas pulling from the same durable consumer deliver group
- Verify saga checkpoint monotonicity and outbox dispatch idempotency
- [x] **T12.3** Rolling restart preserves correctness (ignored by default; run with `RUNNER_TEST_NATS_URL=... cargo test -- --ignored`)
- Restart replicas during active processing
- Verify no duplicate effects and no lost outbox items
---
## Milestone 13: Observability + Safety Policies
**Goal:** Ensure production-grade logs, metrics, and policy controls consistent with Aggregate/Projection.
### Dependencies
- Milestone 10 (operational endpoints) and prior runtime milestones
### Tasks
- [x] **13.1** Integrate `edge-logger-client` logging pipeline
- Ensure tenant_id, trace_id, correlation_id are included in structured logs
- [x] **13.2** Implement core metrics
- consumer lag, redeliveries, processing latency
- outbox depth and dispatch latency
- effect success/failure counts and duration histograms
- schedule lag and scan durations
- [x] **13.3** Implement poison message quarantine records
- Store minimal decoded context and error reason in KV under a deadletter namespace
- [ ] **13.4** Standardize correlation and trace propagation to match Gateway/Control conventions
- When submitting commands via the Gateway, include `x-correlation-id` and `traceparent` headers when available in workflow metadata
- When publishing to JetStream, include `x-correlation-id` and `traceparent` message headers when available
- Ensure `correlation_id` and `trace_id` appear in logs/spans for dispatch and effect execution paths
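
Task 13.4 amounts to building the same header set for both Gateway requests and JetStream publishes. The sketch below returns plain name/value pairs so it does not depend on any HTTP or NATS client type; `WorkflowMeta` and `dispatch_headers` are hypothetical names, while the header names are the ones listed above.

```rust
/// Metadata carried with a work item; the optional fields may be absent.
#[derive(Debug, Clone)]
pub struct WorkflowMeta {
    pub tenant_id: String,
    pub correlation_id: Option<String>,
    pub traceparent: Option<String>,
}

/// Builds the headers to attach to a Gateway request or JetStream message.
/// `x-tenant-id` is always sent; the others only when the metadata has them.
pub fn dispatch_headers(meta: &WorkflowMeta) -> Vec<(&'static str, String)> {
    let mut headers = vec![("x-tenant-id", meta.tenant_id.clone())];
    if let Some(c) = &meta.correlation_id {
        headers.push(("x-correlation-id", c.clone()));
    }
    if let Some(t) = &meta.traceparent {
        headers.push(("traceparent", t.clone()));
    }
    headers
}
```

Using one builder for both transports keeps the two dispatch paths from drifting apart on header naming.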
### Tests
- [x] **T13.1** Unit test: metrics exporter emits key metrics
- [x] **T13.2** Unit test: quarantine record is written on poison handling path
- [ ] **T13.3** Integration test: Gateway-bound requests include `x-correlation-id` when workflow metadata supplies a correlation_id (ignored by default; requires NATS)
---
## Milestone 14: Integration Hardening (Replay, Scale, Compatibility)
**Goal:** Validate end-to-end correctness across saga + outbox + effect execution, and implement operational workflows needed for safe scaling.
### Dependencies
- Milestones 6 through 13
### Tasks
- [x] **14.1** Implement controlled replay for sagas
- Reset `checkpoint:{tenant_id}:{saga_name}` with explicit operator intent
- Safety checks to prevent accidental full replays
- [x] **14.2** Validate worker-pool semantics
- deliver groups distribute work across replicas
- per-key ordering enforcement option for workflows that need it
- [x] **14.3** Validate graceful drain behavior
- no new work acquired
- in-flight finishes
- outbox relay flush completes
- [x] **14.4** End-to-end integration test suite
- Aggregate event → saga transition → outbox command → effect command → effect result → saga completes
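
For the controlled replay in 14.1, "explicit operator intent" could be enforced by requiring the operator to type the exact checkpoint key and by refusing a rewind to zero unless a separate flag is set. This is only a sketch under those assumptions: `reset_checkpoint`, `ReplayError`, and the confirmation scheme are hypothetical, and checkpoints are held in an in-memory map.

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq, Eq)]
pub enum ReplayError {
    /// The operator did not type the exact checkpoint key.
    ConfirmationMismatch,
    /// Rewinding to zero replays everything and needs an explicit flag.
    FullReplayNotAllowed,
    /// A replay can only move the checkpoint backwards.
    TargetAhead { current: u64, requested: u64 },
}

/// Rewinds `checkpoint:{tenant_id}:{saga_name}` to `target_seq`.
/// Returns the previous checkpoint value on success.
pub fn reset_checkpoint(
    checkpoints: &mut BTreeMap<String, u64>,
    tenant_id: &str,
    saga_name: &str,
    target_seq: u64,
    confirm: &str,
    allow_full_replay: bool,
) -> Result<u64, ReplayError> {
    let key = format!("checkpoint:{tenant_id}:{saga_name}");
    if confirm != key {
        return Err(ReplayError::ConfirmationMismatch);
    }
    if target_seq == 0 && !allow_full_replay {
        return Err(ReplayError::FullReplayNotAllowed);
    }
    let current = checkpoints.get(&key).copied().unwrap_or(0);
    if target_seq > current {
        return Err(ReplayError::TargetAhead { current, requested: target_seq });
    }
    checkpoints.insert(key, target_seq);
    Ok(current)
}
```

Returning the previous value gives the operator something to log, and to restore if the replay was started by mistake.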
### Tests
- [x] **T14.1** Integration test: full happy-path workflow (ignored by default; requires NATS)
- [x] **T14.2** Integration test: crash/restart across boundaries preserves atomicity (ignored by default; requires NATS)
- restart after state commit but before dispatch
- restart after dispatch but before outbox delete
- restart during effect execution and redelivery