### 🧱 Component: Runner (Sagas + Effect Workflows)

**Definition:**

The Runner is a standalone container responsible for executing **workflow logic** in the system: it runs deterministic **Saga** state machines and drives non-deterministic **Effect Provider** executions. It consumes and produces messages via **NATS JetStream**, persists workflow state and checkpoints in the `edge-storage` `KvStore`, and uses the same “container + gateway + multi-tenant isolation + horizontal scale” model as the Aggregate and Projection components.

The Runner is designed to operate as a **worker** in a horizontally scalable worker pool: multiple Runner replicas can share the JetStream workload while preserving ordering and idempotency guarantees per workflow key.


**Multi-Tenancy:**

Multi-tenancy is first-class via `tenant_id`. When enabled:

- **Routing:** All inbound admin/query requests are tenant-scoped using the `x-tenant-id` header (the same header key used by other components).
- **Stream Isolation:** JetStream subjects and durable consumer names are tenant-aware (e.g., `tenant.<tenant_id>.aggregate.*.*`, `tenant.<tenant_id>.workflow.*.*`).
- **Storage Namespacing:** Saga state, outbox items, checkpoints, and scheduling state are keyed with `tenant_id` prefixes to prevent cross-tenant reads and writes.
- **Worker Partitioning:** Runner instances may be sharded by `tenant_id` (tenant placement) or may run multi-tenant with strict subject and key isolation.
- **Backward Compatibility:** Deployments without multi-tenancy use a default (empty) `tenant_id` and non-tenant-prefixed namespaces.

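
The isolation rules above all depend on one thing: every subject, durable name, and key is built from a validated tenant token. The sketch below shows one way to do that. `TenantId::from_header`, `tenant_subject`, and `durable_name` are hypothetical helpers, not an existing crate API. Dropping the `tenant.<tenant_id>.` prefix for an empty tenant is an assumed reading of the backward-compatibility rule.

```rust
/// Tenant identifier; an empty value means single-tenant (backward-compatible) mode.
/// Hypothetical type mirroring the `TenantId` string wrapper described in this spec.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct TenantId(pub String);

impl TenantId {
    /// Builds a tenant from the `x-tenant-id` header value; a missing header
    /// maps to the default (empty) tenant.
    pub fn from_header(value: Option<&str>) -> Result<Self, String> {
        let raw = value.map(str::trim).unwrap_or("");
        // The tenant ID becomes one subject token, so it must not contain the
        // token separator `.`, the wildcards `*` / `>`, or whitespace.
        if raw.chars().any(|c| c == '.' || c == '*' || c == '>' || c.is_whitespace()) {
            return Err(format!("invalid tenant id: {raw:?}"));
        }
        Ok(TenantId(raw.to_string()))
    }

    pub fn is_default(&self) -> bool {
        self.0.is_empty()
    }
}

/// Prefixes `rest` with `tenant.<tenant_id>.` unless running single-tenant
/// (assumed fallback: no prefix at all).
pub fn tenant_subject(tenant: &TenantId, rest: &str) -> String {
    if tenant.is_default() {
        rest.to_string()
    } else {
        format!("tenant.{}.{}", tenant.0, rest)
    }
}

/// Hypothetical durable consumer naming: one durable per (tenant, saga).
pub fn durable_name(tenant: &TenantId, saga_name: &str) -> String {
    if tenant.is_default() {
        format!("runner_{saga_name}")
    } else {
        format!("runner_{}_{saga_name}", tenant.0)
    }
}
```

The validation matters because the tenant ID is spliced into a subject filter. A tenant value of `*`, for example, would turn `tenant.<tenant_id>.aggregate.>` into a filter that matches every tenant.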

**Dependencies:**

* Core crates pulled from the custom Cargo registry:

```toml
# Cargo configuration (e.g. `.cargo/config.toml`) declaring the custom "madapes" registry
[registries.madapes]
index = "sparse+https://git.madapes.com/api/packages/madapes/cargo/"
```

| Crate | Purpose |
|-------|---------|
| `edge-storage` | libmdbx-backed `KvStore` for saga state, checkpoints, outbox, and schedules |
| `runtime-function` | Deterministic DAG execution for Saga `on_event` / `compensation` programs |
| `edge-logger-client` | High-performance logging (UDS + Protobuf, Loki sink) |
| `query-engine` | Optional UQF queries over workflow state (admin/debug tooling) |
| `async-nats` | NATS JetStream client for consuming events and dispatching workflow messages |

* Source code available at `../../madapes/`
* **Note:** This is a standalone container, aligned with Aggregate/Projection operational constraints and patterns.


**Observability:**

* Production stack: **Grafana** + **VictoriaMetrics** + **Loki**
* Logs via `edge-logger-client` with multi-tenant isolation and cardinality protection
* Metrics exported in Prometheus format for VictoriaMetrics scraping (worker lag, outbox depth, effect latency)
* Trace correlation via propagated `trace_id` in message metadata and log fields

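
Metrics are scraped in the Prometheus text exposition format. The sketch below renders one gauge family with a `tenant_id` label. It also illustrates one possible form of cardinality protection: only the first `max_series` tenants get their own series, and the remaining values are summed into an `__other__` series. The metric name `runner_outbox_depth` and the overflow strategy are assumptions, not part of this spec.

```rust
use std::fmt::Write;

/// Escapes a label value per the Prometheus text format (backslash, quote, newline).
fn escape_label(v: &str) -> String {
    v.replace('\\', "\\\\").replace('"', "\\\"").replace('\n', "\\n")
}

/// Renders one gauge family in the Prometheus text exposition format.
/// Only the first `max_series` tenants get their own series; the remaining
/// values are summed into one `tenant_id="__other__"` series
/// (hypothetical cardinality-protection strategy).
pub fn render_gauge(name: &str, help: &str, samples: &[(&str, f64)], max_series: usize) -> String {
    let mut out = String::new();
    writeln!(out, "# HELP {name} {help}").unwrap();
    writeln!(out, "# TYPE {name} gauge").unwrap();
    let (kept, overflow) = samples.split_at(samples.len().min(max_series));
    for (tenant, value) in kept {
        writeln!(out, "{name}{{tenant_id=\"{}\"}} {value}", escape_label(tenant)).unwrap();
    }
    if !overflow.is_empty() {
        let total: f64 = overflow.iter().map(|(_, v)| v).sum();
        writeln!(out, "{name}{{tenant_id=\"__other__\"}} {total}").unwrap();
    }
    out
}
```

Effect latency fits a histogram better than a gauge. In the text format a histogram is a `# TYPE ... histogram` family with `_bucket`, `_sum`, and `_count` series. This sketch covers gauges such as lag and outbox depth only.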

---

#### 1. Core Responsibilities

* **Event Consumption (Triggering):** Consumes Aggregate events (and optionally workflow events) from JetStream using durable consumers and subject filters.
* **Saga Execution:** Runs deterministic Saga `on_event` programs (`runtime-function` DAG) to compute `(new_state, outgoing_work[])`.
* **Atomic Persistence (State + Outbox + Checkpoint):** Commits saga state updates, newly produced work items, and processing checkpoints in a single `KvStore` transaction.
* **Outbox Relay:** Reliably dispatches outgoing work to the appropriate destination (Gateway command submission, Effect Provider command stream) after it is durably recorded.
* **Effect Execution:** Runs Effect Provider workflows that translate internal “effect commands” into external actions (HTTP/gRPC/SMTP/SQL) and publishes result events back into JetStream.
* **Scheduling (Durable Timeouts):** Provides durable timers/reminders for sagas (e.g., “cancel order if not paid in 30 minutes”) without relying on in-memory timers as the source of truth.
* **Backpressure + Safety:** Enforces max in-flight work, retry policies, and poison-message handling to keep the worker pool healthy.


---

#### 2. Runner Operating Modes (Single Binary, Multiple Roles)

The Runner can be deployed in one of these modes:

1. **Saga Worker:** Consumes trigger events and advances saga state machines; emits commands and effect commands via the outbox.
2. **Effect Worker:** Consumes effect commands, executes real-world side effects, and publishes result events.
3. **Combined Worker:** Runs both roles in the same container for small deployments, still with strict separation between the deterministic saga runtime and non-deterministic effect execution.

Each mode uses the same multi-tenant model and can scale horizontally by increasing the replica count.

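
A single binary needs to decide at startup which consumers to run. The sketch below shows one way to encode that. It assumes a hypothetical `RUNNER_MODE` setting with the values `saga`, `effect`, and `combined`; neither the setting name nor its values are defined elsewhere in this spec.

```rust
use std::str::FromStr;

/// Role(s) this replica plays; hypothetical configuration values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunnerMode {
    Saga,
    Effect,
    Combined,
}

impl FromStr for RunnerMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.trim().to_ascii_lowercase().as_str() {
            "saga" => Ok(RunnerMode::Saga),
            "effect" => Ok(RunnerMode::Effect),
            "combined" => Ok(RunnerMode::Combined),
            other => Err(format!("unknown runner mode: {other}")),
        }
    }
}

impl RunnerMode {
    /// Whether this replica starts saga consumers and the outbox relay.
    pub fn runs_sagas(self) -> bool {
        matches!(self, RunnerMode::Saga | RunnerMode::Combined)
    }

    /// Whether this replica starts effect consumers.
    pub fn runs_effects(self) -> bool {
        matches!(self, RunnerMode::Effect | RunnerMode::Combined)
    }
}
```

In combined mode both predicates are true. The two runtimes would still be started as separate components, so the determinism boundary holds even in one process.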

---

#### 3. Data Model (Keys, Envelopes, Namespaces)

The Runner relies on a small set of durable keyspaces in the `edge-storage` `KvStore`. All keys are tenant-prefixed when multi-tenancy is enabled.

**Saga State:**

- `saga:{tenant_id}:{saga_name}:{correlation_id}` → JSON state payload

**Saga Checkpoints (JetStream stream sequence or consumer sequence):**

- `checkpoint:{tenant_id}:{saga_name}` → last processed sequence (monotonic)

**Outbox (Reliable Dispatch):**

- `outbox:{tenant_id}:{work_kind}:{work_id}` → serialized work item (command or effect command)
- `outbox_index:{tenant_id}` → optional index cursor / priority-ordering metadata

**Schedules (Durable Timeouts/Reminders):**

- `schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at}` → reminder payload

**Idempotency / Dedupe (optional but recommended):**

- `dedupe:{tenant_id}:{saga_name}:{event_id}` → marker that an event transition was applied
- `dedupe:{tenant_id}:effect:{command_id}` → marker that an effect was executed

The `tenant_id` type follows the same semantics as in other components (a `TenantId` string wrapper; a default/empty value is allowed for single-tenant deployments).

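
The keyspaces above can be built with plain string formatting. The sketch adds one assumption: `due_at` is encoded as zero-padded Unix milliseconds. With that encoding, byte-wise key order matches time order, so the due reminders for one saga instance can be found with a bounded range scan. A `BTreeMap` stands in for the ordered `KvStore`. The helper names are hypothetical, tenants are assumed non-empty, and IDs are assumed to contain no `:`.

```rust
use std::collections::BTreeMap;

// Hypothetical key builders mirroring the keyspaces above (non-empty tenant assumed).
pub fn saga_key(tenant: &str, saga: &str, correlation_id: &str) -> String {
    format!("saga:{tenant}:{saga}:{correlation_id}")
}

pub fn checkpoint_key(tenant: &str, saga: &str) -> String {
    format!("checkpoint:{tenant}:{saga}")
}

pub fn effect_dedupe_key(tenant: &str, command_id: &str) -> String {
    format!("dedupe:{tenant}:effect:{command_id}")
}

/// `due_at` is assumed to be Unix milliseconds, zero-padded to 20 digits
/// (the width of u64::MAX) so lexicographic order equals chronological order.
pub fn schedule_key(tenant: &str, saga: &str, correlation_id: &str, due_at_ms: u64) -> String {
    format!("schedule:{tenant}:{saga}:{correlation_id}:{due_at_ms:020}")
}

/// Returns the reminder keys of one saga instance that are due at `now_ms`,
/// using a range scan over an ordered map (stand-in for the real KvStore).
pub fn due_reminders(
    store: &BTreeMap<String, Vec<u8>>,
    tenant: &str,
    saga: &str,
    correlation_id: &str,
    now_ms: u64,
) -> Vec<String> {
    // Every key of this instance starts with `start`; `end` is the key that a
    // reminder due exactly at `now_ms` would have.
    let start = format!("schedule:{tenant}:{saga}:{correlation_id}:");
    let end = schedule_key(tenant, saga, correlation_id, now_ms);
    store.range(start..=end).map(|(k, _)| k.clone()).collect()
}
```

Without fixed-width padding, `due_at = 500` would sort after `due_at = 1000` as a string, and the range scan would return the wrong reminders.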

**Work Item Envelopes:**

* **Aggregate Command (Gateway `SubmitCommandRequest` shape):**
  - `tenant_id`
  - `command_id` (UUID v7)
  - `aggregate_id`
  - `aggregate_type`
  - `payload_json`
  - `metadata` (must include `correlation_id` and `trace_id` when available)
* **Effect Command:**
  - `tenant_id`
  - `command_id` (UUID v7, used as the idempotency key)
  - `effect_name`
  - `payload` (JSON)
  - `metadata` (`correlation_id`, `trace_id`, retry policy hints)
* **Effect Result Event:**
  - `tenant_id`
  - `command_id`
  - `effect_name`
  - `result_type` (e.g., `Succeeded`, `Failed`, `TimedOut`)
  - `payload` (JSON)
  - `timestamp`

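
A typed view of the envelopes makes the outbox relay's routing explicit. The sketch rests on several assumptions:

* `work_kind` is `command` or `effect`.
* `work_id` is the `command_id`.
* Payloads are kept as raw JSON strings to avoid a serialization dependency.

None of these type or field groupings come from the Gateway's actual `SubmitCommandRequest` definition.

```rust
use std::collections::BTreeMap;

/// Simplified metadata map; expected to carry `correlation_id` and `trace_id`.
pub type Metadata = BTreeMap<String, String>;

/// Hypothetical typed form of the two outgoing envelope kinds.
#[derive(Debug, Clone, PartialEq)]
pub enum WorkItem {
    /// Submitted through the Gateway (SubmitCommandRequest shape).
    AggregateCommand {
        tenant_id: String,
        command_id: String, // UUID v7
        aggregate_id: String,
        aggregate_type: String,
        payload_json: String,
        metadata: Metadata,
    },
    /// Published to WORKFLOW_COMMANDS for effect workers.
    EffectCommand {
        tenant_id: String,
        command_id: String, // UUID v7, also the idempotency key
        effect_name: String,
        payload: String, // raw JSON text
        metadata: Metadata,
    },
}

#[derive(Debug, PartialEq)]
pub enum Destination {
    Gateway,
    Subject(String),
}

impl WorkItem {
    fn tenant_and_id(&self) -> (&str, &str) {
        match self {
            WorkItem::AggregateCommand { tenant_id, command_id, .. }
            | WorkItem::EffectCommand { tenant_id, command_id, .. } => {
                (tenant_id.as_str(), command_id.as_str())
            }
        }
    }

    /// Outbox key `outbox:{tenant_id}:{work_kind}:{work_id}` (work_kind values assumed).
    pub fn outbox_key(&self) -> String {
        let kind = match self {
            WorkItem::AggregateCommand { .. } => "command",
            WorkItem::EffectCommand { .. } => "effect",
        };
        let (tenant, id) = self.tenant_and_id();
        format!("outbox:{tenant}:{kind}:{id}")
    }

    /// Where the outbox relay sends the item once it is durably recorded.
    pub fn destination(&self) -> Destination {
        match self {
            WorkItem::AggregateCommand { .. } => Destination::Gateway,
            WorkItem::EffectCommand { tenant_id, effect_name, command_id, .. } => Destination::Subject(
                format!("tenant.{tenant_id}.effect.{effect_name}.{command_id}"),
            ),
        }
    }

    /// Rejects items whose metadata lacks a `correlation_id`.
    pub fn validate(&self) -> Result<(), String> {
        let md = match self {
            WorkItem::AggregateCommand { metadata, .. } | WorkItem::EffectCommand { metadata, .. } => metadata,
        };
        if md.contains_key("correlation_id") {
            Ok(())
        } else {
            Err("metadata.correlation_id is required".into())
        }
    }
}
```

With this split, the relay is a simple match on `destination()`. It calls the Gateway for aggregate commands and publishes to the computed subject for effect commands.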

---

#### 4. JetStream Integration (Subjects, Streams, Consumers)

The Runner consumes and produces messages using tenant-namespaced subject conventions consistent with other components.

**Aggregate Event Stream (existing):**

- Stream: `AGGREGATE_EVENTS`
- Subjects: `tenant.*.aggregate.*.*`
- Example: `tenant.acme-corp.aggregate.Account.018f...`

**Workflow Command Stream (Runner-produced work):**

- Stream: `WORKFLOW_COMMANDS`
- Subjects:
  - Effect commands: `tenant.*.effect.<effect_name>.<command_id>`
  - Optional internal commands: `tenant.*.workflow.<saga_name>.<correlation_id>`

**Workflow Event Stream (Effect results + workflow facts):**

- Stream: `WORKFLOW_EVENTS`
- Subjects:
  - Effect results: `tenant.*.effect_result.<effect_name>.<command_id>`
  - Optional saga facts: `tenant.*.workflow_event.<saga_name>.<correlation_id>`

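
Subject filters such as `tenant.*.aggregate.*.*` follow NATS wildcard rules:

* `*` matches exactly one token.
* `>` matches one or more trailing tokens and is valid only as the last token.

The sketch below re-implements these rules locally to show how a filter pinned to one tenant token excludes other tenants' messages. NATS itself performs this matching server-side. `subject_matches` and `tenant_of` are illustrative names only.

```rust
/// Returns true if `subject` matches the NATS-style `filter`.
/// `*` matches exactly one token; `>` (only valid as the last token)
/// matches one or more remaining tokens.
pub fn subject_matches(filter: &str, subject: &str) -> bool {
    let mut f = filter.split('.');
    let mut s = subject.split('.');
    loop {
        match (f.next(), s.next()) {
            // `>` consumed at least one subject token; it must end the filter.
            (Some(">"), Some(_)) => return f.next().is_none(),
            (Some("*"), Some(_)) => continue,
            (Some(ft), Some(st)) if ft == st => continue,
            (None, None) => return true,
            _ => return false,
        }
    }
}

/// Extracts `<tenant_id>` from `tenant.<tenant_id>.…` subjects.
pub fn tenant_of(subject: &str) -> Option<&str> {
    let mut parts = subject.split('.');
    match (parts.next(), parts.next()) {
        (Some("tenant"), Some(t)) if !t.is_empty() => Some(t),
        _ => None,
    }
}
```

Because `effect` and `effect_result` are distinct literal tokens, a filter on one stream family never matches the other.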

**Consumer Model (Worker Pool):**

* **Saga workers** use durable consumers filtered to relevant subjects (typically `tenant.<tenant_id>.aggregate.>` or a narrower wildcard set). Replicas share a deliver group, so each message is processed by a single worker in the pool.
* **Effect workers** use durable consumers per `effect_name` (or per effect category), again with a deliver group for horizontal scale.
* **Ack Discipline:** Messages are acked only after the corresponding state/outbox/checkpoint transaction commits.
* **Ordering:** Ordering is guaranteed only within a chosen serialization key (usually `(tenant_id, correlation_id)`); concurrency across keys is allowed. Because a deliver group spreads messages across replicas without key affinity, the Runner itself must enforce this per-key serialization.

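
Within one replica, per-key ordering can be kept while different keys still run concurrently. One way is to hash the serialization key onto a fixed set of lanes that each process sequentially. This is a sketch under assumptions: `Msg`, `lane_for`, and `partition` are hypothetical. It also only orders messages that the same replica receives. Ordering across replicas additionally needs key sharding (Section 8), because a deliver group can hand consecutive messages for one key to different replicas.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Maps a serialization key to one of `lanes` in-process worker lanes.
/// All messages for the same key land in the same lane, so a lane that
/// processes its queue sequentially preserves per-key order.
pub fn lane_for(tenant_id: &str, correlation_id: &str, lanes: usize) -> usize {
    assert!(lanes > 0, "at least one lane is required");
    let mut h = DefaultHasher::new();
    (tenant_id, correlation_id).hash(&mut h);
    (h.finish() % lanes as u64) as usize
}

/// Hypothetical minimal view of a delivered message.
#[derive(Debug, Clone, PartialEq)]
pub struct Msg {
    pub tenant_id: String,
    pub correlation_id: String,
    pub seq: u64,
}

/// Splits a delivered batch into per-lane queues, keeping arrival order.
pub fn partition(batch: Vec<Msg>, lanes: usize) -> Vec<Vec<Msg>> {
    let mut out: Vec<Vec<Msg>> = (0..lanes).map(|_| Vec::new()).collect();
    for m in batch {
        let lane = lane_for(&m.tenant_id, &m.correlation_id, lanes);
        out[lane].push(m);
    }
    out
}
```

`DefaultHasher` is deterministic within a build, but its algorithm is not guaranteed across Rust releases. A lane assignment that must stay stable across deployments would need an explicitly specified hash.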

---

#### 5. Saga Lifecycle (Deterministic Workflow)

1. **Trigger:** The Runner receives an Aggregate event from JetStream (`AGGREGATE_EVENTS`) and extracts `tenant_id`, `event_id`, and correlation metadata.
2. **Load:** It loads existing saga state from `KvStore` using `saga:{tenant_id}:{saga_name}:{correlation_id}` (or creates an initial state on first trigger).
3. **Execute (`runtime-function`):**
   * Runs the deterministic Saga program: `(saga_state, incoming_event) → (new_saga_state, work_items[])`.
   * The program is sandboxed and must not perform I/O or rely on non-deterministic inputs.
4. **Commit (Atomic Outbox):**
   * In a single MDBX transaction:
     * Persist `new_saga_state`
     * Persist each outgoing work item into `outbox:*`
     * Advance `checkpoint:{tenant_id}:{saga_name}` to the processed sequence
     * Optionally record dedupe markers for `event_id`
5. **Dispatch:** The outbox relay publishes the work items to their destinations:
   * Aggregate commands are submitted through the Gateway (same routing and tenant enforcement model as user commands).
   * Effect commands are published to `WORKFLOW_COMMANDS` for effect workers to execute.
6. **Wait:** The saga instance remains persisted and will react to the next correlated event or reminder.

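
Steps 2 to 4 can be sketched end to end under these assumptions:

* An in-memory store whose `commit` applies a whole batch stands in for an MDBX write transaction.
* A toy `on_event` order saga stands in for a `runtime-function` program.
* Outbox work IDs are derived from `event_id`, so re-running a transition produces the same keys.

None of these names are the real `edge-storage` or `runtime-function` APIs.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for the KvStore. `commit` applies a whole batch,
/// mimicking one MDBX write transaction (hypothetical API).
#[derive(Default)]
pub struct Store {
    data: BTreeMap<String, String>,
}

impl Store {
    pub fn get(&self, key: &str) -> Option<&String> {
        self.data.get(key)
    }

    pub fn commit(&mut self, batch: Vec<(String, String)>) {
        for (k, v) in batch {
            self.data.insert(k, v);
        }
    }
}

/// Toy deterministic saga program: no I/O, no clock, no randomness.
pub fn on_event(state: Option<&str>, event_type: &str) -> (String, Vec<String>) {
    match (state, event_type) {
        (None, "OrderPlaced") => ("awaiting_payment".into(), vec!["effect:charge_card".into()]),
        (Some("awaiting_payment"), "PaymentSucceeded") => ("completed".into(), vec![]),
        (s, _) => (s.unwrap_or("new").to_string(), vec![]),
    }
}

/// Handles one delivered event. Returns `false` for an already-applied
/// redelivery (ack only) and `true` once a new transition is committed;
/// in both cases the JetStream message is acked only after this returns.
pub fn handle(
    store: &mut Store,
    tenant: &str,
    saga: &str,
    correlation_id: &str,
    event_id: &str,
    event_type: &str,
    seq: u64,
) -> bool {
    let dedupe_key = format!("dedupe:{tenant}:{saga}:{event_id}");
    if store.get(&dedupe_key).is_some() {
        return false;
    }
    let state_key = format!("saga:{tenant}:{saga}:{correlation_id}");
    let (new_state, work) = on_event(store.get(&state_key).map(String::as_str), event_type);

    let cp_key = format!("checkpoint:{tenant}:{saga}");
    let prev = store.get(&cp_key).and_then(|v| v.parse::<u64>().ok()).unwrap_or(0);

    // State + outbox + checkpoint + dedupe marker go into ONE batch.
    let mut batch = vec![(state_key, new_state)];
    for (i, item) in work.into_iter().enumerate() {
        // Hypothetical work_id derived from event_id, so a replay yields the same key.
        batch.push((format!("outbox:{tenant}:effect:{event_id}-{i}"), item));
    }
    batch.push((cp_key, prev.max(seq).to_string())); // checkpoint stays monotonic
    batch.push((dedupe_key, "1".to_string()));
    store.commit(batch);
    true
}
```

State, outbox entries, checkpoint, and dedupe marker are written together, so a crash can never leave a committed transition without its outgoing work. It also cannot leave outgoing work without the transition that produced it.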

---

#### 6. Effect Provider Lifecycle (Non-Deterministic Execution)

1. **Receive Command:** The effect worker consumes an effect command message from `WORKFLOW_COMMANDS` (`tenant.<tenant_id>.effect.<effect_name>.<command_id>`).
2. **Idempotency Gate:** The worker checks `dedupe:{tenant_id}:effect:{command_id}` (or an equivalent idempotency key) to avoid duplicate external calls.
3. **Execute:** The provider performs the real-world action (HTTP/gRPC/SMTP/SQL) with:
   * retries + exponential backoff
   * timeouts
   * circuit breakers per upstream dependency
4. **Publish Result:** The worker publishes an effect result event to `WORKFLOW_EVENTS` (`tenant.<tenant_id>.effect_result.<effect_name>.<command_id>`).
5. **Finalize:** After the result publish is acknowledged, the worker records completion in `KvStore` (dedupe marker, optional audit trail) and acks the command message.

Effect results are consumed by saga workers (and optionally projections) as first-class events.

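
Steps 2, 3, and 5 can be sketched as follows. Assumptions:

* A `HashSet` stands in for the dedupe keyspace.
* `EffectError` is a hypothetical classification in which transient errors and timeouts are retried but permanent errors are not.
* `sleep` is injected so the waiting strategy (blocking, async timer, or a test recorder) stays outside the retry logic.

Circuit breakers are not shown.

```rust
use std::collections::HashSet;
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum EffectError {
    Transient, // e.g. 503 or connection reset: worth retrying
    Timeout,   // the upstream call exceeded its timeout
    Permanent, // e.g. validation error: retrying cannot help
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ResultType {
    Succeeded,
    Failed,
    TimedOut,
}

pub struct RetryPolicy {
    pub max_attempts: u32,
    pub base: Duration,
    pub max_delay: Duration,
}

impl RetryPolicy {
    /// Delay after failed attempt `attempt` (0-based): base * 2^attempt, capped.
    pub fn backoff(&self, attempt: u32) -> Duration {
        self.base.saturating_mul(2u32.saturating_pow(attempt)).min(self.max_delay)
    }
}

/// Step 2: idempotency gate on `dedupe:{tenant_id}:effect:{command_id}`.
pub fn already_done(done: &HashSet<String>, tenant: &str, command_id: &str) -> bool {
    done.contains(&format!("dedupe:{tenant}:effect:{command_id}"))
}

/// Step 5: record completion, only after the result event publish is acknowledged.
pub fn mark_done(done: &mut HashSet<String>, tenant: &str, command_id: &str) {
    done.insert(format!("dedupe:{tenant}:effect:{command_id}"));
}

/// Step 3: runs `call` with bounded retries and exponential backoff.
pub fn execute_with_retries(
    policy: &RetryPolicy,
    mut call: impl FnMut() -> Result<(), EffectError>,
    mut sleep: impl FnMut(Duration),
) -> ResultType {
    let mut outcome = ResultType::Failed;
    for attempt in 0..policy.max_attempts {
        match call() {
            Ok(()) => return ResultType::Succeeded,
            Err(EffectError::Permanent) => return ResultType::Failed,
            Err(e) => {
                outcome = if e == EffectError::Timeout { ResultType::TimedOut } else { ResultType::Failed };
                // No sleep after the final attempt.
                if attempt + 1 < policy.max_attempts {
                    sleep(policy.backoff(attempt));
                }
            }
        }
    }
    outcome
}
```

Production backoff usually adds random jitter so that many workers do not retry in lockstep. Jitter is omitted here because the standard library has no random number generator.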

---

#### 7. Technical Constraints & Guarantees

* **Determinism Boundary:** Saga programs are deterministic and side-effect free. All external I/O happens only in effect workers.
* **At-Least-Once Processing:** JetStream delivery is at-least-once; correctness relies on idempotency via checkpoints and dedupe markers.
* **Atomicity:** “Saga state + outbox + checkpoint” is committed as one durable transaction to avoid dual-write gaps.
* **Tenant Isolation:** Every read/write is tenant-scoped; cross-tenant access is blocked at both the subject-filter and storage-keyspace levels.
* **No Direct Cross-Aggregate Reads:** Sagas coordinate via events and commands; they do not read Aggregate state directly (unless it is explicitly provided in an event payload or via a dedicated query/projection API).


---

#### 8. Horizontal Scaling Strategy (Worker Pool)

The Runner scales horizontally by adding replicas.

* **Work Distribution:** JetStream deliver groups ensure each delivery of a message goes to a single worker in a replica set (a redelivery may land on a different replica).
* **Sharding Options:**
  - **Tenant Sharding:** Place Runner replicas on nodes responsible for certain tenant ranges, aligning with gateway routing and operational locality.
  - **Key Sharding:** For large tenants, shard by `(tenant_id, saga_name, correlation_id)` across multiple worker groups.
* **Draining:** Instances support graceful drain: stop acquiring new work, finish in-flight items, flush the outbox relay, then exit.
* **Replay:** Rebuild sagas by resetting `checkpoint:{tenant_id}:{saga_name}` and re-consuming from JetStream (with strong caution and explicit operator intent).

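
The drain sequence needs a small piece of shared state. It must turn new work away, count what is still in flight, and tell the shutdown path when it is safe to flush the outbox relay and exit. The sketch below shows one way to build it from atomics. The `Drain` type and its methods are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Hypothetical drain controller shared by the fetch loop, handlers, and `/ready`.
#[derive(Default)]
pub struct Drain {
    draining: AtomicBool,
    in_flight: AtomicUsize,
}

impl Drain {
    /// Called by the fetch loop before pulling more work.
    pub fn try_acquire(&self) -> bool {
        if self.draining.load(Ordering::SeqCst) {
            return false;
        }
        self.in_flight.fetch_add(1, Ordering::SeqCst);
        // Re-check: a drain may have started between the load and the increment.
        if self.draining.load(Ordering::SeqCst) {
            self.in_flight.fetch_sub(1, Ordering::SeqCst);
            return false;
        }
        true
    }

    /// Called when an item has been committed and acked (or abandoned).
    pub fn release(&self) {
        self.in_flight.fetch_sub(1, Ordering::SeqCst);
    }

    /// `/admin/drain`: stop acquiring new work.
    pub fn begin(&self) {
        self.draining.store(true, Ordering::SeqCst);
    }

    /// Readiness signal: false as soon as a drain starts.
    pub fn is_ready(&self) -> bool {
        !self.draining.load(Ordering::SeqCst)
    }

    /// True once draining and nothing is in flight; the caller then
    /// flushes the outbox relay and exits.
    pub fn is_idle(&self) -> bool {
        self.draining.load(Ordering::SeqCst) && self.in_flight.load(Ordering::SeqCst) == 0
    }
}
```

The re-check in `try_acquire` closes the window in which a drain could start just after the first check. Any work admitted in that window is immediately handed back.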

---

#### 9. Error Handling & Operational Policies

* **Poison Messages:** If a message repeatedly fails deterministic execution (schema mismatch, `runtime-function` error), quarantine it:
  - write a record to `deadletter:{tenant_id}:...`
  - emit an alert/metric
  - continue processing other keys
* **Retry Discipline:** Retries for deterministic saga transitions should be bounded and should not busy-loop; external effect retries are handled in effect workers with backoff.
* **Backpressure:** Configure max in-flight per worker and per key; expose lag and outbox-depth metrics.
* **Schema Evolution:** Treat event payloads as versioned; sagas and effects must accept older versions or explicitly gate by version and route to compensating paths.

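
The retry-discipline and backpressure rules can be sketched as two small policies. Both are assumptions rather than defined behavior:

* `on_saga_failure` caps attempts using the message's delivery count, which JetStream tracks per message. Between attempts it applies a linear delay, for example through a delayed negative ack.
* `InFlight` enforces a worker-wide cap and a per-key cap.

The dead-letter key suffix is left unspecified, as in the list above.

```rust
use std::collections::HashMap;
use std::time::Duration;

#[derive(Debug, PartialEq)]
pub enum Disposition {
    /// Redeliver later after this delay (no busy loop).
    RetryAfter(Duration),
    /// Write the dead-letter record, emit a metric, ack, and move on.
    Quarantine,
}

/// Hypothetical bounded policy; `delivery_count` starts at 1 on first delivery.
pub fn on_saga_failure(delivery_count: u32, max_deliveries: u32, step: Duration) -> Disposition {
    if delivery_count >= max_deliveries {
        Disposition::Quarantine
    } else {
        Disposition::RetryAfter(step.saturating_mul(delivery_count))
    }
}

/// Hypothetical in-flight limiter: worker-wide cap plus per-key cap.
pub struct InFlight {
    max_total: usize,
    max_per_key: usize,
    total: usize,
    per_key: HashMap<String, usize>,
}

impl InFlight {
    pub fn new(max_total: usize, max_per_key: usize) -> Self {
        InFlight { max_total, max_per_key, total: 0, per_key: HashMap::new() }
    }

    /// Admits work for `key` only if both limits allow it.
    pub fn try_admit(&mut self, key: &str) -> bool {
        let n = self.per_key.get(key).copied().unwrap_or(0);
        if self.total >= self.max_total || n >= self.max_per_key {
            return false;
        }
        self.total += 1;
        self.per_key.insert(key.to_string(), n + 1);
        true
    }

    /// Releases one slot for `key`; unknown keys are ignored.
    pub fn finish(&mut self, key: &str) {
        if let Some(n) = self.per_key.get_mut(key) {
            *n -= 1;
            self.total -= 1;
            if *n == 0 {
                self.per_key.remove(key);
            }
        }
    }
}
```

A per-key cap of 1 also serializes work per key. A stuck or poisoned key then blocks only itself, and other keys keep flowing up to the worker-wide limit.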

---

#### 10. Admin + Health Endpoints (Under Gateway)

The Runner exposes endpoints consistent with other components:

- `/health` - storage + JetStream connectivity checks
- `/ready` - readiness (not draining, can acquire work)
- `/metrics` - Prometheus metrics
- `/info` - build info, role mode(s), configured saga/effect sets, stream/consumer names
- `/admin/drain` - begin graceful drain
- `/admin/reload` - hot-reload manifest/placement config (where supported)

All admin endpoints are expected to run behind the Gateway and enforce tenant-scoped access where applicable.
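
The difference between `/health` and `/ready` is easiest to see as a status mapping. The sketch below assumes a hypothetical `Status` snapshot and covers only the two probes; the other endpoints fall through to 404.

```rust
/// Snapshot of what the probes report (hypothetical fields).
pub struct Status {
    pub storage_ok: bool,
    pub jetstream_ok: bool,
    pub draining: bool,
}

/// Maps a probe path to an HTTP status code and a short body.
pub fn probe(path: &str, s: &Status) -> (u16, &'static str) {
    let healthy = s.storage_ok && s.jetstream_ok;
    match path {
        "/health" if healthy => (200, "ok"),
        "/health" => (503, "dependency unavailable"),
        // Ready requires healthy dependencies AND not draining.
        "/ready" if healthy && !s.draining => (200, "ready"),
        "/ready" => (503, "not ready"),
        _ => (404, "not found"),
    }
}
```

While a replica drains, `/health` stays 200 so an orchestrator does not kill it mid-drain. Meanwhile `/ready` turns 503, which takes the replica out of rotation.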