🧱 Component: Runner (Sagas + Effect Workflows)
Definition:
The Runner is a standalone container responsible for executing workflow logic in the system: it runs deterministic Saga state machines and drives non-deterministic Effect Provider executions. It consumes and produces messages via NATS JetStream, persists workflow state and checkpoints in edge-storage KvStore, and uses the same “container + gateway + multi-tenant isolation + horizontal scale” model as Aggregate and Projection.
The Runner is designed to operate as a worker in a horizontally scalable worker pool: multiple Runner replicas can share the workload from JetStream while preserving ordering and idempotency guarantees per workflow key.
Multi-Tenancy:
Multi-tenancy is first-class via tenant_id. When enabled:
- Routing: All inbound admin/query requests are tenant-scoped using the x-tenant-id header (same key as other components).
- Stream Isolation: JetStream subjects and durable consumer names are tenant-aware (e.g., tenant.<tenant_id>.aggregate.*.*, tenant.<tenant_id>.workflow.*.*).
- Storage Namespacing: Saga state, outbox items, checkpoints, and scheduling state are keyed with tenant_id prefixes to prevent cross-tenant reads/writes.
- Worker Partitioning: Runner instances may be sharded by tenant_id (tenant placement) or may run multi-tenant with strict subject + key isolation.
- Backward Compatibility: Deployments without multi-tenancy use a default/empty tenant_id and non-tenant-prefixed namespaces.
Dependencies:
- Core crates pulled from the custom Cargo registry:

      [registries.madapes]
      index = "sparse+https://git.madapes.com/api/packages/madapes/cargo/"

  | Crate | Purpose |
  | --- | --- |
  | edge-storage | libmdbx-backed KvStore for saga state, checkpoints, outbox, and schedules |
  | runtime-function | Deterministic DAG execution for Saga on_event/compensation programs |
  | edge-logger-client | High-performance logging (UDS + Protobuf, Loki sink) |
  | query-engine | Optional UQF queries over workflow state (admin/debug tooling) |
  | async-nats | NATS JetStream client for consuming events and dispatching workflow messages |

- Source code available at ../../madapes/
- Note: This is a standalone container, aligned with Aggregate/Projection operational constraints and patterns.
Observability:
- Production stack: Grafana + Victoria Metrics + Loki
- Logs via edge-logger-client with multi-tenant isolation and cardinality protection
- Metrics exported in Prometheus format for Victoria Metrics scraping (worker lag, outbox depth, effect latency)
- Trace correlation via propagated trace_id in message metadata and log fields
1. Core Responsibilities
- Event Consumption (Triggering): Consumes Aggregate events (and optionally workflow events) from JetStream using durable consumers and subject filters.
- Saga Execution: Runs deterministic Saga on_event programs (runtime-function DAG) to compute (new_state, outgoing_work[]).
- Atomic Persistence (State + Outbox + Checkpoint): Commits saga state updates, newly produced work items, and processing checkpoints in a single KvStore transaction.
- Outbox Relay: Reliably dispatches outgoing work to the appropriate destination (Gateway command submission, Effect Provider command stream) after it is durably recorded.
- Effect Execution: Runs Effect Provider workflows that translate internal “effect commands” into external actions (HTTP/gRPC/SMTP/SQL) and publishes result events back into JetStream.
- Scheduling (Durable Timeouts): Provides durable timers/reminders for sagas (e.g., “cancel order if not paid in 30 minutes”) without relying on in-memory timers as the source of truth.
- Backpressure + Safety: Enforces max in-flight work, retry policies, and poison-message handling to keep the worker pool healthy.
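The durable-timeout responsibility can be illustrated with a minimal sketch. It assumes reminders are stored with their due time and polled, so a restart loses nothing that an in-memory timer would. `ScheduleStore` and its methods are hypothetical names, and the `BTreeMap` is only a stand-in for the schedule keyspace in KvStore.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for the durable schedule keyspace (hypothetical;
/// a real Runner would persist these entries in KvStore).
/// Keys are (due_at_ms, reminder_id), so iteration order is due-time order.
#[derive(Default)]
pub struct ScheduleStore {
    entries: BTreeMap<(u64, String), String>,
}

impl ScheduleStore {
    /// Records a reminder that becomes due at `due_at_ms`.
    pub fn schedule(&mut self, due_at_ms: u64, reminder_id: &str, payload: &str) {
        self.entries
            .insert((due_at_ms, reminder_id.to_string()), payload.to_string());
    }

    /// Removes and returns every reminder whose due time is <= `now_ms`,
    /// ordered by due time, as (reminder_id, payload) pairs.
    pub fn take_due(&mut self, now_ms: u64) -> Vec<(String, String)> {
        // split_off leaves keys below the bound in `self.entries` and
        // returns the keys at or above it (the reminders not yet due).
        let later = match now_ms.checked_add(1) {
            Some(bound) => self.entries.split_off(&(bound, String::new())),
            None => BTreeMap::new(),
        };
        let due = std::mem::replace(&mut self.entries, later);
        due.into_iter()
            .map(|((_, id), payload)| (id, payload))
            .collect()
    }

    /// Number of reminders still waiting.
    pub fn pending(&self) -> usize {
        self.entries.len()
    }
}
```

In a real worker, removing a due reminder would presumably happen in the same transaction as the saga transition it triggers, so that a crash between the two cannot drop the timeout.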
2. Runner Operating Modes (Single Binary, Multiple Roles)
The Runner can be deployed in one of these modes:
- Saga Worker: Consumes trigger events and advances saga state machines; emits commands/effect commands via outbox.
- Effect Worker: Consumes effect commands, executes real-world side effects, publishes result events.
- Combined Worker: Runs both roles in the same container for small deployments (still with strict separation between deterministic saga runtime and non-deterministic effect execution).
Each mode uses the same multi-tenant model and can scale horizontally by increasing replica count.
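As a rough illustration of one binary serving several roles, the sketch below models the three modes as an enum. The configuration strings "saga", "effect", and "combined" are assumptions made for this example; the source does not name the actual configuration values.

```rust
/// Roles a single Runner binary can take on.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RunnerMode {
    SagaWorker,
    EffectWorker,
    Combined,
}

impl RunnerMode {
    /// Parses a mode from configuration text.
    /// ASSUMPTION: these strings are illustrative, not the real config values.
    pub fn parse(value: &str) -> Option<Self> {
        match value.trim().to_ascii_lowercase().as_str() {
            "saga" => Some(Self::SagaWorker),
            "effect" => Some(Self::EffectWorker),
            "combined" => Some(Self::Combined),
            _ => None,
        }
    }

    /// True when this mode should start the deterministic saga runtime.
    pub fn runs_sagas(self) -> bool {
        matches!(self, Self::SagaWorker | Self::Combined)
    }

    /// True when this mode should start non-deterministic effect execution.
    pub fn runs_effects(self) -> bool {
        matches!(self, Self::EffectWorker | Self::Combined)
    }
}
```

Keeping the two predicates separate mirrors the requirement that the combined mode still keeps the saga runtime and effect execution strictly apart.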
3. Data Model (Keys, Envelopes, Namespaces)
The Runner relies on a small set of durable keyspaces in edge-storage KvStore. All keys are tenant-prefixed when multi-tenancy is enabled.
Saga State:
- saga:{tenant_id}:{saga_name}:{correlation_id} → JSON state payload

Saga Checkpoints (JetStream stream sequence or consumer sequence):
- checkpoint:{tenant_id}:{saga_name} → last processed sequence (monotonic)

Outbox (Reliable Dispatch):
- outbox:{tenant_id}:{work_kind}:{work_id} → serialized work item (command or effect command)
- outbox_index:{tenant_id} → optional index cursor / priority ordering metadata

Schedules (Durable Timeouts/Reminders):
- schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at} → reminder payload

Idempotency / Dedupe (optional but recommended):
- dedupe:{tenant_id}:{saga_name}:{event_id} → marker that an event transition was applied
- dedupe:{tenant_id}:effect:{command_id} → marker that an effect was executed
The tenant_id type follows the same semantics as other components (TenantId string wrapper; default/empty allowed for single-tenant deployments).
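The key layouts above can be sketched as small builder functions. This is only an illustration: the `TenantId` struct here is a hypothetical stand-in for the real wrapper, the function names are invented, and the handling of an empty tenant (dropping the tenant segment) is an assumption about what "non-tenant-prefixed namespaces" means.

```rust
/// Hypothetical stand-in for the TenantId string wrapper mentioned above.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TenantId(pub String);

impl TenantId {
    /// Empty tenant, as used by single-tenant deployments.
    pub fn single_tenant() -> Self {
        TenantId(String::new())
    }
}

/// Joins a namespace, the tenant (if any), and the remaining segments with ':'.
/// ASSUMPTION: an empty tenant_id means the tenant segment is omitted.
fn key(namespace: &str, tenant: &TenantId, rest: &[&str]) -> String {
    let mut parts: Vec<&str> = vec![namespace];
    if !tenant.0.is_empty() {
        parts.push(tenant.0.as_str());
    }
    parts.extend_from_slice(rest);
    parts.join(":")
}

/// saga:{tenant_id}:{saga_name}:{correlation_id}
pub fn saga_key(t: &TenantId, saga_name: &str, correlation_id: &str) -> String {
    key("saga", t, &[saga_name, correlation_id])
}

/// checkpoint:{tenant_id}:{saga_name}
pub fn checkpoint_key(t: &TenantId, saga_name: &str) -> String {
    key("checkpoint", t, &[saga_name])
}

/// outbox:{tenant_id}:{work_kind}:{work_id}
pub fn outbox_key(t: &TenantId, work_kind: &str, work_id: &str) -> String {
    key("outbox", t, &[work_kind, work_id])
}

/// dedupe:{tenant_id}:effect:{command_id}
pub fn effect_dedupe_key(t: &TenantId, command_id: &str) -> String {
    key("dedupe", t, &["effect", command_id])
}
```

Because ':' is the separator, a segment that itself contains ':' could collide with another key; a real implementation would likely need to reject or escape such values.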
Work Item Envelopes:
- Aggregate Command (Gateway SubmitCommandRequest shape):
  - tenant_id
  - command_id (UUID v7)
  - aggregate_id
  - aggregate_type
  - payload_json
  - metadata (must include correlation_id and trace_id when available)
- Effect Command:
  - tenant_id
  - command_id (UUID v7, used as the idempotency key)
  - effect_name
  - payload (JSON)
  - metadata (correlation_id, trace_id, retry policy hints)
- Effect Result Event:
  - tenant_id
  - command_id
  - effect_name
  - result_type (e.g., Succeeded, Failed, TimedOut)
  - payload (JSON)
  - timestamp
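The effect envelopes above might map onto plain Rust types along these lines. This is a sketch, not the actual wire format: JSON payloads are kept as opaque strings, the timestamp is assumed to be milliseconds since the Unix epoch (`timestamp_ms`), retry policy hints are left out, and `for_command` is a hypothetical helper.

```rust
/// Correlation data carried with every work item.
#[derive(Clone, Debug, PartialEq)]
pub struct Metadata {
    pub correlation_id: Option<String>,
    pub trace_id: Option<String>,
}

/// Request for an effect worker to perform one external action.
#[derive(Clone, Debug, PartialEq)]
pub struct EffectCommand {
    pub tenant_id: String,
    /// UUID v7 in text form; doubles as the idempotency key.
    pub command_id: String,
    pub effect_name: String,
    /// JSON text, treated as opaque in this sketch.
    pub payload: String,
    pub metadata: Metadata,
}

/// Outcome categories listed for effect results.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ResultType {
    Succeeded,
    Failed,
    TimedOut,
}

/// Event published back into JetStream after an effect ran.
#[derive(Clone, Debug, PartialEq)]
pub struct EffectResultEvent {
    pub tenant_id: String,
    pub command_id: String,
    pub effect_name: String,
    pub result_type: ResultType,
    pub payload: String,
    /// ASSUMPTION: milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
}

impl EffectResultEvent {
    /// Builds a result that copies tenant, command id, and effect name
    /// from the command, so the result can never point at another tenant.
    pub fn for_command(
        cmd: &EffectCommand,
        result_type: ResultType,
        payload: &str,
        timestamp_ms: u64,
    ) -> Self {
        EffectResultEvent {
            tenant_id: cmd.tenant_id.clone(),
            command_id: cmd.command_id.clone(),
            effect_name: cmd.effect_name.clone(),
            result_type,
            payload: payload.to_string(),
            timestamp_ms,
        }
    }
}
```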
4. JetStream Integration (Subjects, Streams, Consumers)
The Runner consumes and produces messages using tenant-namespaced subject conventions consistent with other components.
Aggregate Event Stream (existing):
- Stream: AGGREGATE_EVENTS
- Subjects: tenant.*.aggregate.*.*
- Example: tenant.acme-corp.aggregate.Account.018f...
Workflow Command Stream (Runner-produced work):
- Stream: WORKFLOW_COMMANDS
- Subjects:
  - Effect commands: tenant.*.effect.<effect_name>.<command_id>
  - Optional internal commands: tenant.*.workflow.<saga_name>.<correlation_id>
Workflow Event Stream (Effect results + workflow facts):
- Stream: WORKFLOW_EVENTS
- Subjects:
  - Effect results: tenant.*.effect_result.<effect_name>.<command_id>
  - Optional saga facts: tenant.*.workflow_event.<saga_name>.<correlation_id>
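The subject conventions above can be sketched with a few helpers that build tenant-scoped subjects and recover the tenant from an inbound subject. The function names are hypothetical. The token check reflects how NATS subjects work: tokens are separated by '.', and '*' and '>' are wildcards, so none of these characters (nor whitespace) may appear inside a tenant or identifier token.

```rust
/// True when `token` is safe to use as a single NATS subject token.
pub fn is_valid_token(token: &str) -> bool {
    !token.is_empty()
        && !token
            .chars()
            .any(|c| c == '.' || c == '*' || c == '>' || c.is_whitespace())
}

/// Builds tenant.<tenant_id>.<kind>.<name>.<id>, or None if a token is unsafe.
fn subject(tenant_id: &str, kind: &str, name: &str, id: &str) -> Option<String> {
    if [tenant_id, name, id].iter().all(|t| is_valid_token(t)) {
        Some(format!("tenant.{tenant_id}.{kind}.{name}.{id}"))
    } else {
        None
    }
}

/// tenant.<tenant_id>.effect.<effect_name>.<command_id>
pub fn effect_command_subject(tenant_id: &str, effect_name: &str, command_id: &str) -> Option<String> {
    subject(tenant_id, "effect", effect_name, command_id)
}

/// tenant.<tenant_id>.effect_result.<effect_name>.<command_id>
pub fn effect_result_subject(tenant_id: &str, effect_name: &str, command_id: &str) -> Option<String> {
    subject(tenant_id, "effect_result", effect_name, command_id)
}

/// Extracts the tenant from a subject shaped like tenant.<tenant_id>.<kind>...
pub fn tenant_of(subject: &str) -> Option<&str> {
    let mut tokens = subject.split('.');
    match (tokens.next(), tokens.next(), tokens.next()) {
        (Some("tenant"), Some(t), Some(_)) if !t.is_empty() => Some(t),
        _ => None,
    }
}
```

Rejecting unsafe tokens at build time is one way to keep a crafted tenant_id such as "a.>" from widening a subject beyond its tenant.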
Consumer Model (Worker Pool):
- Saga workers use durable consumers filtered to relevant subjects (typically tenant.<tenant_id>.aggregate.> or a narrower wildcard set). Replicas share a deliver group so each message is processed by a single worker in the pool.
- Effect workers use durable consumers per effect_name (or per effect category), again with a deliver group for horizontal scale.
- Ack Discipline: Messages are acked only after the corresponding state/outbox/checkpoint transaction commits.
- Ordering: Ordering is guaranteed only within a chosen serialization key (usually (tenant_id, correlation_id)); concurrency across keys is allowed.
5. Saga Lifecycle (Deterministic Workflow)
- Trigger: The Runner receives an Aggregate event from JetStream (AGGREGATE_EVENTS) and extracts tenant_id, event_id, and correlation metadata.
- Load: It loads existing saga state from KvStore using saga:{tenant_id}:{saga_name}:{correlation_id} (or creates an initial state on first trigger).
- Execute (runtime-function):
  - Runs the deterministic Saga program: (saga_state, incoming_event) → (new_saga_state, work_items[]).
  - The program is sandboxed and must not perform I/O or rely on non-deterministic inputs.
- Commit (Atomic Outbox):
  - In a single MDBX transaction:
    - Persist new_saga_state
    - Persist each outgoing work item into outbox:*
    - Advance checkpoint:{tenant_id}:{saga_name} to the processed sequence
    - Optionally record dedupe markers for event_id
- Dispatch: The outbox relay publishes the work items to their destinations:
  - Aggregate commands are submitted through the Gateway (same routing and tenant enforcement model as user commands).
  - Effect commands are published to WORKFLOW_COMMANDS for effect workers to execute.
- Wait: The saga instance remains persisted and will react to the next correlated event or reminder.
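The Load, Execute, and Commit steps can be sketched as follows. Everything here is a stand-in: `MemKv` is an in-memory map whose `commit` plays the role of the single MDBX transaction (it is not the edge-storage API), `on_event` is a toy transition rather than a runtime-function program, and deriving outbox ids from the event id plus an index is an assumption made so that redelivery produces the same keys.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for KvStore (hypothetical API).
#[derive(Default)]
pub struct MemKv {
    data: BTreeMap<String, String>,
}

impl MemKv {
    pub fn get(&self, key: &str) -> Option<&String> {
        self.data.get(key)
    }

    /// Applies every write in the batch together; stands in for one transaction.
    pub fn commit(&mut self, batch: Vec<(String, String)>) {
        for (k, v) in batch {
            self.data.insert(k, v);
        }
    }
}

/// The parts of an inbound event this sketch needs.
pub struct Event<'a> {
    pub tenant_id: &'a str,
    pub correlation_id: &'a str,
    pub event_id: &'a str,
    pub sequence: u64,
    pub kind: &'a str,
}

/// Toy deterministic transition: (state, event) -> (new_state, work_items).
fn on_event(state: Option<&str>, event_kind: &str) -> (String, Vec<String>) {
    match (state, event_kind) {
        (None, "OrderPlaced") => ("awaiting_payment".into(), vec!["charge_card".into()]),
        (Some("awaiting_payment"), "PaymentSucceeded") => ("completed".into(), vec![]),
        (Some(s), _) => (s.to_string(), vec![]),
        (None, _) => ("ignored".into(), vec![]),
    }
}

/// Processes one event. Returns false when the event is a redelivery
/// that the checkpoint shows was already applied.
pub fn handle(kv: &mut MemKv, saga_name: &str, ev: &Event) -> bool {
    let cp_key = format!("checkpoint:{}:{}", ev.tenant_id, saga_name);
    // JetStream sequences start at 1, so 0 means "nothing processed yet".
    let last: u64 = kv.get(&cp_key).and_then(|s| s.parse().ok()).unwrap_or(0);
    if ev.sequence <= last {
        return false;
    }

    let saga_key = format!("saga:{}:{}:{}", ev.tenant_id, saga_name, ev.correlation_id);
    let (new_state, work) = on_event(kv.get(&saga_key).map(|s| s.as_str()), ev.kind);

    // State, checkpoint, and outbox items go into one batch so they
    // become visible together or not at all.
    let mut batch = vec![(saga_key, new_state), (cp_key, ev.sequence.to_string())];
    for (i, item) in work.iter().enumerate() {
        // ASSUMPTION: work ids derived from event_id + index.
        let key = format!("outbox:{}:effect:{}-{}", ev.tenant_id, ev.event_id, i);
        batch.push((key, item.clone()));
    }
    kv.commit(batch);
    true
}
```

The caller would ack the JetStream message only after `handle` returns, which matches the ack discipline: a crash before the commit leads to redelivery, and a crash after it is caught by the checkpoint check.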
6. Effect Provider Lifecycle (Non-Deterministic Execution)
- Receive Command: The effect worker consumes an effect command message from WORKFLOW_COMMANDS (tenant.<tenant_id>.effect.<effect_name>.<command_id>).
- Idempotency Gate: The worker checks dedupe:{tenant_id}:effect:{command_id} (or an equivalent idempotency key) to avoid duplicate external calls.
- Execute: The provider performs the real-world action (HTTP/gRPC/SMTP/SQL) with:
  - retries + exponential backoff
  - timeouts
  - circuit breakers per upstream dependency
- Publish Result: The worker publishes an effect result event to WORKFLOW_EVENTS (tenant.<tenant_id>.effect_result.<effect_name>.<command_id>).
- Finalize: After result publish is acknowledged, it records completion in KvStore (dedupe marker, optional audit trail) and acks the command message.
Effect results are consumed by saga workers (and optionally projections) as first-class events.
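The idempotency gate and the retry policy of the effect lifecycle can be sketched together. This is a simplified model under stated assumptions: the `HashSet` stands in for the dedupe markers in KvStore, `run_effect` and `backoff_delay` are hypothetical names, publishing the result event is omitted, and no real sleeping or external I/O takes place.

```rust
use std::collections::HashSet;
use std::time::Duration;

/// Exponential backoff: base * 2^attempt, capped at `max` (attempt is 0-based).
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}

/// How one effect command ended.
#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    Succeeded,
    Failed,
    SkippedDuplicate,
}

/// Runs `action` up to `max_attempts` times unless `command_id` was
/// already recorded as completed. `action` receives the attempt number.
pub fn run_effect<F>(
    completed: &mut HashSet<String>,
    command_id: &str,
    max_attempts: u32,
    mut action: F,
) -> Outcome
where
    F: FnMut(u32) -> Result<(), String>,
{
    // Idempotency gate: a redelivered command must not repeat the external call.
    if completed.contains(command_id) {
        return Outcome::SkippedDuplicate;
    }

    let mut outcome = Outcome::Failed;
    for attempt in 0..max_attempts {
        if action(attempt).is_ok() {
            outcome = Outcome::Succeeded;
            break;
        }
        // A real worker would wait backoff_delay(attempt, base, max) here.
    }

    // Finalize: the marker is written for failures too, because a Failed
    // result event is still a published outcome for this command_id.
    completed.insert(command_id.to_string());
    outcome
}
```

Since the marker is written only after the action runs, a crash between the external call and the marker can still cause a second call on redelivery; passing command_id to the upstream as its own idempotency key, where the upstream supports one, is a common way to close that gap.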
7. Technical Constraints & Guarantees
- Determinism Boundary: Saga programs are deterministic and side-effect free. All I/O happens only in effect workers.
- At-Least-Once Processing: JetStream delivery is at-least-once; correctness relies on idempotency via checkpoints and dedupe markers.
- Atomicity: “Saga state + outbox + checkpoint” is committed as one durable transaction to avoid dual-write gaps.
- Tenant Isolation: Every read/write is tenant-scoped; cross-tenant access is blocked at both subject filters and storage keyspace.
- No Direct Cross-Aggregate Reads: Sagas coordinate via events and commands; they do not read Aggregate state directly (unless explicitly provided as event payload or via a dedicated query/projection API).
8. Horizontal Scaling Strategy (Worker Pool)
The Runner scales horizontally by adding replicas.
- Work Distribution: JetStream deliver groups ensure each message is processed by a single worker in a replica set.
- Sharding Options:
  - Tenant Sharding: Place runner replicas on nodes responsible for certain tenant ranges, aligning with gateway routing and operational locality.
  - Key Sharding: For large tenants, shard by (tenant_id, saga_name, correlation_id) across multiple worker groups.
- Draining: Instances support graceful drain: stop acquiring new work, finish in-flight items, flush outbox relay, then exit.
- Replay: Rebuild sagas by resetting checkpoint:{tenant_id}:{saga_name} and re-consuming from JetStream (with strong caution and explicit operator intent).
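Key sharding needs every replica to map the same (tenant_id, saga_name, correlation_id) to the same worker group. A minimal sketch follows, assuming a fixed shard count and using FNV-1a as the hash. The choice of FNV-1a and the name `shard_for` are illustrative, not something the source prescribes; the hash is written out inline because the algorithm behind the standard library's default hasher is not guaranteed to stay the same across Rust releases.

```rust
/// 64-bit FNV-1a over a byte slice. Spelled out here so every replica
/// computes the same value regardless of toolchain version.
pub fn fnv1a64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const PRIME: u64 = 0x100000001b3;
    let mut hash = OFFSET_BASIS;
    for byte in bytes {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(PRIME);
    }
    hash
}

/// Maps a saga instance to one of `shard_count` worker groups.
/// Panics if `shard_count` is zero.
pub fn shard_for(tenant_id: &str, saga_name: &str, correlation_id: &str, shard_count: u32) -> u32 {
    assert!(shard_count > 0, "shard_count must be positive");
    // A NUL separator keeps ("ab", "c") and ("a", "bc") from hashing alike.
    let key = format!("{tenant_id}\u{0}{saga_name}\u{0}{correlation_id}");
    (fnv1a64(key.as_bytes()) % u64::from(shard_count)) as u32
}
```

Plain modulo sharding moves most keys when the shard count changes, so a resize would need in-flight work drained first or a different placement scheme.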
9. Error Handling & Operational Policies
- Poison Messages: If a message repeatedly fails deterministic execution (schema mismatch, runtime-function error), quarantine it:
  - write a record to deadletter:{tenant_id}:...
  - emit an alert/metric
  - continue processing other keys
- Retry Discipline: Retries for deterministic saga transitions should be bounded and should not busy-loop; external effect retries are handled in effect workers with backoff.
- Backpressure: Configure max in-flight per worker and per key; expose lag and outbox depth metrics.
- Schema Evolution: Treat event payloads as versioned; sagas and effects must accept older versions or explicitly gate by version and route to compensating paths.
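The bounded-retry and quarantine policy for poison messages can be sketched as a small failure counter. `PoisonPolicy` and `Disposition` are hypothetical names, and the in-memory map is an assumption for brevity; JetStream also reports a delivery count with each message, which a real worker could use instead of tracking failures itself.

```rust
use std::collections::HashMap;

/// What the worker should do after a failed deterministic transition.
#[derive(Debug, PartialEq, Eq)]
pub enum Disposition {
    /// Try again later; `attempt` is the number of failures so far.
    Retry { attempt: u32 },
    /// Stop retrying: write a dead-letter record, emit a metric, move on.
    Quarantine,
}

/// Counts consecutive failures per message and bounds the retries.
pub struct PoisonPolicy {
    max_attempts: u32,
    failures: HashMap<String, u32>,
}

impl PoisonPolicy {
    pub fn new(max_attempts: u32) -> Self {
        PoisonPolicy { max_attempts, failures: HashMap::new() }
    }

    /// Records a failure and decides between retry and quarantine.
    pub fn on_failure(&mut self, message_id: &str) -> Disposition {
        let count = {
            let slot = self.failures.entry(message_id.to_string()).or_insert(0);
            *slot += 1;
            *slot
        };
        if count >= self.max_attempts {
            // Forget the counter; the message leaves the normal path.
            self.failures.remove(message_id);
            Disposition::Quarantine
        } else {
            Disposition::Retry { attempt: count }
        }
    }

    /// Clears the counter once a message is processed successfully.
    pub fn on_success(&mut self, message_id: &str) {
        self.failures.remove(message_id);
    }
}
```

Quarantining by message rather than by worker is what lets other keys keep flowing while the failing one waits for operator attention.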
10. Admin + Health Endpoints (Under Gateway)
The Runner exposes endpoints consistent with other components:
- /health - storage + JetStream connectivity checks
- /ready - readiness (not draining, can acquire work)
- /metrics - Prometheus metrics
- /info - build info, role mode(s), configured saga/effect sets, stream/consumer names
- /admin/drain - begin graceful drain
- /admin/reload - hot-reload manifest/placement config (where supported)
All admin endpoints are expected to run behind the Gateway and enforce tenant-scoped access where applicable.