cloudlysis/runner/prd.md
Vlad Durnea 1298d9a3df
Monorepo consolidation: workspace, shared types, transport plans, docker/swam assets
2026-03-30 11:40:42 +03:00

🧱 Component: Runner (Sagas + Effect Workflows)

Definition:
The Runner is a standalone container that executes the system's workflow logic: it runs deterministic Saga state machines and drives non-deterministic Effect Provider executions. It consumes and produces messages via NATS JetStream, persists workflow state and checkpoints in the edge-storage KvStore, and follows the same “container + gateway + multi-tenant isolation + horizontal scale” model as Aggregate and Projection.

The Runner is designed to operate as a worker in a horizontally scalable worker pool: multiple Runner replicas can share the workload from JetStream while preserving ordering and idempotency guarantees per workflow key.

Multi-Tenancy:
Multi-tenancy is first-class via tenant_id. When enabled:

  • Routing: All inbound admin/query requests are tenant-scoped using the x-tenant-id header (same key as other components).
  • Stream Isolation: JetStream subjects and durable consumer names are tenant-aware (e.g., tenant.<tenant_id>.aggregate.*.*, tenant.<tenant_id>.workflow.*.*).
  • Storage Namespacing: Saga state, outbox items, checkpoints, and scheduling state are keyed with tenant_id prefixes to prevent cross-tenant reads/writes.
  • Worker Partitioning: Runner instances may be sharded by tenant_id (tenant placement) or may run multi-tenant with strict subject + key isolation.
  • Backward Compatibility: Deployments without multi-tenancy use a default/empty tenant_id and non-tenant-prefixed namespaces.
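
The tenant rules above can be sketched as a small wrapper type. This is a minimal illustration, not the shared TenantId from the other components: the `from_header` constructor and the rejected character set are assumptions, chosen so that a tenant value can never add extra tokens or wildcards to a subject or key.

```rust
/// Tenant identifier wrapper. An empty value stands for the single-tenant default.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TenantId(String);

#[derive(Debug, PartialEq, Eq)]
pub enum TenantIdError {
    /// The value contains a character that has meaning in subjects or keys.
    ForbiddenChar(char),
}

impl TenantId {
    /// Builds a TenantId from an optional `x-tenant-id` header value.
    /// A missing or blank header maps to the default (empty) tenant.
    pub fn from_header(value: Option<&str>) -> Result<Self, TenantIdError> {
        let raw = value.unwrap_or("").trim();
        // Assumption: reject subject/key separators, NATS wildcards, and whitespace.
        if let Some(c) = raw
            .chars()
            .find(|&c| matches!(c, '.' | ':' | '*' | '>') || c.is_whitespace())
        {
            return Err(TenantIdError::ForbiddenChar(c));
        }
        Ok(TenantId(raw.to_string()))
    }

    pub fn is_default(&self) -> bool {
        self.0.is_empty()
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}
```

Validating at the edge keeps later subject and key construction a plain string format, with no escaping needed.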

Dependencies:

  • Core crates pulled from the custom Cargo registry:

    [registries.madapes]
    index = "sparse+https://git.madapes.com/api/packages/madapes/cargo/"
    
    Crate               Purpose
    edge-storage        libmdbx-backed KvStore for saga state, checkpoints, outbox, and schedules
    runtime-function    Deterministic DAG execution for Saga on_event / compensation programs
    edge-logger-client  High-performance logging (UDS + Protobuf, Loki sink)
    query-engine        Optional UQF queries over workflow state (admin/debug tooling)
    async-nats          NATS JetStream client for consuming events and dispatching workflow messages
  • Source code available at ../../madapes/

  • Note: This is a standalone container, aligned with Aggregate/Projection operational constraints and patterns.

Observability:

  • Production stack: Grafana + VictoriaMetrics + Loki
  • Logs via edge-logger-client with multi-tenant isolation and cardinality protection
  • Metrics exported in Prometheus format for VictoriaMetrics scraping (worker lag, outbox depth, effect latency)
  • Trace correlation via propagated trace_id in message metadata and log fields

1. Core Responsibilities

  • Event Consumption (Triggering): Consumes Aggregate events (and optionally workflow events) from JetStream using durable consumers and subject filters.
  • Saga Execution: Runs deterministic Saga on_event programs (runtime-function DAG) to compute (new_state, outgoing_work[]).
  • Atomic Persistence (State + Outbox + Checkpoint): Commits saga state updates, newly produced work items, and processing checkpoints in a single KvStore transaction.
  • Outbox Relay: Reliably dispatches outgoing work to the appropriate destination (Gateway command submission, Effect Provider command stream) after it is durably recorded.
  • Effect Execution: Runs Effect Provider workflows that translate internal “effect commands” into external actions (HTTP/gRPC/SMTP/SQL) and publishes result events back into JetStream.
  • Scheduling (Durable Timeouts): Provides durable timers/reminders for sagas (e.g., “cancel order if not paid in 30 minutes”) without relying on in-memory timers as the source of truth.
  • Backpressure + Safety: Enforces max in-flight work, retry policies, and poison-message handling to keep the worker pool healthy.

2. Runner Operating Modes (Single Binary, Multiple Roles)

The Runner can be deployed in one of these modes:

  1. Saga Worker: Consumes trigger events and advances saga state machines; emits commands/effect commands via outbox.
  2. Effect Worker: Consumes effect commands, executes real-world side effects, publishes result events.
  3. Combined Worker: Runs both roles in the same container for small deployments (still with strict separation between deterministic saga runtime and non-deterministic effect execution).

Each mode uses the same multi-tenant model and can scale horizontally by increasing replica count.
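
One way to model the three roles in a single binary is an enum parsed from configuration. The sketch below is illustrative only: the string values `saga`, `effect`, and `combined` are assumed names, not a documented configuration format.

```rust
use std::str::FromStr;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunnerMode {
    Saga,
    Effect,
    Combined,
}

impl RunnerMode {
    /// True when this process should start the deterministic saga runtime.
    pub fn runs_sagas(self) -> bool {
        matches!(self, RunnerMode::Saga | RunnerMode::Combined)
    }

    /// True when this process should start effect executors.
    pub fn runs_effects(self) -> bool {
        matches!(self, RunnerMode::Effect | RunnerMode::Combined)
    }
}

impl FromStr for RunnerMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Assumed spellings; matching is case-insensitive.
        match s.trim().to_ascii_lowercase().as_str() {
            "saga" => Ok(RunnerMode::Saga),
            "effect" => Ok(RunnerMode::Effect),
            "combined" => Ok(RunnerMode::Combined),
            other => Err(format!("unknown runner mode: {other}")),
        }
    }
}
```

Keeping `runs_sagas` and `runs_effects` as separate checks lets the combined mode start both runtimes without merging their code paths.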


3. Data Model (Keys, Envelopes, Namespaces)

The Runner relies on a small set of durable keyspaces in edge-storage KvStore. When multi-tenancy is enabled, every key carries the tenant_id as the segment directly after the keyspace name.

Saga State:

  • saga:{tenant_id}:{saga_name}:{correlation_id} → JSON state payload

Saga Checkpoints (JetStream stream sequence or consumer sequence):

  • checkpoint:{tenant_id}:{saga_name} → last processed sequence (monotonic)

Outbox (Reliable Dispatch):

  • outbox:{tenant_id}:{work_kind}:{work_id} → serialized work item (command or effect command)
  • outbox_index:{tenant_id} → optional index cursor / priority ordering metadata
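
A relay over this keyspace could look like the sketch below. It uses a `BTreeMap` as a stand-in for the KvStore and a caller-supplied `publish` closure as a stand-in for the JetStream or Gateway client; both are assumptions for illustration. Items are deleted only after the publish is acknowledged, and the relay stops at the first failure so that later items are not sent ahead of it.

```rust
use std::collections::BTreeMap;

/// Publishes pending outbox items for one tenant in key order and removes
/// each item once `publish` reports an acknowledgement. Returns the number sent.
pub fn relay_outbox<P>(
    store: &mut BTreeMap<String, String>,
    tenant_id: &str,
    mut publish: P,
) -> usize
where
    P: FnMut(&str, &str) -> bool,
{
    let prefix = format!("outbox:{tenant_id}:");
    // Snapshot the keys first so the map can be mutated while relaying.
    let pending: Vec<String> = store
        .range(prefix.clone()..)
        .take_while(|(key, _)| key.starts_with(&prefix))
        .map(|(key, _)| key.clone())
        .collect();

    let mut sent = 0;
    for key in pending {
        let acked = store
            .get(&key)
            .map_or(false, |body| publish(key.as_str(), body.as_str()));
        if !acked {
            // Leave this item and everything after it for the next pass.
            break;
        }
        store.remove(&key);
        sent += 1;
    }
    sent
}
```

If a crash happens between the acknowledged publish and the delete, the item is published again on the next pass, so receivers still need to dedupe on the work item's id.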

Schedules (Durable Timeouts/Reminders):

  • schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at} → reminder payload
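
Because due_at is the last segment of the key, reminders are not sorted by due time across sagas. A poller therefore has to scan the tenant's schedule prefix and filter, as in this sketch. The `BTreeMap` stands in for the KvStore, and the encoding of due_at as decimal Unix epoch milliseconds is an assumption; the sketch also assumes the other key segments contain no ':' characters.

```rust
use std::collections::BTreeMap;

/// Returns the keys of reminders for `tenant_id` that are due at or before `now_ms`.
pub fn due_reminders(
    store: &BTreeMap<String, Vec<u8>>,
    tenant_id: &str,
    now_ms: u64,
) -> Vec<String> {
    let prefix = format!("schedule:{tenant_id}:");
    store
        .range(prefix.clone()..)
        .take_while(|(key, _)| key.starts_with(&prefix))
        .filter(|(key, _)| {
            // The due time is the text after the last ':' in the key.
            key.rsplit_once(':')
                .and_then(|(_, due_at)| due_at.parse::<u64>().ok())
                .map_or(false, |due_at| due_at <= now_ms)
        })
        .map(|(key, _)| key.clone())
        .collect()
}
```

Since the durable record is the source of truth, a restarted worker only needs to run the same scan again to recover pending reminders.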

Idempotency / Dedupe (optional but recommended):

  • dedupe:{tenant_id}:{saga_name}:{event_id} → marker that an event transition was applied
  • dedupe:{tenant_id}:effect:{command_id} → marker that an effect was executed

The tenant_id type follows the same semantics as other components (TenantId string wrapper; default/empty allowed for single-tenant deployments).
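
The key layouts above translate directly into small builder functions. The function names below are illustrative and not part of edge-storage; centralizing the formats in one place makes it harder for a call site to omit the tenant segment.

```rust
pub fn saga_key(tenant_id: &str, saga_name: &str, correlation_id: &str) -> String {
    format!("saga:{tenant_id}:{saga_name}:{correlation_id}")
}

pub fn checkpoint_key(tenant_id: &str, saga_name: &str) -> String {
    format!("checkpoint:{tenant_id}:{saga_name}")
}

pub fn outbox_key(tenant_id: &str, work_kind: &str, work_id: &str) -> String {
    format!("outbox:{tenant_id}:{work_kind}:{work_id}")
}

pub fn event_dedupe_key(tenant_id: &str, saga_name: &str, event_id: &str) -> String {
    format!("dedupe:{tenant_id}:{saga_name}:{event_id}")
}

pub fn effect_dedupe_key(tenant_id: &str, command_id: &str) -> String {
    format!("dedupe:{tenant_id}:effect:{command_id}")
}
```

Note that the two dedupe layouts share a prefix, so a saga named `effect` would collide with the effect markers; reserving that name is one way to avoid the overlap.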

Work Item Envelopes:

  • Aggregate Command (Gateway SubmitCommandRequest shape):
    • tenant_id
    • command_id (UUID v7)
    • aggregate_id
    • aggregate_type
    • payload_json
    • metadata (must include correlation_id and trace_id when available)
  • Effect Command:
    • tenant_id
    • command_id (UUID v7, used as the idempotency key)
    • effect_name
    • payload (JSON)
    • metadata (correlation_id, trace_id, retry policy hints)
  • Effect Result Event:
    • tenant_id
    • command_id
    • effect_name
    • result_type (e.g., Succeeded, Failed, TimedOut)
    • payload (JSON)
    • timestamp
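
As a rough illustration, the effect command and result envelopes could be typed as follows. The field types are assumptions: JSON payloads are kept as text so the sketch needs no JSON crate, identifiers are plain strings, and the timestamp is assumed to be Unix epoch milliseconds.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
pub struct EffectCommand {
    pub tenant_id: String,
    /// UUID v7 in text form; also the idempotency key.
    pub command_id: String,
    pub effect_name: String,
    /// JSON document kept as text.
    pub payload: String,
    /// correlation_id, trace_id, retry policy hints.
    pub metadata: BTreeMap<String, String>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResultType {
    Succeeded,
    Failed,
    TimedOut,
}

#[derive(Debug, Clone, PartialEq)]
pub struct EffectResultEvent {
    pub tenant_id: String,
    pub command_id: String,
    pub effect_name: String,
    pub result_type: ResultType,
    pub payload: String,
    /// Assumed to be Unix epoch milliseconds.
    pub timestamp: u64,
}

impl EffectResultEvent {
    /// Builds the result for `cmd`, copying the fields that tie it back to the command.
    pub fn for_command(
        cmd: &EffectCommand,
        result_type: ResultType,
        payload: String,
        timestamp: u64,
    ) -> Self {
        EffectResultEvent {
            tenant_id: cmd.tenant_id.clone(),
            command_id: cmd.command_id.clone(),
            effect_name: cmd.effect_name.clone(),
            result_type,
            payload,
            timestamp,
        }
    }
}
```

Deriving the result from the command, instead of filling the fields by hand, keeps tenant_id and command_id consistent between the two messages.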

4. JetStream Integration (Subjects, Streams, Consumers)

The Runner consumes and produces messages using tenant-namespaced subject conventions consistent with other components.

Aggregate Event Stream (existing):

  • Stream: AGGREGATE_EVENTS
  • Subjects: tenant.*.aggregate.*.*
  • Example: tenant.acme-corp.aggregate.Account.018f...

Workflow Command Stream (Runner-produced work):

  • Stream: WORKFLOW_COMMANDS
  • Subjects:
    • Effect commands: tenant.*.effect.<effect_name>.<command_id>
    • Optional internal commands: tenant.*.workflow.<saga_name>.<correlation_id>

Workflow Event Stream (Effect results + workflow facts):

  • Stream: WORKFLOW_EVENTS
  • Subjects:
    • Effect results: tenant.*.effect_result.<effect_name>.<command_id>
    • Optional saga facts: tenant.*.workflow_event.<saga_name>.<correlation_id>
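
The subject conventions above can be checked with a small matcher that follows NATS wildcard rules: `*` matches exactly one token and `>` matches one or more trailing tokens. This is a standalone sketch for tests and tooling, not part of async-nats; the builder function names are illustrative, and the matcher assumes `>` appears only as the last token of a filter.

```rust
pub fn effect_command_subject(tenant_id: &str, effect_name: &str, command_id: &str) -> String {
    format!("tenant.{tenant_id}.effect.{effect_name}.{command_id}")
}

pub fn effect_result_subject(tenant_id: &str, effect_name: &str, command_id: &str) -> String {
    format!("tenant.{tenant_id}.effect_result.{effect_name}.{command_id}")
}

/// Returns true when `subject` matches the NATS-style `filter`.
pub fn subject_matches(filter: &str, subject: &str) -> bool {
    let mut filter_tokens = filter.split('.');
    let mut subject_tokens = subject.split('.');
    loop {
        match (filter_tokens.next(), subject_tokens.next()) {
            // `>` consumes the rest of the subject, which must be non-empty.
            (Some(">"), Some(_)) => return true,
            // `*` consumes exactly one token.
            (Some("*"), Some(_)) => continue,
            (Some(f), Some(s)) if f == s => continue,
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            _ => return false,
        }
    }
}
```

A check like this is useful in unit tests to confirm that a tenant-specific consumer filter cannot match another tenant's subjects.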

Consumer Model (Worker Pool):

  • Saga workers use durable consumers filtered to relevant subjects (typically tenant.<tenant_id>.aggregate.> or a narrower wildcard set). Replicas share a deliver group so each message is processed by a single worker in the pool.
  • Effect workers use durable consumers per effect_name (or per effect category), again with a deliver group for horizontal scale.
  • Ack Discipline: Messages are acked only after the corresponding state/outbox/checkpoint transaction commits.
  • Ordering: Ordering is guaranteed only within a chosen serialization key (usually (tenant_id, correlation_id)); concurrency across keys is allowed.
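
Within a single worker, per-key ordering with cross-key concurrency is often implemented by hashing the serialization key onto a fixed number of sequential lanes. The sketch below shows that idea under stated assumptions: the lane count is a hypothetical setting, and FNV-1a is used only because its output is stable across processes and builds.

```rust
/// 64-bit FNV-1a hash.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325;
    for byte in bytes {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x100000001b3);
    }
    hash
}

/// Maps a (tenant_id, correlation_id) pair to one of `lanes` sequential queues.
/// Messages with the same key always land in the same lane, so they are
/// processed in arrival order; different keys may run in parallel.
pub fn lane_for(tenant_id: &str, correlation_id: &str, lanes: usize) -> usize {
    assert!(lanes > 0, "at least one lane is required");
    // A NUL separator keeps ("ab", "c") and ("a", "bc") from hashing alike.
    let key = format!("{tenant_id}\u{0}{correlation_id}");
    (fnv1a(key.as_bytes()) % lanes as u64) as usize
}
```

This only orders work inside one process; ordering across replicas still depends on how the consumers partition subjects.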

5. Saga Lifecycle (Deterministic Workflow)

  1. Trigger: The Runner receives an Aggregate event from JetStream (AGGREGATE_EVENTS) and extracts tenant_id, event_id, and correlation metadata.
  2. Load: It loads existing saga state from KvStore using saga:{tenant_id}:{saga_name}:{correlation_id} (or creates an initial state on first trigger).
  3. Execute (runtime-function):
    • Runs the deterministic Saga program: (saga_state, incoming_event) → (new_saga_state, work_items[]).
    • The program is sandboxed and must not perform I/O or rely on non-deterministic inputs.
  4. Commit (Atomic Outbox):
    • In a single MDBX transaction:
      • Persist new_saga_state
      • Persist each outgoing work item into outbox:*
      • Advance checkpoint:{tenant_id}:{saga_name} to the processed sequence
      • Optionally record dedupe markers for event_id
  5. Dispatch: The outbox relay publishes the work items to their destinations:
    • Aggregate commands are submitted through the Gateway (same routing and tenant enforcement model as user commands).
    • Effect commands are published to WORKFLOW_COMMANDS for effect workers to execute.
  6. Wait: The saga instance remains persisted and will react to the next correlated event or reminder.
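
Steps 2 to 4 can be sketched as one function that loads state, runs the saga program, and commits everything in a single batch. `MemStore` is a hypothetical in-memory stand-in, not the edge-storage API, and `request_payment` is a made-up saga program; the point is only the shape of the atomic write.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for the KvStore (hypothetical API).
#[derive(Default)]
pub struct MemStore {
    data: BTreeMap<String, String>,
}

impl MemStore {
    pub fn get(&self, key: &str) -> Option<&str> {
        self.data.get(key).map(String::as_str)
    }

    /// Applies all writes together, standing in for one store transaction.
    pub fn commit(&mut self, batch: Vec<(String, String)>) {
        self.data.extend(batch);
    }
}

pub struct Event<'a> {
    pub tenant_id: &'a str,
    pub event_id: &'a str,
    pub correlation_id: &'a str,
    pub sequence: u64,
}

pub struct WorkItem {
    pub kind: &'static str,
    pub id: String,
    pub body: String,
}

/// Runs one saga step. Returns false when the event was already applied.
pub fn handle_event(
    store: &mut MemStore,
    saga_name: &str,
    ev: &Event,
    on_event: fn(Option<&str>, &Event) -> (String, Vec<WorkItem>),
) -> bool {
    let t = ev.tenant_id;
    let dedupe_key = format!("dedupe:{t}:{saga_name}:{}", ev.event_id);
    if store.get(&dedupe_key).is_some() {
        return false; // redelivery of an event that was already committed
    }
    let state_key = format!("saga:{t}:{saga_name}:{}", ev.correlation_id);
    let checkpoint_key = format!("checkpoint:{t}:{saga_name}");
    let previous_seq = store
        .get(&checkpoint_key)
        .and_then(|s| s.parse::<u64>().ok())
        .unwrap_or(0);

    // Deterministic step: no I/O, only (state, event) -> (new state, work items).
    let (new_state, work) = on_event(store.get(&state_key), ev);

    let mut batch = vec![
        (state_key, new_state),
        // Keep the checkpoint monotonic even if an older message is redelivered.
        (checkpoint_key, previous_seq.max(ev.sequence).to_string()),
        (dedupe_key, "applied".to_string()),
    ];
    for item in work {
        batch.push((format!("outbox:{t}:{}:{}", item.kind, item.id), item.body));
    }
    store.commit(batch);
    true
}

/// Hypothetical saga program: requests a payment when an order event arrives.
pub fn request_payment(_state: Option<&str>, ev: &Event) -> (String, Vec<WorkItem>) {
    let work = WorkItem {
        kind: "effect",
        id: format!("pay-{}", ev.event_id),
        body: "{}".to_string(),
    };
    ("{\"status\":\"awaiting_payment\"}".to_string(), vec![work])
}
```

The JetStream ack would follow a successful `commit`; if the process dies before the ack, the redelivered event hits the dedupe marker and is skipped.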

6. Effect Provider Lifecycle (Non-Deterministic Execution)

  1. Receive Command: The effect worker consumes an effect command message from WORKFLOW_COMMANDS (tenant.<tenant_id>.effect.<effect_name>.<command_id>).
  2. Idempotency Gate: The worker checks dedupe:{tenant_id}:effect:{command_id} (or an equivalent idempotency key) to avoid duplicate external calls.
  3. Execute: The provider performs the real-world action (HTTP/gRPC/SMTP/SQL) with:
    • retries + exponential backoff
    • timeouts
    • circuit breakers per upstream dependency
  4. Publish Result: The worker publishes an effect result event to WORKFLOW_EVENTS (tenant.<tenant_id>.effect_result.<effect_name>.<command_id>).
  5. Finalize: After the result publish is acknowledged, the worker records completion in KvStore (dedupe marker, optional audit trail) and then acks the command message.

Effect results are consumed by saga workers (and optionally projections) as first-class events.
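
The ordering of steps 2 to 5 matters more than any single call, so it is sketched here with closures in place of real clients. Everything named below is illustrative: `done` stands in for the dedupe keyspace, `execute` for the external call, and `publish` for the JetStream publish with acknowledgement. Retries, timeouts, and circuit breakers are left out.

```rust
use std::collections::BTreeSet;

#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    /// The effect ran, its result was published, and the marker was recorded.
    Executed,
    /// A completion marker already existed; the command can be acked as-is.
    SkippedDuplicate,
    /// The result publish was not acknowledged; leave the command unacked.
    RetryLater,
}

pub fn run_effect<E, P>(
    done: &mut BTreeSet<String>,
    tenant_id: &str,
    command_id: &str,
    mut execute: E,
    mut publish: P,
) -> Outcome
where
    E: FnMut() -> Result<String, String>,
    P: FnMut(&str) -> bool,
{
    // Step 2: idempotency gate.
    let marker = format!("dedupe:{tenant_id}:effect:{command_id}");
    if done.contains(&marker) {
        return Outcome::SkippedDuplicate;
    }
    // Step 3: perform the external action; a failure is still a result.
    let result = match execute() {
        Ok(body) => format!("Succeeded:{body}"),
        Err(reason) => format!("Failed:{reason}"),
    };
    // Step 4: publish the result event and wait for the acknowledgement.
    if !publish(&result) {
        return Outcome::RetryLater;
    }
    // Step 5: record completion; the caller acks the command afterwards.
    done.insert(marker);
    Outcome::Executed
}
```

In this ordering a crash after `execute` but before the marker is written leads to a second external call on redelivery, so upstream requests should carry command_id as an idempotency key where the upstream supports one.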


7. Technical Constraints & Guarantees

  • Determinism Boundary: Saga programs are deterministic and side-effect free. All I/O happens only in effect workers.
  • At-Least-Once Processing: JetStream delivery is at-least-once; correctness relies on idempotency via checkpoints and dedupe markers.
  • Atomicity: “Saga state + outbox + checkpoint” is committed as one durable transaction to avoid dual-write gaps.
  • Tenant Isolation: Every read/write is tenant-scoped; cross-tenant access is blocked at both subject filters and storage keyspace.
  • No Direct Cross-Aggregate Reads: Sagas coordinate via events and commands; they do not read Aggregate state directly (unless explicitly provided as event payload or via a dedicated query/projection API).

8. Horizontal Scaling Strategy (Worker Pool)

The Runner scales horizontally by adding replicas.

  • Work Distribution: JetStream deliver groups ensure each message is processed by a single worker in a replica set.
  • Sharding Options:
    • Tenant Sharding: Place runner replicas on nodes responsible for certain tenant ranges, aligning with gateway routing and operational locality.
    • Key Sharding: For large tenants, shard by (tenant_id, saga_name, correlation_id) across multiple worker groups.
  • Draining: Instances support graceful drain: stop acquiring new work, finish in-flight items, flush outbox relay, then exit.
  • Replay: Rebuild sagas by resetting checkpoint:{tenant_id}:{saga_name} and re-consuming from JetStream. This is a high-risk operation and requires explicit operator intent.
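
Graceful drain needs a gate that refuses new work while letting in-flight items finish. The sketch below is one possible shape using atomics from the standard library; the type and method names are illustrative. Flushing the outbox relay and exiting the process are outside its scope.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

#[derive(Default)]
pub struct DrainGate {
    draining: AtomicBool,
    in_flight: AtomicUsize,
}

impl DrainGate {
    /// Registers one unit of work, or returns false if the instance is draining.
    pub fn try_acquire(&self) -> bool {
        // Count first, then check the flag, so a concurrent `begin_drain`
        // can never observe zero in-flight items while work is starting.
        self.in_flight.fetch_add(1, Ordering::SeqCst);
        if self.draining.load(Ordering::SeqCst) {
            self.in_flight.fetch_sub(1, Ordering::SeqCst);
            return false;
        }
        true
    }

    /// Marks one unit of work as finished.
    pub fn release(&self) {
        self.in_flight.fetch_sub(1, Ordering::SeqCst);
    }

    pub fn begin_drain(&self) {
        self.draining.store(true, Ordering::SeqCst);
    }

    /// Readiness signal: the instance can still acquire work.
    pub fn is_ready(&self) -> bool {
        !self.draining.load(Ordering::SeqCst)
    }

    /// True once draining has started and all in-flight work has finished.
    pub fn is_drained(&self) -> bool {
        self.draining.load(Ordering::SeqCst) && self.in_flight.load(Ordering::SeqCst) == 0
    }
}
```

The same gate can back a readiness check, since "not draining" and "can acquire work" are the same condition here.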

9. Error Handling & Operational Policies

  • Poison Messages: If a message repeatedly fails deterministic execution (schema mismatch, runtime-function error), quarantine it:
    • write a record to deadletter:{tenant_id}:...
    • emit an alert/metric
    • continue processing other keys
  • Retry Discipline: Retries for deterministic saga transitions should be bounded and should not busy-loop; external effect retries are handled in effect workers with backoff.
  • Backpressure: Configure max in-flight per worker and per key; expose lag and outbox depth metrics.
  • Schema Evolution: Treat event payloads as versioned; sagas and effects must accept older versions or explicitly gate by version and route to compensating paths.
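
The poison-message and retry rules combine into a single decision per failed delivery: retry after a delay, or quarantine. The following sketch shows a bounded policy with capped exponential backoff; the struct, its field names, and the sample values in the usage note are assumptions, not settings defined by this document.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq, Eq)]
pub enum Disposition {
    /// Redeliver after the given delay.
    Retry(Duration),
    /// Stop retrying; record the message as dead-lettered and alert.
    Quarantine,
}

pub struct RetryPolicy {
    pub max_attempts: u32,
    pub base: Duration,
    pub cap: Duration,
}

impl RetryPolicy {
    /// Decides what to do after a failure. `attempt` is 1 for the first failure.
    pub fn on_failure(&self, attempt: u32) -> Disposition {
        if attempt >= self.max_attempts {
            return Disposition::Quarantine;
        }
        // Delay doubles per attempt: base, 2*base, 4*base, ... up to `cap`.
        // The exponent is clamped so the shift cannot overflow.
        let exponent = attempt.saturating_sub(1).min(20);
        let delay = self.base.saturating_mul(1u32 << exponent).min(self.cap);
        Disposition::Retry(delay)
    }
}
```

With `max_attempts` of 4 and a 100 ms base, failures 1 to 3 are retried after 100 ms, 200 ms, and 400 ms, and the fourth failure is quarantined. A non-zero delay on every retry is what prevents the busy-loop mentioned above.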

10. Admin + Health Endpoints (Under Gateway)

The Runner exposes endpoints consistent with other components:

  • /health - storage + JetStream connectivity checks
  • /ready - readiness (not draining, can acquire work)
  • /metrics - Prometheus metrics
  • /info - build info, role mode(s), configured saga/effect sets, stream/consumer names
  • /admin/drain - begin graceful drain
  • /admin/reload - hot-reload manifest/placement config (where supported)

All admin endpoints are expected to run behind the Gateway and enforce tenant-scoped access where applicable.