cloudlysis/runner/DEVELOPMENT_PLAN.md
Vlad Durnea 1298d9a3df
Monorepo consolidation: workspace, shared types, transport plans, docker/swam assets
2026-03-30 11:40:42 +03:00

Development Plan: Runner Node

Overview

This plan breaks down the Runner node implementation into milestones ordered by dependency. Each milestone includes:

  • Tasks with clear deliverables
  • Test Requirements (unit tests + tautological tests + integration tests where applicable)
  • Dependencies on previous milestones

Development Approach:

  1. Complete one milestone at a time
  2. Write tests before implementation (TDD where applicable)
  3. All tests must pass before moving to the next milestone
  4. Mark tasks complete with [x] as you progress

Milestone 0: Repo Bootstrap (Dev Ergonomics + Guardrails)

Goal: Establish baseline development workflows and guardrails so later milestones can be executed and verified consistently.

Tasks

  • 0.1 Define canonical local commands
    • cargo test
    • cargo fmt --check
    • cargo clippy -- -D warnings
    • cargo run -- --help
  • 0.2 Add minimal CI entrypoints (repository-level)
    • Run fmt + clippy + tests on every PR
    • Ensure CI uses the same commands as 0.1
  • 0.3 Define integration-test gating pattern (NATS-required)
    • Mark NATS-dependent tests #[ignore] and run them with cargo test -- --ignored only when RUNNER_TEST_NATS_URL is set
    • Apply the same convention to every integration test this plan marks “ignored by default”
  • 0.4 Define baseline operational invariants (written as checklist items for later milestones)
    • Never ack before durable commit
    • Never delete outbox item before durable confirmation
    • Never execute effect twice for the same (tenant_id, command_id)
    • Always propagate tenant_id, correlation_id, and trace_id
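A minimal sketch of the gating convention in 0.3, assuming NATS-dependent tests live in the Runner crate and use only the standard library. `RUNNER_TEST_NATS_URL` comes from this plan. `NATS_URL_VAR`, `normalize_nats_url`, `test_nats_url`, and the test name are hypothetical.

```rust
use std::env;

/// Environment variable that enables NATS-backed integration tests.
pub const NATS_URL_VAR: &str = "RUNNER_TEST_NATS_URL";

/// Treats an unset or blank value as "not configured".
pub fn normalize_nats_url(raw: Option<String>) -> Option<String> {
    raw.map(|s| s.trim().to_string()).filter(|s| !s.is_empty())
}

/// NATS URL for integration tests, if one was provided.
pub fn test_nats_url() -> Option<String> {
    normalize_nats_url(env::var(NATS_URL_VAR).ok())
}

#[cfg(test)]
mod tests {
    use super::*;

    // Skipped by plain `cargo test`; run with
    // RUNNER_TEST_NATS_URL=nats://127.0.0.1:4222 cargo test -- --ignored
    #[test]
    #[ignore]
    fn jetstream_redelivery_is_idempotent() {
        let url = test_nats_url()
            .unwrap_or_else(|| panic!("{} must be set for ignored integration tests", NATS_URL_VAR));
        // Connect to `url` and exercise the scenario under test here.
        assert!(!url.is_empty());
    }
}
```

If the variable is missing, the test fails loudly instead of silently passing. This prevents `cargo test -- --ignored` from reporting a green run that never touched NATS.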

Tests

  • T0.1 Tautological test: baseline test harness runs
    #[test]
    fn test_harness_runs() {
        assert!(true);
    }
    

Milestone 1: Project Foundation

Goal: Set up the Runner as a standalone Rust container with correct dependencies and a stable module layout.

Tasks

  • 1.1 Initialize Cargo project
    • Create src/lib.rs and src/main.rs
    • Configure Cargo.toml with the madapes registry
  • 1.2 Configure dependencies (aligned with Aggregate/Projection)
    • edge-storage (KvStore)
    • runtime-function (Saga on_event / compensation)
    • edge-logger-client (structured logs client)
    • query-engine (optional admin/debug queries)
    • async-nats (JetStream)
    • tokio, serde, serde_json, thiserror, anyhow, tracing, tracing-subscriber, uuid, chrono, axum
  • 1.3 Establish initial module layout
    src/
    ├── lib.rs
    ├── main.rs
    ├── types/
    │   ├── mod.rs
    │   ├── id.rs
    │   ├── keys.rs
    │   ├── envelope.rs
    │   └── error.rs
    ├── config/
    │   ├── mod.rs
    │   └── settings.rs
    ├── storage/
    │   ├── mod.rs
    │   └── kv.rs
    ├── stream/
    │   ├── mod.rs
    │   └── jetstream.rs
    ├── saga/
    │   ├── mod.rs
    │   ├── manifest.rs
    │   └── runtime.rs
    ├── outbox/
    │   ├── mod.rs
    │   └── relay.rs
    ├── effects/
    │   ├── mod.rs
    │   ├── manifest.rs
    │   ├── runtime.rs
    │   └── providers/
    │       └── mod.rs
    ├── schedule/
    │   ├── mod.rs
    │   └── scheduler.rs
    ├── http/
    │   └── mod.rs
    └── observability/
        └── mod.rs
    
  • 1.4 Configure clippy and rustfmt (CI-friendly)

Tests

  • T1.1 Project compiles successfully
  • T1.2 Dependencies resolve from madapes registry
  • T1.3 Clippy passes with no warnings

Milestone 2: Core Types + Keyspaces

Goal: Implement the foundational types that the Runner depends on: multi-tenancy, envelope shapes, stable key composition, and error model.

Dependencies

  • Milestone 1 (project foundation)

Tasks

  • 2.1 Implement TenantId type (same semantics as other nodes)
    • String wrapper, default empty for single-tenant
    • Display, FromStr, Serialize, Deserialize
  • 2.2 Implement workflow identity types
    • SagaName, EffectName (string wrappers)
    • CorrelationId (string wrapper)
    • WorkId (UUID v7 or string wrapper depending on work kind)
  • 2.3 Implement stable key composition helpers
    • saga:{tenant_id}:{saga_name}:{correlation_id}
    • checkpoint:{tenant_id}:{saga_name}
    • outbox:{tenant_id}:{work_kind}:{work_id}
    • schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at}
    • dedupe:{tenant_id}:{saga_name}:{event_id}
    • dedupe:{tenant_id}:effect:{command_id}
  • 2.4 Define message envelopes used by the Runner (serde types)
    • Aggregate event envelope (consumed from AGGREGATE_EVENTS)
    • Effect command envelope (consumed from workflow command stream)
    • Effect result event envelope (produced to workflow event stream)
    • Gateway command submission shape (compatible with Aggregate gateway request fields)
    • Forward-compatible decoding (unknown fields ignored where practical)
  • 2.5 Implement Runner error model
    • Storage errors, stream errors, decode errors, runtime-function errors
    • Tenant access errors
    • Policy errors (poison message, quarantine, etc.)
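A standard-library-only sketch of TenantId and the 2.3 key helpers. The key formats are the ones listed above. The following are simplifications or assumptions:

- The function names are hypothetical.
- Plain `&str` stands in for the SagaName/CorrelationId wrappers.
- serde derives are omitted.
- Zero-padding `due_at` to 20 digits is an added assumption.

```rust
use std::convert::Infallible;
use std::fmt;
use std::str::FromStr;

/// Tenant identifier; the empty string (the default) means single-tenant mode.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct TenantId(String);

impl TenantId {
    pub fn new(id: impl Into<String>) -> Self {
        Self(id.into())
    }
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl fmt::Display for TenantId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}

impl FromStr for TenantId {
    type Err = Infallible;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(Self(s.to_string()))
    }
}

// One helper per keyspace so each format string lives in exactly one place.
pub fn saga_key(t: &TenantId, saga: &str, correlation_id: &str) -> String {
    format!("saga:{t}:{saga}:{correlation_id}")
}

pub fn checkpoint_key(t: &TenantId, saga: &str) -> String {
    format!("checkpoint:{t}:{saga}")
}

pub fn outbox_key(t: &TenantId, work_kind: &str, work_id: &str) -> String {
    format!("outbox:{t}:{work_kind}:{work_id}")
}

/// `due_at_ms` is zero-padded so that, within one saga instance, keys sort in
/// due-time order (an assumption of this sketch).
pub fn schedule_key(t: &TenantId, saga: &str, correlation_id: &str, due_at_ms: u64) -> String {
    format!("schedule:{t}:{saga}:{correlation_id}:{due_at_ms:020}")
}

pub fn saga_dedupe_key(t: &TenantId, saga: &str, event_id: &str) -> String {
    format!("dedupe:{t}:{saga}:{event_id}")
}

pub fn effect_dedupe_key(t: &TenantId, command_id: &str) -> String {
    format!("dedupe:{t}:effect:{command_id}")
}
```

The layouts as written have one consequence: a saga literally named `effect` would share the `dedupe:{tenant_id}:effect:` namespace with effect commands. Manifest validation may therefore want to reserve that name.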

Tests

  • T2.1 TenantId round-trips serialization and defaults to empty
  • T2.2 Key composition produces stable strings across all keyspaces
  • T2.3 Envelope decoding ignores unknown fields (forward compatibility)
  • T2.4 Tautological test: core types are Send + Sync
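T2.4 can be written as a compile-time check. A generic function with `Send + Sync` bounds fails to type-check for any type that does not satisfy them. The struct and enum definitions below are hypothetical placeholders for the real core types.

```rust
/// Compiles only if `T` is Send + Sync; the check happens at type-check time.
pub fn assert_send_sync<T: Send + Sync>() {}

// Hypothetical minimal stand-ins for the Runner's core types.
#[derive(Clone, Debug, Default)]
pub struct TenantId(pub String);

#[derive(Clone, Debug)]
pub struct CorrelationId(pub String);

#[derive(Debug)]
pub enum RunnerError {
    Storage(String),
    Decode(String),
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn core_types_are_send_sync() {
        assert_send_sync::<TenantId>();
        assert_send_sync::<CorrelationId>();
        assert_send_sync::<RunnerError>();
    }
}
```

Adding a non-thread-safe field (for example an `Rc`) to any of these types would turn this test into a compile error rather than a runtime failure.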

Milestone 3: Configuration

Goal: Implement settings and startup validation for modes (saga/effect/combined), multi-tenancy, storage, and JetStream.

Dependencies

  • Milestone 2 (core types)

Tasks

  • 3.1 Define Settings struct
    • NATS URL
    • Storage path
    • Mode: saga/effect/combined
    • Multi-tenancy enabled flag + default tenant behavior
    • Stream names:
      • AGGREGATE_EVENTS (existing)
      • WORKFLOW_COMMANDS (runner work distribution)
      • WORKFLOW_EVENTS (effect results + optional workflow facts)
    • Subject filters for saga triggers and effect commands
    • Consumer configuration: durable name strategy, deliver group, max in-flight, ack wait, max deliver
    • Backpressure configuration: per-key concurrency, batching, relay polling intervals
    • Manifest paths: sagas and effects (YAML/JSON)
  • 3.2 Implement config loading from environment variables
  • 3.3 Implement config loading from file (YAML/TOML/JSON)
    • Environment overrides file
  • 3.4 Implement config validation
    • Required fields present
    • Stream names and consumer naming rules valid
    • Manifests load and validate at startup
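A sketch of 3.2 to 3.4. It resolves settings from two flat key/value maps (a parsed config file and the environment), and the environment takes precedence. `RUNNER_NATS_URL` appears in T11.2. The other variable names, the `combined` default mode, and the accepted boolean spellings are assumptions. Only a subset of the 3.1 fields is shown.

```rust
use std::collections::HashMap;
use std::str::FromStr;

/// Which workloads this process runs.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Mode {
    Saga,
    Effect,
    Combined,
}

impl FromStr for Mode {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.trim().to_ascii_lowercase().as_str() {
            "saga" => Ok(Mode::Saga),
            "effect" => Ok(Mode::Effect),
            "combined" => Ok(Mode::Combined),
            other => Err(format!("invalid RUNNER_MODE: {other:?}")),
        }
    }
}

#[derive(Clone, Debug)]
pub struct Settings {
    pub nats_url: String,
    pub storage_path: String,
    pub mode: Mode,
    pub multi_tenant: bool,
}

/// Resolves settings (env overrides file) and collects every validation
/// error instead of stopping at the first one.
pub fn load_settings(
    file: &HashMap<String, String>,
    env: &HashMap<String, String>,
) -> Result<Settings, Vec<String>> {
    let get = |key: &str| env.get(key).or_else(|| file.get(key)).map(|v| v.trim().to_string());
    let mut errors = Vec::new();

    let nats_url = get("RUNNER_NATS_URL").unwrap_or_default();
    if nats_url.is_empty() {
        errors.push("RUNNER_NATS_URL is required".to_string());
    }
    let storage_path = get("RUNNER_STORAGE_PATH").unwrap_or_default();
    if storage_path.is_empty() {
        errors.push("RUNNER_STORAGE_PATH is required".to_string());
    }
    // Defaulting to combined mode is an assumption of this sketch.
    let mode = match get("RUNNER_MODE").as_deref().unwrap_or("combined").parse::<Mode>() {
        Ok(mode) => mode,
        Err(e) => {
            errors.push(e);
            Mode::Combined
        }
    };
    let multi_tenant = match get("RUNNER_MULTI_TENANT").as_deref() {
        None | Some("false") | Some("0") => false,
        Some("true") | Some("1") => true,
        Some(other) => {
            errors.push(format!("invalid RUNNER_MULTI_TENANT: {other:?}"));
            false
        }
    };

    if errors.is_empty() {
        Ok(Settings { nats_url, storage_path, mode, multi_tenant })
    } else {
        Err(errors)
    }
}
```

The function takes the environment as a map instead of reading `std::env` itself. This keeps T3.1 and T3.2 free of process-global state. `main` can build the map with `std::env::vars().collect()`.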

Tests

  • T3.1 Settings loads from environment variables
  • T3.2 Settings validation catches missing/invalid values
  • T3.3 Tautological test: Settings is Clone + Debug

Milestone 4: Storage Layer (KvStore Transactions)

Goal: Integrate edge-storage KvStore and provide transaction APIs that enforce the Runner's atomicity requirements.

Dependencies

  • Milestone 2 (core types)
  • Milestone 3 (configuration)

Tasks

  • 4.1 Create KvClient wrapper
    • Opens MDBX-backed KvStore at configured path
    • Tenant-aware key helpers for all namespaces
  • 4.2 Implement saga state primitives
    • get_saga_state(key) -> Option<Value>
    • put_saga_state(key, value)
  • 4.3 Implement checkpoint primitives
    • get_checkpoint(key) -> Option<u64>
    • put_checkpoint(key, u64)
  • 4.4 Implement outbox primitives
    • put_outbox_item(key, item)
    • list_outbox_prefix(tenant_id, ...) (for relay scanning)
    • delete_outbox_item(key)
  • 4.5 Implement schedule primitives
    • put_schedule_item(key, payload)
    • scan_due_schedule_items(now) (prefix scan + due filtering)
    • delete_schedule_item(key)
  • 4.6 Implement atomic commit API for saga processing
    • One transaction: update saga state + write outbox items + advance checkpoint + record dedupe marker(s)
    • Provide an API surface that makes partial updates difficult
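One way to make partial updates difficult (4.6) is to funnel all writes through a single value that must be fully populated before it can be committed. In the sketch below, an in-memory `BTreeMap` is a hypothetical stand-in for the MDBX-backed KvStore. A real implementation would stage the same writes inside one storage transaction. The checkpoint-regression check is an added assumption.

```rust
use std::collections::BTreeMap;

/// Everything one processed event must persist. All fields are required, so a
/// caller cannot build a commit that forgets the checkpoint or dedupe marker.
pub struct SagaCommit {
    pub state_key: String,
    pub new_state: String,             // serialized saga state (JSON in the real system)
    pub outbox: Vec<(String, String)>, // (outbox key, work item payload)
    pub checkpoint_key: String,
    pub checkpoint_seq: u64,
    pub dedupe_key: String,
}

#[derive(Debug, PartialEq, Eq)]
pub enum CommitError {
    CheckpointRegression { current: u64, proposed: u64 },
    Injected, // simulated storage failure for tests
}

/// In-memory stand-in for the KvStore; not the edge-storage API.
#[derive(Default)]
pub struct MemKv {
    pub data: BTreeMap<String, String>,
    pub fail_next_commit: bool,
}

impl MemKv {
    /// Validates and stages every write, then applies them together. Any error
    /// returns before `data` is touched, so nothing is partially committed.
    pub fn commit(&mut self, c: SagaCommit) -> Result<(), CommitError> {
        let current = self.data.get(&c.checkpoint_key).and_then(|v| v.parse::<u64>().ok());
        if let Some(current) = current {
            if c.checkpoint_seq <= current {
                return Err(CommitError::CheckpointRegression { current, proposed: c.checkpoint_seq });
            }
        }
        let mut staged = vec![
            (c.state_key, c.new_state),
            (c.checkpoint_key, c.checkpoint_seq.to_string()),
            (c.dedupe_key, "1".to_string()),
        ];
        staged.extend(c.outbox);
        if std::mem::take(&mut self.fail_next_commit) {
            return Err(CommitError::Injected);
        }
        self.data.extend(staged);
        Ok(())
    }
}
```

T4.3 then becomes a matter of injecting a failure and asserting that the key count and checkpoint value are unchanged.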

Tests

  • T4.1 Saga state round-trip: put/get returns identical JSON
  • T4.2 Checkpoint round-trip: put/get returns identical value
  • T4.3 Atomicity: if transaction fails, state/outbox/checkpoint are not partially committed
  • T4.4 Outbox delete removes keys reliably
  • T4.5 Schedule scan only returns due items and respects tenant scoping

Milestone 5: JetStream Integration (Worker Pool Semantics)

Goal: Consume and produce JetStream messages with correct delivery semantics, ack discipline, idempotency, and backpressure.

Dependencies

  • Milestone 4 (storage layer)

Tasks

  • 5.1 Implement JetStream client wrapper
    • Connect to NATS and create JetStream context
    • Bind to required streams (create-if-missing for workflow streams if enabled by config)
  • 5.2 Implement saga trigger consumer
    • Durable consumer filtered to aggregate event subjects (tenant-aware)
    • Deliver group support for worker pool replicas
    • Extract stream sequence from message metadata
  • 5.3 Implement effect command consumer
    • Durable consumer filtered to effect command subjects (tenant-aware)
    • Deliver group support
  • 5.4 Implement publish wrappers
    • Publish effect result events with headers (tenant-id, command-id, effect-name, trace-id)
    • Publish optional workflow facts if needed
  • 5.5 Enforce ack discipline
    • Ack only after the relevant storage transaction commits and/or downstream publish is confirmed
  • 5.6 Implement poison-message policy wiring
    • Consumer max-deliver configured
    • After max attempts: quarantine record in KV + TERM ack
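The ack rules in 5.5 and 5.6 reduce to a small pure function, which makes T5.2 easy to unit-test without NATS. `AckAction`, `Outcome`, and `decide_ack` are hypothetical names. The delivery count would come from JetStream message metadata. The `quarantine` callback stands in for writing the KV quarantine record.

```rust
/// What the consumer tells JetStream about one delivery.
#[derive(Debug, PartialEq, Eq)]
pub enum AckAction {
    Ack,
    Nak,
    Term,
}

/// Result of attempting to process one message.
pub enum Outcome {
    /// Checkpoint/dedupe gate says this message was already handled.
    AlreadyProcessed,
    /// The storage transaction committed.
    Committed,
    /// Decode, runtime, or storage failure.
    Failed(String),
}

/// `delivered` is the delivery count from message metadata (1 on first delivery).
/// `quarantine` persists the poison record and returns Err if that write fails.
pub fn decide_ack<Q>(outcome: &Outcome, delivered: u64, max_attempts: u64, quarantine: Q) -> AckAction
where
    Q: FnOnce(&str) -> Result<(), String>,
{
    match outcome {
        // Safe to ack: the message's effects are durably recorded.
        Outcome::AlreadyProcessed | Outcome::Committed => AckAction::Ack,
        // Never ack a failed commit; ask for redelivery while attempts remain.
        Outcome::Failed(_) if delivered < max_attempts => AckAction::Nak,
        // Out of attempts: write the quarantine record first, then terminate.
        Outcome::Failed(reason) => match quarantine(reason.as_str()) {
            Ok(()) => AckAction::Term,
            Err(_) => AckAction::Nak,
        },
    }
}
```

If the quarantine write fails, the function falls back to a Nak. This assumes the consumer's max-deliver leaves at least one delivery of headroom above `max_attempts`. Without that headroom, JetStream would not redeliver after the Nak.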

Tests

  • T5.1 Unit test: checkpoint/dedupe gates skip already-processed items
  • T5.2 Unit test: ack is not performed when storage commit fails
  • T5.3 Integration test: JetStream redelivery is idempotent (ignored by default; enabled with RUNNER_TEST_NATS_URL=...)

Milestone 6: Saga Runtime (Deterministic Execution)

Goal: Execute Saga logic as deterministic runtime-function DAG programs, producing work items that are persisted into the outbox.

Dependencies

  • Milestone 5 (JetStream consumption)

Tasks

  • 6.1 Define Saga program invocation contract
    • Input: { saga_state, event }
    • Output: { new_saga_state, work_items[], schedules[] }
  • 6.2 Implement runtime-function execution wrapper
    • Gas limits and timeouts
    • Deterministic inputs only (no I/O, no clock access)
  • 6.3 Implement Saga manifest
    • Defines sagas, trigger filters, program references (on_event, compensation)
    • Validate referenced programs exist
  • 6.4 Implement saga processing pipeline
    • Load saga state
    • Execute program
    • Atomic commit (state + outbox + checkpoint + dedupe + schedule items)
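The 6.1 contract is sketched below as a Rust trait, for illustration only. The real programs are runtime-function DAGs, and `SagaProgram`, `OrderSaga`, and the event names are hypothetical. The sketch shows how determinism survives time-based logic: the schedule's due time is computed from a timestamp carried in the input, never from the clock.

```rust
/// Input to a saga program. `event_time_ms` comes from the event itself so the
/// program never needs to read the wall clock.
#[derive(Clone, Debug, PartialEq)]
pub struct SagaInput {
    pub saga_state: Option<String>,
    pub event_type: String,
    pub event_id: String,
    pub event_time_ms: u64,
}

/// Next state, work items for the outbox, and due times for schedule items.
#[derive(Clone, Debug, PartialEq)]
pub struct SagaOutput {
    pub new_saga_state: String,
    pub work_items: Vec<String>,
    pub schedules: Vec<u64>,
}

/// Every saga program is a pure function of its input: no I/O, no clock, no randomness.
pub trait SagaProgram {
    fn on_event(&self, input: &SagaInput) -> SagaOutput;
}

/// Hypothetical order saga used only to illustrate the contract.
pub struct OrderSaga;

const RESERVATION_TIMEOUT_MS: u64 = 30 * 60 * 1000;

impl SagaProgram for OrderSaga {
    fn on_event(&self, input: &SagaInput) -> SagaOutput {
        match (input.saga_state.as_deref(), input.event_type.as_str()) {
            (None, "OrderPlaced") => SagaOutput {
                new_saga_state: "awaiting_reservation".to_string(),
                work_items: vec![format!("effect:reserve_stock:{}", input.event_id)],
                // Timeout derived from the event time, not from "now".
                schedules: vec![input.event_time_ms + RESERVATION_TIMEOUT_MS],
            },
            (Some("awaiting_reservation"), "StockReserved") => SagaOutput {
                new_saga_state: "completed".to_string(),
                work_items: vec![],
                schedules: vec![],
            },
            // Any other event leaves the state unchanged and emits nothing.
            (state, _) => SagaOutput {
                new_saga_state: state.unwrap_or_default().to_string(),
                work_items: vec![],
                schedules: vec![],
            },
        }
    }
}
```

T6.1 can then call `on_event` twice with identical input and compare the outputs for equality.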

Tests

  • T6.1 Unit test: same inputs produce same outputs (determinism)
  • T6.2 Unit test: pipeline writes checkpoint only if state/outbox commit succeeds
  • T6.3 Unit test: dedupe prevents duplicate transitions for the same event_id

Milestone 7: Outbox Relay (Reliable Dispatch)

Goal: Reliably deliver outbox work items to their destinations without dual-write gaps, supporting retries and backpressure.

Dependencies

  • Milestone 6 (saga runtime)

Tasks

  • 7.1 Implement outbox relay loop
    • Poll the outbox: key prefix in KV with a bounded batch size
    • Emit metrics for outbox depth and dispatch latency
  • 7.2 Implement dispatch targets
    • Aggregate commands via Gateway submission (HTTP/gRPC, tenant-scoped)
    • Always send x-tenant-id and propagate correlation_id/trace_id metadata
    • Effect commands published to WORKFLOW_COMMANDS
  • 7.3 Implement idempotency for relay dispatch
    • Safe retries: dispatch operation is idempotent using command_id (and/or JetStream Nats-Msg-Id)
    • Only delete outbox item after durable confirmation (publish ack / gateway response)
  • 7.4 Implement backpressure controls
    • Max in-flight dispatches per tenant
    • Bounded retries with backoff
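A sketch of a single relay pass (7.1, 7.3) and the backoff used between retries (7.4). An in-memory map stands in for the `outbox:` keyspace. `relay_once`, `backoff_delay`, and the `dispatch` callback are hypothetical. The callback would publish to WORKFLOW_COMMANDS or submit via the Gateway, returning `Ok` only on durable confirmation. Per-tenant in-flight limits are not shown.

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Exponential backoff: `base * 2^attempt`, capped at `max` (attempt starts at 0).
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}

/// One relay pass over at most `batch` outbox items, in key order.
/// An item is deleted only after `dispatch` confirms it; failed items stay
/// in the outbox for a later pass. Returns (sent, failed).
pub fn relay_once<F>(outbox: &mut BTreeMap<String, String>, batch: usize, mut dispatch: F) -> (usize, usize)
where
    F: FnMut(&str, &str) -> Result<(), String>,
{
    let keys: Vec<String> = outbox.keys().take(batch).cloned().collect();
    let (mut sent, mut failed) = (0usize, 0usize);
    for key in keys {
        let payload = outbox[&key].clone();
        match dispatch(&key, &payload) {
            Ok(()) => {
                // Confirmed: now it is safe to delete.
                outbox.remove(&key);
                sent += 1;
            }
            Err(_) => failed += 1,
        }
    }
    (sent, failed)
}
```

Because the delete happens after confirmation, a crash between dispatch and delete re-dispatches the item on restart. The command_id / Nats-Msg-Id idempotency in 7.3 exists to absorb exactly that case.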

Tests

  • T7.1 Unit test: outbox item is not deleted if dispatch fails
  • T7.2 Unit test: dispatch success deletes outbox item exactly once
  • T7.3 Integration test: crash/restart simulation re-dispatches pending outbox items without duplicates (ignored by default)

Milestone 8: Effect Worker Runtime (Non-Deterministic Execution)

Goal: Consume effect commands, execute side effects with reliability controls, publish result events, and ensure idempotency.

Dependencies

  • Milestone 5 (effect command consumer)
  • Milestone 4 (dedupe storage)

Tasks

  • 8.1 Define Effect command/result contract
    • Command input: { tenant_id, command_id, effect_name, payload, metadata }
    • Result event output: { tenant_id, command_id, effect_name, result_type, payload, timestamp }
  • 8.2 Implement provider interface
    • Provider receives decoded command and returns a typed outcome
    • Support per-provider configuration via manifest
    • Support secret references resolved from Swarm secrets/config at runtime (no secrets in git)
  • 8.3 Implement reliability controls
    • retries + exponential backoff
    • timeouts
    • circuit breakers per upstream
  • 8.4 Implement idempotency gate
    • Check dedupe:{tenant_id}:effect:{command_id} before executing external call
    • Record completion only after result publish is acknowledged
  • 8.5 Publish result events to WORKFLOW_EVENTS
    • Include headers for correlation/trace propagation when present
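A sketch of the 8.4 gate. `DedupeStore`, `GateResult`, and the `execute`/`publish` callbacks are hypothetical. A `HashSet` stands in for the KV dedupe keyspace.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq, Eq)]
pub enum GateResult {
    Skipped,
    Completed,
    PublishFailed,
}

/// Stand-in for the `dedupe:{tenant_id}:effect:{command_id}` keyspace.
#[derive(Default)]
pub struct DedupeStore {
    done: HashSet<String>,
}

impl DedupeStore {
    fn key(tenant_id: &str, command_id: &str) -> String {
        format!("dedupe:{tenant_id}:effect:{command_id}")
    }

    /// Skips `execute` if this command already completed. Otherwise runs it,
    /// publishes the result, and records completion only if the publish succeeds.
    pub fn run<E, P>(&mut self, tenant_id: &str, command_id: &str, execute: E, publish: P) -> GateResult
    where
        E: FnOnce() -> String,
        P: FnOnce(&str) -> Result<(), String>,
    {
        let key = Self::key(tenant_id, command_id);
        if self.done.contains(&key) {
            return GateResult::Skipped;
        }
        let result = execute();
        match publish(&result) {
            Ok(()) => {
                self.done.insert(key);
                GateResult::Completed
            }
            Err(_) => GateResult::PublishFailed,
        }
    }
}
```

Completion is recorded only after publish. As a result, a publish failure (or a crash before the marker is written) leads to re-execution on redelivery. Where an upstream supports it, forwarding command_id as an upstream idempotency key is one way to narrow that window.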

Tests

  • T8.1 Unit test: idempotency gate prevents double execution for same command_id
  • T8.2 Unit test: result publish failure does not mark command as completed
  • T8.3 Integration test: simulated redelivery of effect command does not duplicate external call (ignored by default)

Milestone 9: Scheduling (Durable Timeouts/Reminders)

Goal: Implement durable scheduling for saga timeouts using KV-backed schedules rather than in-memory timers as the source of truth.

Dependencies

  • Milestone 4 (schedule primitives)
  • Milestone 6 (saga emits schedules)

Tasks

  • 9.1 Implement scheduler loop
    • Periodically scan the schedule: key prefix for due items (tenant-aware)
    • Emit scheduling metrics (due count, scan time, lag)
  • 9.2 Define reminder delivery mechanism
    • Option A: publish a workflow event to WORKFLOW_EVENTS that sagas consume
    • Option B: inject an internal event into the saga pipeline without JetStream (only if it preserves restart correctness)
  • 9.3 Ensure idempotency for reminders
    • Reminder keys encode due time and correlation_id, allowing safe retry without duplicates
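A sketch of the due-item scan for 9.1. It assumes that `due_at` is stored as Unix milliseconds in the final key segment and that tenant IDs contain no `:`. The trailing colon in the scan prefix keeps tenant `acme` from matching `acme2`. `scan_due` is a hypothetical name, and a `BTreeMap` stands in for the KvStore's ordered prefix scan.

```rust
use std::collections::BTreeMap;

/// Keys of due schedule items for one tenant, at most `limit` of them.
/// Key layout: schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at}
pub fn scan_due(store: &BTreeMap<String, String>, tenant_id: &str, now_ms: u64, limit: usize) -> Vec<String> {
    let prefix = format!("schedule:{tenant_id}:");
    store
        .range(prefix.clone()..)
        // Keys are ordered, so the tenant's items are contiguous after `prefix`.
        .take_while(|(k, _)| k.starts_with(&prefix))
        .filter(|(k, _)| {
            k.rsplit(':')
                .next()
                .and_then(|due| due.parse::<u64>().ok())
                .is_some_and(|due| due <= now_ms)
        })
        .take(limit)
        .map(|(k, _)| k.clone())
        .collect()
}
```

For 9.3, the schedule key itself could serve as the reminder's stable identifier, for example as the JetStream `Nats-Msg-Id` under option A. A retry after a crash between publish and delete would then be deduplicated within the stream's duplicate window rather than delivered twice.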

Tests

  • T9.1 Unit test: due schedule item is delivered and then deleted
  • T9.2 Unit test: scheduler is tenant-scoped
  • T9.3 Integration test: restart re-scans and delivers still-due reminders exactly once (ignored by default)

Milestone 10: HTTP Endpoints + Operational Controls (Under Gateway)

Goal: Provide health/readiness/metrics/info and operational controls (drain/reload) aligned with other nodes.

Dependencies

  • Milestone 3 (settings)
  • Milestone 4 (storage)
  • Milestone 5 (JetStream)

Tasks

  • 10.1 Implement /health
    • Storage writable check + JetStream connectivity check
  • 10.2 Implement /ready
    • Not draining + storage writable + JetStream reachable
    • Stop reporting ready before shutdown/drain to support safe rollouts
  • 10.3 Implement /metrics
    • Worker lag, outbox depth, effect latency, schedule lag
  • 10.4 Implement /info
    • Build info, mode, stream/consumer names, enabled saga/effect sets
  • 10.5 Implement /admin/drain
    • Stop acquiring new work, finish in-flight, flush relay, then report draining state
  • 10.6 Implement /admin/reload
    • Hot-reload manifests and (optionally) tenant placement config where safe
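A sketch of the flags behind /health and /ready (10.1, 10.2, 10.5). `NodeState` and its fields are hypothetical. Background probes would update the storage and JetStream flags, and the axum handlers would map `false` to a non-2xx status.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Shared flags read by the HTTP handlers.
#[derive(Default)]
pub struct NodeState {
    pub draining: AtomicBool,
    pub storage_writable: AtomicBool,
    pub jetstream_reachable: AtomicBool,
}

impl NodeState {
    /// /health: dependencies work. Draining does not affect health.
    pub fn healthy(&self) -> bool {
        self.storage_writable.load(Ordering::Acquire) && self.jetstream_reachable.load(Ordering::Acquire)
    }

    /// /ready: healthy and not draining.
    pub fn ready(&self) -> bool {
        !self.draining.load(Ordering::Acquire) && self.healthy()
    }

    /// /admin/drain: flip the flag first so readiness drops before work stops.
    pub fn begin_drain(&self) {
        self.draining.store(true, Ordering::Release);
    }
}
```

Keeping `draining` out of `healthy()` means a draining replica still reports healthy while it finishes in-flight work; only readiness drops.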

Tests

  • T10.1 Unit test: readiness toggles with draining flag
  • T10.2 Unit test: health fails when storage is unwritable

Milestone 11: Container & Deployment

Goal: Package the Runner as a container and define entrypoint behavior consistent with Aggregate/Projection.

Dependencies

  • Milestone 9 (scheduling)
  • Milestone 10 (operational endpoints)

Tasks

  • 11.1 Create docker/Dockerfile.rust
    • Multi-stage build
    • Minimal runtime image
    • Health check wiring (uses /health)
  • 11.2 Create docker-compose.yml for local dev
    • Runner container (mode configurable)
    • NATS server (JetStream enabled)
    • Optional: Grafana, Victoria Metrics, Loki
  • 11.3 Define container entrypoint behavior
    • Config loading
    • Graceful shutdown on SIGTERM
    • For saga/effect consumers: stop pulling new messages before exit
    • Flush outbox relay safely (do not delete outbox entries without confirmation)
    • Timeout-based forced shutdown policy
  • 11.4 Define environment variables and defaults
    • NATS URL, stream names, subject filters
    • Storage path
    • Mode: saga/effect/combined
    • Multi-tenancy enabled flag + default tenant behavior
    • Consumer settings: durable name strategy, deliver group, max in-flight, ack wait, max deliver
  • 11.5 Create release build optimization
    • LTO, strip, single codegen unit
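A sketch of the shutdown ordering in 11.3 using only the standard library. Signal handling and the outbox relay flush are omitted. `shutdown`, `Shutdown`, and the `accepting`/`in_flight` counters are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq, Eq)]
pub enum Shutdown {
    Graceful,
    Forced { still_in_flight: usize },
}

/// Runs after SIGTERM: stop pulling new messages, wait for in-flight work,
/// and give up after `timeout`, reporting how much work was abandoned.
pub fn shutdown(accepting: &AtomicBool, in_flight: &AtomicUsize, timeout: Duration) -> Shutdown {
    // Consumers check this flag before fetching the next message.
    accepting.store(false, Ordering::Release);
    let deadline = Instant::now() + timeout;
    loop {
        let n = in_flight.load(Ordering::Acquire);
        if n == 0 {
            return Shutdown::Graceful;
        }
        if Instant::now() >= deadline {
            return Shutdown::Forced { still_in_flight: n };
        }
        thread::sleep(Duration::from_millis(10));
    }
}
```

Work abandoned at the timeout was never acked, so JetStream redelivers it later. The forced path therefore relies on the idempotency rules above rather than losing work.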

Tests

  • T11.1 Container builds successfully
    docker build -f docker/Dockerfile.rust --build-arg PACKAGE=runner --build-arg BIN=runner -t cloudlysis/runner:local .
    docker run cloudlysis/runner:local --help
    
  • T11.2 Container starts with valid config
    docker run -e RUNNER_NATS_URL=nats://nats:4222 cloudlysis/runner:local
    

Milestone 12: Provisioning, Scalability, and Docker Swarm Deployment

Goal: Support horizontal scaling and safe rollouts in Docker Swarm with clear worker-pool semantics for JetStream consumers and outbox relay.

Dependencies

  • Milestone 11 (container & deployment)

Tasks

  • 12.1 Define the scaling model for saga + effect workers
    • Saga workers: durable consumer filtered to aggregate event subjects
    • Effect workers: durable consumer filtered to effect command subjects
    • Deliver group strategy so replicas share workload without duplication
    • Consumer configuration requirements (ack policy, max in-flight, ack wait, max deliver)
  • 12.2 Implement replica-safe processing invariants
    • Ack only after storage transaction commits
    • Outbox relay deletes only after durable confirmation (publish ack / gateway response)
    • Optional per-key serialization for workflows that require strict ordering
  • 12.3 Add tenant-aware provisioning option (sharding)
    • Optional tenant-range sharding by subject filters (e.g., tenant.<range>.*)
    • Placement constraints for Swarm nodes (e.g., node.labels.tenant_range==<range>)
    • Strategy for adding/removing shards and rebalancing tenants
  • 12.4 Optional: NATS KV-backed tenant placement config
    • Define a TENANT_PLACEMENT bucket to store tenant → shard assignments
    • Watch for config changes and apply without restart
  • 12.5 Create Swarm stack definition (swarm/stacks/platform.yml)
    • Service definition(s) for saga/effect/combined modes
    • Replicas configuration
    • Resource limits (CPU, memory)
    • Health check integration (/health, /ready)
    • Storage volume mapping for edge-storage data directory
    • Secrets/config wiring for provider credentials (no secrets in env vars)
  • 12.6 Define rollout + drain strategy
    • Rolling update parameters
    • Drain behavior: stop acquiring work, finish in-flight, flush relay
    • Safe rollback story (old replicas still valid due to idempotency + checkpointing)
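A sketch of tenant-to-shard assignment for 12.3 and 12.4. The subject-filter and placement-constraint formats follow the examples above. Using FNV-1a as the fallback hash, and the function names, are assumptions. FNV-1a is used instead of `DefaultHasher` because the standard library does not guarantee that hasher's output is stable across Rust releases, and every replica must compute the same shard.

```rust
use std::collections::HashMap;

/// 64-bit FNV-1a: simple and stable across platforms and compiler versions.
pub fn fnv1a64(bytes: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for b in bytes {
        h ^= u64::from(*b);
        h = h.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    h
}

/// Explicit placement (e.g. loaded from TENANT_PLACEMENT) wins; otherwise hash.
/// `shard_count` must be greater than zero.
pub fn shard_for(tenant_id: &str, shard_count: u32, placement: &HashMap<String, u32>) -> u32 {
    placement.get(tenant_id).copied().unwrap_or_else(|| {
        // The remainder is < shard_count, so the cast cannot truncate.
        (fnv1a64(tenant_id.as_bytes()) % u64::from(shard_count)) as u32
    })
}

pub fn subject_filter(shard: u32) -> String {
    format!("tenant.{shard}.*")
}

pub fn placement_constraint(shard: u32) -> String {
    format!("node.labels.tenant_range=={shard}")
}
```

Plain modulo hashing reassigns most tenants whenever the shard count changes. An explicit placement table like the TENANT_PLACEMENT bucket is one way to pin tenants while shards are added or removed.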

Tests

  • T12.1 Stack file valid
    docker stack config -c swarm/stacks/platform.yml
    
  • T12.2 Scale-out does not duplicate work (ignored by default; run with RUNNER_TEST_NATS_URL=... cargo test -- --ignored)
    • Start 2+ replicas pulling from the same durable consumer deliver group
    • Verify saga checkpoint monotonicity and outbox dispatch idempotency
  • T12.3 Rolling restart preserves correctness (ignored by default; run with RUNNER_TEST_NATS_URL=... cargo test -- --ignored)
    • Restart replicas during active processing
    • Verify no duplicate effects and no lost outbox items

Milestone 13: Observability + Safety Policies

Goal: Ensure production-grade logs, metrics, and policy controls consistent with Aggregate/Projection.

Dependencies

  • Milestone 10 (operational endpoints) and prior runtime milestones

Tasks

  • 13.1 Integrate edge-logger-client logging pipeline
    • Ensure tenant_id, trace_id, correlation_id are included in structured logs
  • 13.2 Implement core metrics
    • consumer lag, redeliveries, processing latency
    • outbox depth and dispatch latency
    • effect success/failure counts and duration histograms
    • schedule lag and scan durations
  • 13.3 Implement poison message quarantine records
    • Store minimal decoded context and error reason in KV under a deadletter namespace
  • 13.4 Standardize correlation and trace propagation to match Gateway/Control conventions
    • When submitting commands via the Gateway, include x-correlation-id and traceparent headers when available in workflow metadata
    • When publishing to JetStream, include x-correlation-id and traceparent message headers when available
    • Ensure correlation_id and trace_id appear in logs/spans for dispatch and effect execution paths
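A sketch of the 13.4 header rules. `WorkflowMeta`, `propagation_headers`, and `trace_id_from_traceparent` are hypothetical. The `traceparent` layout is the W3C Trace Context format, and only version `00` is handled here. Headers are emitted only when the metadata supplies a usable value.

```rust
/// Propagation-related workflow metadata; either field may be absent.
#[derive(Default)]
pub struct WorkflowMeta {
    pub correlation_id: Option<String>,
    pub traceparent: Option<String>,
}

/// Trace-id field of a version-00 W3C traceparent
/// (`00-<32 hex trace-id>-<16 hex parent-id>-<2 hex flags>`), or None if malformed.
pub fn trace_id_from_traceparent(tp: &str) -> Option<&str> {
    let is_hex = |s: &str, len: usize| {
        s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
    };
    let parts: Vec<&str> = tp.split('-').collect();
    match parts.as_slice() {
        &["00", trace_id, parent_id, flags]
            if is_hex(trace_id, 32)
                && is_hex(parent_id, 16)
                && is_hex(flags, 2)
                && !trace_id.bytes().all(|b| b == b'0') =>
        {
            Some(trace_id)
        }
        _ => None,
    }
}

/// Headers for Gateway requests and JetStream messages; absent values are skipped.
pub fn propagation_headers(meta: &WorkflowMeta) -> Vec<(&'static str, String)> {
    let mut headers = Vec::new();
    if let Some(id) = meta.correlation_id.as_deref().filter(|id| !id.is_empty()) {
        headers.push(("x-correlation-id", id.to_string()));
    }
    // Forward traceparent only if it parses; a malformed value is dropped, not repaired.
    if let Some(tp) = meta.traceparent.as_deref().filter(|tp| trace_id_from_traceparent(tp).is_some()) {
        headers.push(("traceparent", tp.to_string()));
    }
    headers
}
```

The same `trace_id_from_traceparent` result can populate the `trace_id` field in structured logs, so logs and forwarded headers agree on one value.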

Tests

  • T13.1 Unit test: metrics exporter emits key metrics
  • T13.2 Unit test: quarantine record is written on poison handling path
  • T13.3 Integration test: Gateway-bound requests include x-correlation-id when workflow metadata supplies a correlation_id (ignored by default; requires NATS)

Milestone 14: Integration Hardening (Replay, Scale, Compatibility)

Goal: Validate end-to-end correctness across saga + outbox + effect execution, and implement operational workflows needed for safe scaling.

Dependencies

  • Milestones 6 through 13

Tasks

  • 14.1 Implement controlled replay for sagas
    • Reset checkpoint:{tenant_id}:{saga_name} with explicit operator intent
    • Safety checks to prevent accidental full replays
  • 14.2 Validate worker-pool semantics
    • deliver groups distribute work across replicas
    • per-key ordering enforcement option for workflows that need it
  • 14.3 Validate graceful drain behavior
    • no new work acquired
    • in-flight finishes
    • outbox relay flush completes
  • 14.4 End-to-end integration test suite
    • Aggregate event → saga transition → outbox command → effect command → effect result → saga completes
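A sketch of the 14.1 safety checks. The following are assumptions for illustration:

- `ReplayRequest` and `plan_replay` are hypothetical names.
- Confirmation works by re-typing the saga name.
- A `reset_to` of `0` means a full replay.
- The checkpoint holds the last processed stream sequence.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum ReplayError {
    NotConfirmed,
    FullReplayNotAllowed,
    NotARewind { current: u64, requested: u64 },
}

/// Operator request to rewind checkpoint:{tenant_id}:{saga_name}.
pub struct ReplayRequest<'a> {
    pub tenant_id: &'a str,
    pub saga_name: &'a str,
    /// Events after this sequence are reprocessed (0 = replay everything).
    pub reset_to: u64,
    /// Must repeat the saga name exactly, as explicit operator intent.
    pub confirm: &'a str,
    pub allow_full_replay: bool,
}

/// Validates the request against the current checkpoint and returns the
/// checkpoint key and value to write; writes nothing itself.
pub fn plan_replay(req: &ReplayRequest<'_>, current: u64) -> Result<(String, u64), ReplayError> {
    if req.confirm != req.saga_name {
        return Err(ReplayError::NotConfirmed);
    }
    if req.reset_to == 0 && !req.allow_full_replay {
        return Err(ReplayError::FullReplayNotAllowed);
    }
    if req.reset_to >= current {
        return Err(ReplayError::NotARewind { current, requested: req.reset_to });
    }
    Ok((format!("checkpoint:{}:{}", req.tenant_id, req.saga_name), req.reset_to))
}
```

Rewinding the checkpoint alone may not be enough. The `dedupe:{tenant_id}:{saga_name}:{event_id}` markers from the first pass would otherwise skip the replayed events. The replay workflow therefore presumably also has to clear or bypass them, and decide what happens to existing saga state.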

Tests

  • T14.1 Integration test: full happy-path workflow (ignored by default; requires NATS)
  • T14.2 Integration test: crash/restart across boundaries preserves atomicity (ignored by default; requires NATS)
    • restart after state commit but before dispatch
    • restart after dispatch but before outbox delete
    • restart during effect execution and redelivery