### 🧱 Component: Runner (Sagas + Effect Workflows)
**Definition:**
The Runner is a standalone container responsible for executing **workflow logic** in the system: it runs deterministic **Saga** state machines and drives non-deterministic **Effect Provider** executions. It consumes and produces messages via **NATS JetStream**, persists workflow state and checkpoints in `edge-storage` `KvStore`, and uses the same “container + gateway + multi-tenant isolation + horizontal scale” model as Aggregate and Projection.
The Runner is designed to operate as a **worker** in a horizontally scalable worker pool: multiple Runner replicas can share the workload from JetStream while preserving ordering and idempotency guarantees per workflow key.
**Multi-Tenancy:**
Multi-tenancy is first-class via `tenant_id`. When enabled:
- **Routing:** All inbound admin/query requests are tenant-scoped using the `x-tenant-id` header (same key as other components).
- **Stream Isolation:** JetStream subjects and durable consumer names are tenant-aware (e.g., `tenant.<tenant_id>.aggregate.*.*`, `tenant.<tenant_id>.workflow.*.*`).
- **Storage Namespacing:** Saga state, outbox items, checkpoints, and scheduling state are keyed with `tenant_id` prefixes to prevent cross-tenant reads/writes.
- **Worker Partitioning:** Runner instances may be sharded by `tenant_id` (tenant placement) or may run multi-tenant with strict subject + key isolation.
- **Backward Compatibility:** Deployments without multi-tenancy use a default/empty `tenant_id` and non-tenant-prefixed namespaces.
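As a minimal sketch of the conventions above, tenant-scoped storage keys and JetStream subjects might be derived like this (helper names are illustrative, not part of the actual crates; an empty `tenant_id` models the backward-compatible single-tenant mode):

```rust
// Illustrative helpers for tenant-scoped storage keys and JetStream
// subjects; the exact layout in the real crates may differ.

/// Build a storage key like `saga:{tenant_id}:{saga_name}:{correlation_id}`.
/// An empty tenant_id models the single-tenant / backward-compatible mode.
fn saga_key(tenant_id: &str, saga_name: &str, correlation_id: &str) -> String {
    format!("saga:{tenant_id}:{saga_name}:{correlation_id}")
}

/// Build a tenant-aware subject like `tenant.<tenant_id>.aggregate.<type>.<id>`.
fn aggregate_subject(tenant_id: &str, aggregate_type: &str, aggregate_id: &str) -> String {
    format!("tenant.{tenant_id}.aggregate.{aggregate_type}.{aggregate_id}")
}

fn main() {
    let key = saga_key("acme-corp", "OrderFulfillment", "018f-abc");
    let subject = aggregate_subject("acme-corp", "Account", "018f-abc");
    assert_eq!(key, "saga:acme-corp:OrderFulfillment:018f-abc");
    assert_eq!(subject, "tenant.acme-corp.aggregate.Account.018f-abc");
}
```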
**Dependencies:**
* Core crates pulled from the custom Cargo registry:
```toml
[registries.madapes]
index = "sparse+https://git.madapes.com/api/packages/madapes/cargo/"
```
| Crate | Purpose |
|-------|---------|
| `edge-storage` | libmdbx-backed `KvStore` for saga state, checkpoints, outbox, and schedules |
| `runtime-function` | Deterministic DAG execution for Saga `on_event` / `compensation` programs |
| `edge-logger-client` | High-performance logging (UDS + Protobuf, Loki sink) |
| `query-engine` | Optional UQF queries over workflow state (admin/debug tooling) |
| `async-nats` | NATS JetStream client for consuming events and dispatching workflow messages |
* Source code available at `../../madapes/`
* **Note:** This is a standalone container, aligned with Aggregate/Projection operational constraints and patterns.
**Observability:**
* Production stack: **Grafana** + **Victoria Metrics** + **Loki**
* Logs via `edge-logger-client` with multi-tenant isolation and cardinality protection
* Metrics exported in Prometheus format for Victoria Metrics scraping (worker lag, outbox depth, effect latency)
* Trace correlation via propagated `trace_id` in message metadata and log fields
---
#### 1. Core Responsibilities
* **Event Consumption (Triggering):** Consumes Aggregate events (and optionally workflow events) from JetStream using durable consumers and subject filters.
* **Saga Execution:** Runs deterministic Saga `on_event` programs (`runtime-function` DAG) to compute `(new_state, outgoing_work[])`.
* **Atomic Persistence (State + Outbox + Checkpoint):** Commits saga state updates, newly produced work items, and processing checkpoints in a single `KvStore` transaction.
* **Outbox Relay:** Reliably dispatches outgoing work to the appropriate destination (Gateway command submission, Effect Provider command stream) after it is durably recorded.
* **Effect Execution:** Runs Effect Provider workflows that translate internal “effect commands” into external actions (HTTP/gRPC/SMTP/SQL) and publishes result events back into JetStream.
* **Scheduling (Durable Timeouts):** Provides durable timers/reminders for sagas (e.g., “cancel order if not paid in 30 minutes”) without relying on in-memory timers as the source of truth.
* **Backpressure + Safety:** Enforces max in-flight work, retry policies, and poison-message handling to keep the worker pool healthy.
---
#### 2. Runner Operating Modes (Single Binary, Multiple Roles)
The Runner can be deployed in one of these modes:
1. **Saga Worker:** Consumes trigger events and advances saga state machines; emits commands/effect commands via outbox.
2. **Effect Worker:** Consumes effect commands, executes real-world side effects, publishes result events.
3. **Combined Worker:** Runs both roles in the same container for small deployments (still with strict separation between deterministic saga runtime and non-deterministic effect execution).
Each mode uses the same multi-tenant model and can scale horizontally by increasing replica count.
---
#### 3. Data Model (Keys, Envelopes, Namespaces)
The Runner relies on a small set of durable keyspaces in `edge-storage` `KvStore`. All keys are tenant-prefixed when multi-tenancy is enabled.
**Saga State:**
- `saga:{tenant_id}:{saga_name}:{correlation_id}` → JSON state payload
**Saga Checkpoints (JetStream stream sequence or consumer sequence):**
- `checkpoint:{tenant_id}:{saga_name}` → last processed sequence (monotonic)
**Outbox (Reliable Dispatch):**
- `outbox:{tenant_id}:{work_kind}:{work_id}` → serialized work item (command or effect command)
- `outbox_index:{tenant_id}` → optional index cursor / priority ordering metadata
**Schedules (Durable Timeouts/Reminders):**
- `schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at}` → reminder payload
**Idempotency / Dedupe (optional but recommended):**
- `dedupe:{tenant_id}:{saga_name}:{event_id}` → marker that an event transition was applied
- `dedupe:{tenant_id}:effect:{command_id}` → marker that an effect was executed
The `tenant_id` type follows the same semantics as other components (`TenantId` string wrapper; default/empty allowed for single-tenant deployments).
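One subtlety in the schedule keyspace: if due reminders are found by a plain lexicographic key scan, the `due_at` component must sort chronologically as a string. A sketch, assuming fixed-width zero-padded epoch milliseconds (the actual encoding used by the Runner may differ):

```rust
// Sketch: encoding `due_at` so schedule keys sort chronologically under a
// lexicographic key scan. Fixed-width zero-padded epoch milliseconds is one
// common choice; the Runner's real encoding may differ.

fn schedule_key(tenant_id: &str, saga_name: &str, correlation_id: &str, due_at_ms: u64) -> String {
    // 20 digits is enough for any u64, so keys compare correctly as strings.
    format!("schedule:{tenant_id}:{saga_name}:{correlation_id}:{due_at_ms:020}")
}

fn main() {
    let early = schedule_key("t1", "OrderTimeout", "c1", 999);
    let late = schedule_key("t1", "OrderTimeout", "c1", 1_000);
    // Without the zero padding, "999" would sort after "1000".
    assert!(early < late);
}
```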
**Work Item Envelopes:**
* **Aggregate Command (Gateway SubmitCommandRequest shape):**
- `tenant_id`
- `command_id` (UUID v7)
- `aggregate_id`
- `aggregate_type`
- `payload_json`
- `metadata` (must include `correlation_id` and `trace_id` when available)
* **Effect Command:**
- `tenant_id`
- `command_id` (UUID v7, used as the idempotency key)
- `effect_name`
- `payload` (JSON)
- `metadata` (`correlation_id`, `trace_id`, retry policy hints)
* **Effect Result Event:**
- `tenant_id`
- `command_id`
- `effect_name`
- `result_type` (e.g., `Succeeded`, `Failed`, `TimedOut`)
- `payload` (JSON)
- `timestamp`
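The envelopes above could be modeled roughly as follows. This is an illustrative shape only: in the real system these would carry serde derives and live in a shared types crate, and the `metadata` map is flattened here into individual fields for brevity:

```rust
// Illustrative Rust shapes for the work item envelopes described above.
// Field names follow this section; the struct layout is a sketch, not the
// shared-types crate's actual definition.

struct EffectCommand {
    tenant_id: String,
    command_id: String, // UUID v7, doubles as the idempotency key
    effect_name: String,
    payload_json: String,
    correlation_id: String,
    trace_id: Option<String>,
}

/// Derive the dedupe marker key for an effect command, matching the
/// `dedupe:{tenant_id}:effect:{command_id}` keyspace above.
fn effect_dedupe_key(cmd: &EffectCommand) -> String {
    format!("dedupe:{}:effect:{}", cmd.tenant_id, cmd.command_id)
}

fn main() {
    let cmd = EffectCommand {
        tenant_id: "acme-corp".into(),
        command_id: "018f-cmd-1".into(),
        effect_name: "send_email".into(),
        payload_json: r#"{"to":"user@example.com"}"#.into(),
        correlation_id: "018f-corr-1".into(),
        trace_id: None,
    };
    assert_eq!(effect_dedupe_key(&cmd), "dedupe:acme-corp:effect:018f-cmd-1");
}
```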
---
#### 4. JetStream Integration (Subjects, Streams, Consumers)
The Runner consumes and produces messages using tenant-namespaced subject conventions consistent with other components.
**Aggregate Event Stream (existing):**
- Stream: `AGGREGATE_EVENTS`
- Subjects: `tenant.*.aggregate.*.*`
- Example: `tenant.acme-corp.aggregate.Account.018f...`
**Workflow Command Stream (Runner-produced work):**
- Stream: `WORKFLOW_COMMANDS`
- Subjects:
- Effect commands: `tenant.*.effect.<effect_name>.<command_id>`
- Optional internal commands: `tenant.*.workflow.<saga_name>.<correlation_id>`
**Workflow Event Stream (Effect results + workflow facts):**
- Stream: `WORKFLOW_EVENTS`
- Subjects:
- Effect results: `tenant.*.effect_result.<effect_name>.<command_id>`
- Optional saga facts: `tenant.*.workflow_event.<saga_name>.<correlation_id>`
**Consumer Model (Worker Pool):**
* **Saga workers** use durable consumers filtered to relevant subjects (typically `tenant.<tenant_id>.aggregate.>` or a narrower wildcard set). Replicas share a deliver group so each message is processed by a single worker in the pool.
* **Effect workers** use durable consumers per `effect_name` (or per effect category), again with a deliver group for horizontal scale.
* **Ack Discipline:** Messages are acked only after the corresponding state/outbox/checkpoint transaction commits.
* **Ordering:** Ordering is guaranteed only within a chosen serialization key (usually `(tenant_id, correlation_id)`); concurrency across keys is allowed.
---
#### 5. Saga Lifecycle (Deterministic Workflow)
1. **Trigger:** The Runner receives an Aggregate event from JetStream (`AGGREGATE_EVENTS`) and extracts `tenant_id`, `event_id`, and correlation metadata.
2. **Load:** It loads existing saga state from `KvStore` using `saga:{tenant_id}:{saga_name}:{correlation_id}` (or creates an initial state on first trigger).
3. **Execute (`runtime-function`):**
* Runs the deterministic Saga program: `(saga_state, incoming_event) → (new_saga_state, work_items[])`.
* The program is sandboxed and must not perform I/O or rely on non-deterministic inputs.
4. **Commit (Atomic Outbox):**
* In a single MDBX transaction:
* Persist `new_saga_state`
* Persist each outgoing work item into `outbox:*`
* Advance `checkpoint:{tenant_id}:{saga_name}` to the processed sequence
* Optionally record dedupe markers for `event_id`
5. **Dispatch:** The outbox relay publishes the work items to their destinations:
* Aggregate commands are submitted through the Gateway (same routing and tenant enforcement model as user commands).
* Effect commands are published to `WORKFLOW_COMMANDS` for effect workers to execute.
6. **Wait:** The saga instance remains persisted and will react to the next correlated event or reminder.
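The deterministic contract in step 3 can be sketched with a toy, hypothetical order saga (state names, event names, and effect names are invented for illustration; the real programs are `runtime-function` DAGs, not hand-written match arms):

```rust
// A toy saga transition illustrating the deterministic
// `(saga_state, incoming_event) -> (new_saga_state, work_items[])` contract.
// No I/O, no clocks, no randomness: output depends only on the inputs.

#[derive(Clone, Debug, PartialEq)]
enum OrderSagaState { AwaitingPayment, Paid, Cancelled }

#[derive(Debug, PartialEq)]
enum WorkItem {
    EffectCommand { effect_name: String, payload_json: String },
}

fn on_event(state: OrderSagaState, event: &str) -> (OrderSagaState, Vec<WorkItem>) {
    match (state, event) {
        (OrderSagaState::AwaitingPayment, "PaymentReceived") => (
            OrderSagaState::Paid,
            vec![WorkItem::EffectCommand {
                effect_name: "send_receipt_email".into(),
                payload_json: r#"{"template":"receipt"}"#.into(),
            }],
        ),
        (OrderSagaState::AwaitingPayment, "PaymentTimeout") => {
            (OrderSagaState::Cancelled, vec![])
        }
        // Unknown or out-of-order events leave the state unchanged.
        (s, _) => (s, vec![]),
    }
}

fn main() {
    let (next, work) = on_event(OrderSagaState::AwaitingPayment, "PaymentReceived");
    assert_eq!(next, OrderSagaState::Paid);
    assert_eq!(work.len(), 1); // one effect command goes to the outbox
}
```

Because the function is pure, replaying the same event against the same persisted state always yields the same work items, which is what makes the atomic state + outbox + checkpoint commit in step 4 safe under redelivery.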
---
#### 6. Effect Provider Lifecycle (Non-Deterministic Execution)
1. **Receive Command:** The effect worker consumes an effect command message from `WORKFLOW_COMMANDS` (`tenant.<tenant_id>.effect.<effect_name>.<command_id>`).
2. **Idempotency Gate:** The worker checks `dedupe:{tenant_id}:effect:{command_id}` (or an equivalent idempotency key) to avoid duplicate external calls.
3. **Execute:** The provider performs the real-world action (HTTP/gRPC/SMTP/SQL) with:
* retries + exponential backoff
* timeouts
* circuit breakers per upstream dependency
4. **Publish Result:** The worker publishes an effect result event to `WORKFLOW_EVENTS` (`tenant.<tenant_id>.effect_result.<effect_name>.<command_id>`).
5. **Finalize:** After result publish is acknowledged, it records completion in `KvStore` (dedupe marker, optional audit trail) and acks the command message.
Effect results are consumed by saga workers (and optionally projections) as first-class events.
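The retry policy in step 3 might compute delays like this (base, multiplier, and cap values are illustrative, not prescribed by this document; production code would usually add jitter on top):

```rust
// Sketch of a bounded exponential backoff schedule for effect retries.
// delay = min(base * 2^attempt, cap), saturating instead of overflowing.

fn backoff_ms(attempt: u32, base_ms: u64, cap_ms: u64) -> u64 {
    base_ms
        .saturating_mul(1u64.checked_shl(attempt).unwrap_or(u64::MAX))
        .min(cap_ms)
}

fn main() {
    let delays: Vec<u64> = (0..6).map(|a| backoff_ms(a, 100, 5_000)).collect();
    assert_eq!(delays, vec![100, 200, 400, 800, 1_600, 3_200]);
    assert_eq!(backoff_ms(10, 100, 5_000), 5_000); // capped
}
```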
---
#### 7. Technical Constraints & Guarantees
* **Determinism Boundary:** Saga programs are deterministic and side-effect free. All I/O happens only in effect workers.
* **At-Least-Once Processing:** JetStream delivery is at-least-once; correctness relies on idempotency via checkpoints and dedupe markers.
* **Atomicity:** “Saga state + outbox + checkpoint” is committed as one durable transaction to avoid dual-write gaps.
* **Tenant Isolation:** Every read/write is tenant-scoped; cross-tenant access is blocked at both subject filters and storage keyspace.
* **No Direct Cross-Aggregate Reads:** Sagas coordinate via events and commands; they do not read Aggregate state directly (unless explicitly provided as event payload or via a dedicated query/projection API).
---
#### 8. Horizontal Scaling Strategy (Worker Pool)
The Runner scales horizontally by adding replicas.
* **Work Distribution:** JetStream deliver groups ensure each message is processed by a single worker in a replica set.
* **Sharding Options:**
- **Tenant Sharding:** Place runner replicas on nodes responsible for certain tenant ranges, aligning with gateway routing and operational locality.
- **Key Sharding:** For large tenants, shard by `(tenant_id, saga_name, correlation_id)` across multiple worker groups.
* **Draining:** Instances support graceful drain: stop acquiring new work, finish in-flight items, flush outbox relay, then exit.
* **Replay:** Rebuild sagas by resetting `checkpoint:{tenant_id}:{saga_name}` and re-consuming from JetStream (with strong caution and explicit operator intent).
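Key sharding for large tenants can be sketched as a stable hash of the serialization key onto N worker groups. `DefaultHasher` is used here only for illustration; a real deployment would pin a hash algorithm that is stable across binary versions, since `DefaultHasher` makes no such guarantee:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch: map (tenant_id, saga_name, correlation_id) to one of N worker
// groups so every event for a serialization key lands in the same group.
// NOTE: DefaultHasher is not stable across Rust versions; a production
// system would pin an explicit, versioned hash algorithm.

fn shard_for(tenant_id: &str, saga_name: &str, correlation_id: &str, num_shards: u64) -> u64 {
    let mut h = DefaultHasher::new();
    (tenant_id, saga_name, correlation_id).hash(&mut h);
    h.finish() % num_shards
}

fn main() {
    let a = shard_for("acme-corp", "OrderFulfillment", "018f-abc", 8);
    let b = shard_for("acme-corp", "OrderFulfillment", "018f-abc", 8);
    assert_eq!(a, b); // same key always maps to the same shard
    assert!(a < 8);
}
```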
---
#### 9. Error Handling & Operational Policies
* **Poison Messages:** If a message repeatedly fails deterministic execution (schema mismatch, runtime-function error), quarantine it:
- write a record to `deadletter:{tenant_id}:...`
- emit an alert/metric
- continue processing other keys
* **Retry Discipline:** Retries for deterministic saga transitions should be bounded and should not busy-loop; external effect retries are handled in effect workers with backoff.
* **Backpressure:** Configure max in-flight per worker and per key; expose lag and outbox depth metrics.
* **Schema Evolution:** Treat event payloads as versioned; sagas and effects must accept older versions or explicitly gate by version and route to compensating paths.
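The poison-message policy above amounts to a bounded per-key failure counter. A sketch, with an illustrative threshold and key shape (the real Runner would track attempts via JetStream delivery counts or persisted state rather than in memory):

```rust
use std::collections::HashMap;

// Sketch: count failed processing attempts per message key and quarantine
// after a bounded number instead of retrying forever. Threshold and key
// shape are illustrative.

struct PoisonTracker {
    max_attempts: u32,
    failures: HashMap<String, u32>,
}

enum Disposition { Retry, Quarantine }

impl PoisonTracker {
    fn new(max_attempts: u32) -> Self {
        Self { max_attempts, failures: HashMap::new() }
    }

    /// Record a failed attempt and decide what to do next. On Quarantine,
    /// the caller writes a `deadletter:{tenant_id}:...` record, emits an
    /// alert/metric, and moves on to other keys.
    fn on_failure(&mut self, message_key: &str) -> Disposition {
        let count = self.failures.entry(message_key.to_string()).or_insert(0);
        *count += 1;
        if *count >= self.max_attempts {
            Disposition::Quarantine
        } else {
            Disposition::Retry
        }
    }
}

fn main() {
    let mut t = PoisonTracker::new(3);
    assert!(matches!(t.on_failure("t1:evt-9"), Disposition::Retry));
    assert!(matches!(t.on_failure("t1:evt-9"), Disposition::Retry));
    assert!(matches!(t.on_failure("t1:evt-9"), Disposition::Quarantine));
}
```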
---
#### 10. Admin + Health Endpoints (Under Gateway)
The Runner exposes endpoints consistent with other components:
- `/health` - storage + JetStream connectivity checks
- `/ready` - readiness (not draining, can acquire work)
- `/metrics` - Prometheus metrics
- `/info` - build info, role mode(s), configured saga/effect sets, stream/consumer names
- `/admin/drain` - begin graceful drain
- `/admin/reload` - hot-reload manifest/placement config (where supported)
All admin endpoints are expected to run behind the Gateway and enforce tenant-scoped access where applicable.