### 🧱 Component: Runner (Sagas + Effect Workflows)

**Definition:** The Runner is a standalone container responsible for executing **workflow logic** in the system: it runs deterministic **Saga** state machines and drives non-deterministic **Effect Provider** executions. It consumes and produces messages via **NATS JetStream**, persists workflow state and checkpoints in the `edge-storage` `KvStore`, and uses the same "container + gateway + multi-tenant isolation + horizontal scale" model as Aggregate and Projection.

The Runner is designed to operate as a **worker** in a horizontally scalable worker pool: multiple Runner replicas can share the workload from JetStream while preserving ordering and idempotency guarantees per workflow key.

**Multi-Tenancy:** Multi-tenancy is first-class via `tenant_id`. When enabled:

- **Routing:** All inbound admin/query requests are tenant-scoped using the `x-tenant-id` header (same key as other components).
- **Stream Isolation:** JetStream subjects and durable consumer names are tenant-aware (e.g., `tenant.*.aggregate.*.*`, `tenant.*.workflow.*.*`).
- **Storage Namespacing:** Saga state, outbox items, checkpoints, and scheduling state are keyed with `tenant_id` prefixes to prevent cross-tenant reads/writes.
- **Worker Partitioning:** Runner instances may be sharded by `tenant_id` (tenant placement) or may run multi-tenant with strict subject + key isolation.
- **Backward Compatibility:** Deployments without multi-tenancy use a default/empty `tenant_id` and non-tenant-prefixed namespaces.
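The tenant-aware naming above can be sketched with two small helpers. This is a minimal illustration only: the function names are hypothetical and not part of any crate listed in this document; only the subject and key layouts follow the conventions described here.

```rust
// Illustrative helpers for tenant-aware subjects and storage keys.
// Function names are hypothetical; only the layout follows the document.

/// Build a tenant-scoped aggregate-event subject, e.g.
/// `tenant.acme-corp.aggregate.Account.018f...`.
fn aggregate_event_subject(tenant_id: &str, aggregate_type: &str, aggregate_id: &str) -> String {
    if tenant_id.is_empty() {
        // Backward compatibility: deployments without multi-tenancy
        // use non-tenant-prefixed namespaces.
        format!("aggregate.{aggregate_type}.{aggregate_id}")
    } else {
        format!("tenant.{tenant_id}.aggregate.{aggregate_type}.{aggregate_id}")
    }
}

/// Build a tenant-prefixed KvStore key, e.g. `saga:acme-corp:OrderSaga:order-1`.
fn kv_key(keyspace: &str, tenant_id: &str, rest: &str) -> String {
    format!("{keyspace}:{tenant_id}:{rest}")
}

fn main() {
    println!("{}", aggregate_event_subject("acme-corp", "Account", "018f"));
    println!("{}", kv_key("saga", "acme-corp", "OrderSaga:order-1"));
}
```

Keeping subject construction and key construction in one place makes the cross-tenant isolation property auditable: every read, write, and publish path goes through a tenant-prefixed name.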
**Dependencies:**

* Core crates pulled from the custom Cargo registry:

  ```toml
  [registries.madapes]
  index = "sparse+https://git.madapes.com/api/packages/madapes/cargo/"
  ```

| Crate | Purpose |
|-------|---------|
| `edge-storage` | libmdbx-backed `KvStore` for saga state, checkpoints, outbox, and schedules |
| `runtime-function` | Deterministic DAG execution for Saga `on_event` / `compensation` programs |
| `edge-logger-client` | High-performance logging (UDS + Protobuf, Loki sink) |
| `query-engine` | Optional UQF queries over workflow state (admin/debug tooling) |
| `async-nats` | NATS JetStream client for consuming events and dispatching workflow messages |

* Source code available at `../../madapes/`
* **Note:** This is a standalone container, aligned with Aggregate/Projection operational constraints and patterns.

**Observability:**

* Production stack: **Grafana** + **Victoria Metrics** + **Loki**
* Logs via `edge-logger-client` with multi-tenant isolation and cardinality protection
* Metrics exported in Prometheus format for Victoria Metrics scraping (worker lag, outbox depth, effect latency)
* Trace correlation via propagated `trace_id` in message metadata and log fields

---

#### 1. Core Responsibilities

* **Event Consumption (Triggering):** Consumes Aggregate events (and optionally workflow events) from JetStream using durable consumers and subject filters.
* **Saga Execution:** Runs deterministic Saga `on_event` programs (`runtime-function` DAG) to compute `(new_state, outgoing_work[])`.
* **Atomic Persistence (State + Outbox + Checkpoint):** Commits saga state updates, newly produced work items, and processing checkpoints in a single `KvStore` transaction.
* **Outbox Relay:** Reliably dispatches outgoing work to the appropriate destination (Gateway command submission, Effect Provider command stream) after it is durably recorded.
* **Effect Execution:** Runs Effect Provider workflows that translate internal "effect commands" into external actions (HTTP/gRPC/SMTP/SQL) and publishes result events back into JetStream.
* **Scheduling (Durable Timeouts):** Provides durable timers/reminders for sagas (e.g., "cancel order if not paid in 30 minutes") without relying on in-memory timers as the source of truth.
* **Backpressure + Safety:** Enforces max in-flight work, retry policies, and poison-message handling to keep the worker pool healthy.

---

#### 2. Runner Operating Modes (Single Binary, Multiple Roles)

The Runner can be deployed in one of these modes:

1. **Saga Worker:** Consumes trigger events and advances saga state machines; emits commands/effect commands via the outbox.
2. **Effect Worker:** Consumes effect commands, executes real-world side effects, and publishes result events.
3. **Combined Worker:** Runs both roles in the same container for small deployments (still with strict separation between the deterministic saga runtime and non-deterministic effect execution).

Each mode uses the same multi-tenant model and can scale horizontally by increasing the replica count.

---

#### 3. Data Model (Keys, Envelopes, Namespaces)

The Runner relies on a small set of durable keyspaces in the `edge-storage` `KvStore`. All keys are tenant-prefixed when multi-tenancy is enabled.
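The deterministic saga contract described under Core Responsibilities, `(saga_state, incoming_event) → (new_saga_state, work_items[])`, can be sketched as a pure Rust function. The order-saga types below are entirely hypothetical, and real programs run as `runtime-function` DAGs rather than hand-written matches; the sketch only illustrates the determinism boundary.

```rust
// Hypothetical order saga used only to illustrate the determinism boundary.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SagaState { AwaitingPayment, Completed, Cancelled }

#[derive(Debug)]
enum Event { PaymentReceived, PaymentTimedOut }

#[derive(Debug, PartialEq)]
enum WorkItem {
    AggregateCommand(&'static str), // submitted via the Gateway
    EffectCommand(&'static str),    // published for effect workers
}

/// Pure transition: no I/O, no clocks, no randomness. The same
/// (state, event) pair always produces the same output, which is what
/// makes replay and at-least-once delivery safe.
fn on_event(state: SagaState, event: &Event) -> (SagaState, Vec<WorkItem>) {
    match (state, event) {
        (SagaState::AwaitingPayment, Event::PaymentReceived) =>
            (SagaState::Completed, vec![WorkItem::EffectCommand("send_receipt_email")]),
        (SagaState::AwaitingPayment, Event::PaymentTimedOut) =>
            (SagaState::Cancelled, vec![WorkItem::AggregateCommand("CancelOrder")]),
        // Unknown transitions are no-ops, which keeps redelivery harmless.
        (s, _) => (s, vec![]),
    }
}

fn main() {
    let (next, work) = on_event(SagaState::AwaitingPayment, &Event::PaymentReceived);
    println!("{next:?} {work:?}");
}
```

Because the transition is a pure function of its inputs, all durability concerns (state, outbox, checkpoint) can be handled outside it, in the keyspaces defined below.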
**Saga State:**

- `saga:{tenant_id}:{saga_name}:{correlation_id}` → JSON state payload

**Saga Checkpoints (JetStream stream sequence or consumer sequence):**

- `checkpoint:{tenant_id}:{saga_name}` → last processed sequence (monotonic)

**Outbox (Reliable Dispatch):**

- `outbox:{tenant_id}:{work_kind}:{work_id}` → serialized work item (command or effect command)
- `outbox_index:{tenant_id}` → optional index cursor / priority ordering metadata

**Schedules (Durable Timeouts/Reminders):**

- `schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at}` → reminder payload

**Idempotency / Dedupe (optional but recommended):**

- `dedupe:{tenant_id}:{saga_name}:{event_id}` → marker that an event transition was applied
- `dedupe:{tenant_id}:effect:{command_id}` → marker that an effect was executed

The `tenant_id` type follows the same semantics as other components (`TenantId` string wrapper; default/empty allowed for single-tenant deployments).

**Work Item Envelopes:**

* **Aggregate Command (Gateway SubmitCommandRequest shape):**
  - `tenant_id`
  - `command_id` (UUID v7)
  - `aggregate_id`
  - `aggregate_type`
  - `payload_json`
  - `metadata` (must include `correlation_id` and `trace_id` when available)
* **Effect Command:**
  - `tenant_id`
  - `command_id` (UUID v7, used as the idempotency key)
  - `effect_name`
  - `payload` (JSON)
  - `metadata` (`correlation_id`, `trace_id`, retry policy hints)
* **Effect Result Event:**
  - `tenant_id`
  - `command_id`
  - `effect_name`
  - `result_type` (e.g., `Succeeded`, `Failed`, `TimedOut`)
  - `payload` (JSON)
  - `timestamp`

---

#### 4. JetStream Integration (Subjects, Streams, Consumers)

The Runner consumes and produces messages using tenant-namespaced subject conventions consistent with other components.
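The effect-command envelope defined above, together with its dedupe key, can be sketched as a plain struct. The types here are hypothetical in-memory shapes for illustration; the real wire types and serialization (JSON/Protobuf) are defined elsewhere.

```rust
// Hypothetical in-memory shape of the effect-command envelope listed above.
// Real wire types and serialization are out of scope for this sketch.
#[derive(Debug, Clone)]
struct EffectCommand {
    tenant_id: String,
    command_id: String,    // UUID v7; doubles as the idempotency key
    effect_name: String,
    payload: String,       // JSON body
    correlation_id: String,
    trace_id: String,
}

impl EffectCommand {
    /// Key checked by the effect worker's idempotency gate, matching the
    /// `dedupe:{tenant_id}:effect:{command_id}` keyspace.
    fn dedupe_key(&self) -> String {
        format!("dedupe:{}:effect:{}", self.tenant_id, self.command_id)
    }
}

fn main() {
    let cmd = EffectCommand {
        tenant_id: "acme-corp".into(),
        command_id: "018f-cmd".into(),
        effect_name: "send_receipt_email".into(),
        payload: "{}".into(),
        correlation_id: "order-1".into(),
        trace_id: "trace-1".into(),
    };
    println!("{}", cmd.dedupe_key());
}
```

Deriving the dedupe key from `command_id` (rather than message content) is what lets retried deliveries of the same command be recognized even if payload metadata differs.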
**Aggregate Event Stream (existing):**

- Stream: `AGGREGATE_EVENTS`
- Subjects: `tenant.*.aggregate.*.*`
- Example: `tenant.acme-corp.aggregate.Account.018f...`

**Workflow Command Stream (Runner-produced work):**

- Stream: `WORKFLOW_COMMANDS`
- Subjects:
  - Effect commands: `tenant.*.effect.*.*`
  - Optional internal commands: `tenant.*.workflow.*.*`

**Workflow Event Stream (Effect results + workflow facts):**

- Stream: `WORKFLOW_EVENTS`
- Subjects:
  - Effect results: `tenant.*.effect_result.*.*`
  - Optional saga facts: `tenant.*.workflow_event.*.*`

**Consumer Model (Worker Pool):**

* **Saga workers** use durable consumers filtered to relevant subjects (typically `tenant.*.aggregate.>` or a narrower wildcard set). Replicas share a deliver group so each message is processed by a single worker in the pool.
* **Effect workers** use durable consumers per `effect_name` (or per effect category), again with a deliver group for horizontal scale.
* **Ack Discipline:** Messages are acked only after the corresponding state/outbox/checkpoint transaction commits.
* **Ordering:** Ordering is guaranteed only within a chosen serialization key (usually `(tenant_id, correlation_id)`); concurrency across keys is allowed.

---

#### 5. Saga Lifecycle (Deterministic Workflow)

1. **Trigger:** The Runner receives an Aggregate event from JetStream (`AGGREGATE_EVENTS`) and extracts `tenant_id`, `event_id`, and correlation metadata.
2. **Load:** It loads existing saga state from the `KvStore` using `saga:{tenant_id}:{saga_name}:{correlation_id}` (or creates an initial state on first trigger).
3. **Execute (`runtime-function`):**
   * Runs the deterministic Saga program: `(saga_state, incoming_event) → (new_saga_state, work_items[])`.
   * The program is sandboxed and must not perform I/O or rely on non-deterministic inputs.
4. **Commit (Atomic Outbox):** In a single MDBX transaction:
   * Persist `new_saga_state`
   * Persist each outgoing work item into `outbox:*`
   * Advance `checkpoint:{tenant_id}:{saga_name}` to the processed sequence
   * Optionally record dedupe markers for `event_id`
5. **Dispatch:** The outbox relay publishes the work items to their destinations:
   * Aggregate commands are submitted through the Gateway (same routing and tenant enforcement model as user commands).
   * Effect commands are published to `WORKFLOW_COMMANDS` for effect workers to execute.
6. **Wait:** The saga instance remains persisted and will react to the next correlated event or reminder.

---

#### 6. Effect Provider Lifecycle (Non-Deterministic Execution)

1. **Receive Command:** The effect worker consumes an effect command message from `WORKFLOW_COMMANDS` (`tenant.*.effect.*.*`).
2. **Idempotency Gate:** The worker checks `dedupe:{tenant_id}:effect:{command_id}` (or an equivalent idempotency key) to avoid duplicate external calls.
3. **Execute:** The provider performs the real-world action (HTTP/gRPC/SMTP/SQL) with:
   * retries + exponential backoff
   * timeouts
   * circuit breakers per upstream dependency
4. **Publish Result:** The worker publishes an effect result event to `WORKFLOW_EVENTS` (`tenant.*.effect_result.*.*`).
5. **Finalize:** After the result publish is acknowledged, it records completion in the `KvStore` (dedupe marker, optional audit trail) and acks the command message.

Effect results are consumed by saga workers (and optionally projections) as first-class events.

---

#### 7. Technical Constraints & Guarantees

* **Determinism Boundary:** Saga programs are deterministic and side-effect free. All I/O happens only in effect workers.
* **At-Least-Once Processing:** JetStream delivery is at-least-once; correctness relies on idempotency via checkpoints and dedupe markers.
* **Atomicity:** "Saga state + outbox + checkpoint" is committed as one durable transaction to avoid dual-write gaps.
* **Tenant Isolation:** Every read/write is tenant-scoped; cross-tenant access is blocked at both subject filters and storage keyspace.
* **No Direct Cross-Aggregate Reads:** Sagas coordinate via events and commands; they do not read Aggregate state directly (unless explicitly provided as event payload or via a dedicated query/projection API).

---

#### 8. Horizontal Scaling Strategy (Worker Pool)

The Runner scales horizontally by adding replicas.

* **Work Distribution:** JetStream deliver groups ensure each message is processed by a single worker in a replica set.
* **Sharding Options:**
  - **Tenant Sharding:** Place runner replicas on nodes responsible for certain tenant ranges, aligning with gateway routing and operational locality.
  - **Key Sharding:** For large tenants, shard by `(tenant_id, saga_name, correlation_id)` across multiple worker groups.
* **Draining:** Instances support graceful drain: stop acquiring new work, finish in-flight items, flush the outbox relay, then exit.
* **Replay:** Rebuild sagas by resetting `checkpoint:{tenant_id}:{saga_name}` and re-consuming from JetStream (with strong caution and explicit operator intent).

---

#### 9. Error Handling & Operational Policies

* **Poison Messages:** If a message repeatedly fails deterministic execution (schema mismatch, runtime-function error), quarantine it:
  - write a record to `deadletter:{tenant_id}:...`
  - emit an alert/metric
  - continue processing other keys
* **Retry Discipline:** Retries for deterministic saga transitions should be bounded and must not busy-loop; external effect retries are handled in effect workers with backoff.
* **Backpressure:** Configure max in-flight per worker and per key; expose lag and outbox depth metrics.
* **Schema Evolution:** Treat event payloads as versioned; sagas and effects must accept older versions or explicitly gate by version and route to compensating paths.

---

#### 10. Admin + Health Endpoints (Under Gateway)

The Runner exposes endpoints consistent with other components:

- `/health` - storage + JetStream connectivity checks
- `/ready` - readiness (not draining, can acquire work)
- `/metrics` - Prometheus metrics
- `/info` - build info, role mode(s), configured saga/effect sets, stream/consumer names
- `/admin/drain` - begin graceful drain
- `/admin/reload` - hot-reload manifest/placement config (where supported)

All admin endpoints are expected to run behind the Gateway and enforce tenant-scoped access where applicable.
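The interaction between `/ready` and `/admin/drain` can be sketched with a single shared flag that workers consult before acquiring new messages. This is a minimal sketch under stated assumptions: the HTTP wiring is omitted, and the type and method names are hypothetical.

```rust
// Minimal sketch of the drain semantics behind /ready and /admin/drain,
// assuming an atomic flag checked by workers before acquiring new work.
use std::sync::atomic::{AtomicBool, Ordering};

struct RunnerState {
    draining: AtomicBool,
}

impl RunnerState {
    fn new() -> Self {
        Self { draining: AtomicBool::new(false) }
    }

    /// Backs `/ready`: a draining instance reports not-ready so the
    /// Gateway stops routing new work to it.
    fn ready(&self) -> bool {
        !self.draining.load(Ordering::SeqCst)
    }

    /// Backs `/admin/drain`: stop acquiring new work; in-flight items
    /// finish and the outbox relay flushes before the process exits.
    fn begin_drain(&self) {
        self.draining.store(true, Ordering::SeqCst);
    }
}

fn main() {
    let state = RunnerState::new();
    println!("ready: {}", state.ready());
    state.begin_drain();
    println!("ready after drain: {}", state.ready());
}
```

Flipping readiness before stopping work acquisition gives load balancers and the Gateway time to stop routing to the instance, which is what makes the drain graceful rather than abrupt.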