cloudlysis/runner/external_prd.md
Vlad Durnea 1298d9a3df
Monorepo consolidation: workspace, shared types, transport plans, docker/swam assets
2026-03-30 11:40:42 +03:00

# External PRD: Changes Required in Aggregate, Projection, Runner
This document captures the work needed outside the Gateway to support:
- Tenant-aware routing via `x-tenant-id`
- Independent horizontal scalability of Aggregate, Projection, Runner
- A safe mechanism for tenant rebalancing per service kind
---
## **Target State**
### Independent Placements
Each service kind has its own placement map:
- `aggregate_placement[tenant_id] -> aggregate_shard_id`
- `projection_placement[tenant_id] -> projection_shard_id`
- `runner_placement[tenant_id] -> runner_shard_id`
Each shard is a replica set that can scale independently.
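
As a rough illustration of what "independent placement maps" means in practice, the sketch below keeps one tenant-to-shard map per service kind. The `Placements` type, the `String` ids, and the method names are assumptions made for this example and are not taken from the codebase.

```rust
use std::collections::HashMap;

/// The three service kinds named in this document.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ServiceKind {
    Aggregate,
    Projection,
    Runner,
}

/// One independent tenant -> shard map per service kind.
/// `String` ids are an assumption; the real id types are not specified here.
#[derive(Debug, Default)]
pub struct Placements {
    maps: HashMap<ServiceKind, HashMap<String, String>>,
}

impl Placements {
    /// Assign `tenant_id` to `shard_id` for one service kind only.
    pub fn assign(&mut self, kind: ServiceKind, tenant_id: &str, shard_id: &str) {
        self.maps
            .entry(kind)
            .or_default()
            .insert(tenant_id.to_string(), shard_id.to_string());
    }

    /// Look up the shard for `(tenant_id, kind)`; `None` means unplaced.
    pub fn shard_for(&self, kind: ServiceKind, tenant_id: &str) -> Option<&str> {
        self.maps.get(&kind)?.get(tenant_id).map(String::as_str)
    }
}
```

Because each kind has its own map, moving a tenant's projection shard leaves its aggregate and runner placements untouched.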
### Rebalancing Contract (Per Service Kind)
All nodes MUST support:
- Dynamic placement updates (watch NATS KV or reload config)
- A drain mechanism that can target a specific tenant (stop acquiring new work for that tenant, finish in-flight, report status)
- Clear readiness semantics that reflect whether the node will accept work for a tenant
Additionally, all nodes SHOULD converge on the same operational contract:
- A per-tenant “accepting” gate (can this shard accept new work/queries/commands for tenant X?)
- A per-tenant “drained” signal (no in-flight work remains for tenant X)
- A per-tenant warmup/catchup signal where relevant (projection lag, aggregate snapshot availability)
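
One possible shape for the "accepting" gate and "drained" signal is sketched below. It is a single-threaded, in-memory illustration only; `TenantGate` and its methods are hypothetical names, and a real node would need synchronization and a bounded wait around it.

```rust
use std::collections::HashMap;

/// Per-tenant state on one shard. Field names are illustrative.
#[derive(Debug, Default, Clone, Copy)]
struct TenantState {
    accepting: bool,
    in_flight: u32,
}

/// Tracks the "accepting" gate and "drained" signal for each tenant.
#[derive(Debug, Default)]
pub struct TenantGate {
    tenants: HashMap<String, TenantState>,
}

impl TenantGate {
    /// Start accepting work for a tenant (e.g. after a placement update).
    pub fn open(&mut self, tenant: &str) {
        self.tenants.entry(tenant.to_string()).or_default().accepting = true;
    }

    /// Try to begin one unit of work; returns false if the gate is closed.
    pub fn try_begin(&mut self, tenant: &str) -> bool {
        match self.tenants.get_mut(tenant) {
            Some(s) if s.accepting => {
                s.in_flight += 1;
                true
            }
            _ => false,
        }
    }

    /// Mark one unit of work as finished.
    pub fn finish(&mut self, tenant: &str) {
        if let Some(s) = self.tenants.get_mut(tenant) {
            s.in_flight = s.in_flight.saturating_sub(1);
        }
    }

    /// Begin draining: stop acquiring new work, let in-flight work complete.
    pub fn drain(&mut self, tenant: &str) {
        if let Some(s) = self.tenants.get_mut(tenant) {
            s.accepting = false;
        }
    }

    /// Drained means: not accepting and nothing in flight.
    /// Unknown tenants count as drained.
    pub fn is_drained(&self, tenant: &str) -> bool {
        self.tenants
            .get(tenant)
            .map_or(true, |s| !s.accepting && s.in_flight == 0)
    }
}
```

A per-tenant readiness endpoint could then report the gate state directly, and a drain status endpoint could report `is_drained`.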
---
## **Aggregate: Required Changes**
### 1) Expose a Real Command API (Gateway Upstream)
Today, Aggregate has internal command handling types (e.g., `CommandServer`) but its running HTTP server only exposes health/metrics/admin endpoints ([aggregate/http_server.rs](file:///Users/vlad/Developer/cloudlysis/aggregate/src/http_server.rs#L15-L82), [aggregate/server/mod.rs](file:///Users/vlad/Developer/cloudlysis/aggregate/src/server/mod.rs#L81-L213)).
Aggregate MUST expose one of the following upstream APIs for the Gateway to call:
- **Option A (Recommended)**: gRPC server implementing `aggregate.gateway.v1.CommandService/SubmitCommand` compatible with [aggregate.proto](file:///Users/vlad/Developer/cloudlysis/aggregate/proto/aggregate.proto#L1-L31).
- **Option B**: HTTP endpoint for command submission (REST), with a stable request/response shape that the Gateway can proxy.
### 2) Tenant Placement Enforcement
Aggregate MUST enforce “hosted tenants” so independent scaling is safe:
- If an Aggregate shard/node is not assigned a tenant, it MUST reject commands for that tenant (e.g., `403` when the issue is authorization, or `503` with a retriable hint when the issue is placement).
- Aggregate SHOULD maintain an in-memory allowlist of hosted tenants that is driven by:
  - NATS KV placement watcher (preferred), or
  - Hot-reloaded config pushed via `/admin/reload`
Aggregate already has admin hooks for drain/reload, but they are currently generic and/or illustrative ([aggregate/http_server.rs](file:///Users/vlad/Developer/cloudlysis/aggregate/src/http_server.rs#L15-L72), [aggregate/server/mod.rs](file:///Users/vlad/Developer/cloudlysis/aggregate/src/server/mod.rs#L402-L442)). These need to become placement-aware.
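
The rejection rule could look roughly like the following. This is a sketch under stated assumptions: the `Admission` enum, the `admit` function, and the `caller_authorized` flag are invented for illustration, and the 403/503 split simply follows the example status codes given above.

```rust
use std::collections::HashSet;

/// Outcome of the placement check for one command.
#[derive(Debug, PartialEq, Eq)]
pub enum Admission {
    Accept,
    /// Caller is not allowed to act for this tenant: HTTP 403.
    Forbidden,
    /// Tenant is not hosted on this shard: HTTP 503 with a retriable hint.
    NotHostedHere,
}

impl Admission {
    /// Map the outcome to an HTTP status code.
    pub fn http_status(&self) -> u16 {
        match self {
            Admission::Accept => 200,
            Admission::Forbidden => 403,
            Admission::NotHostedHere => 503,
        }
    }
}

/// `hosted` is the in-memory allowlist; `caller_authorized` stands in for
/// whatever authorization check the real service performs.
pub fn admit(hosted: &HashSet<String>, tenant_id: &str, caller_authorized: bool) -> Admission {
    if !caller_authorized {
        Admission::Forbidden
    } else if !hosted.contains(tenant_id) {
        Admission::NotHostedHere
    } else {
        Admission::Accept
    }
}
```

Whichever source drives the allowlist (KV watcher or reloaded config), the check itself stays the same; only the contents of `hosted` change.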
### 3) Tenant Drain (Per Tenant)
Aggregate MUST provide a per-tenant drain mechanism to support rebalancing:
- Stop accepting new commands for the tenant.
- Allow in-flight commands to finish (bounded wait), then report drained.
- Expose drain status per tenant (admin endpoint).
### 4) Rebalancing State Strategy
Aggregate persists snapshots locally (MDBX) and uses JetStream for events. To move a tenant:
- **Approach 1 (Snapshot migration)**: copy tenant snapshot DB/state to the target shard, then switch placement.
- **Approach 2 (Cold rehydrate)**: switch placement and let the target shard rebuild state by replaying events from JetStream; expect higher latency during warmup.
The system SHOULD support both, with the rebalancer selecting the strategy based on tenant size/SLO.
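
How the rebalancer picks a strategy is not specified here. One simple heuristic, shown purely as an assumption, compares an estimated replay time against the warmup latency the tenant's SLO tolerates. `TenantProfile` and both of its fields are hypothetical.

```rust
/// The two tenant-move strategies described above.
#[derive(Debug, PartialEq, Eq)]
pub enum MoveStrategy {
    SnapshotMigration,
    ColdRehydrate,
}

/// Inputs the rebalancer might weigh. All fields and units are assumptions.
pub struct TenantProfile {
    /// Estimated time to rebuild state by replaying events, in milliseconds.
    pub est_replay_ms: u64,
    /// Warmup latency the tenant's SLO tolerates, in milliseconds.
    pub warmup_budget_ms: u64,
}

/// Prefer the simpler cold rehydrate when replay fits the warmup budget;
/// otherwise copy the snapshot first.
pub fn choose_strategy(p: &TenantProfile) -> MoveStrategy {
    if p.est_replay_ms <= p.warmup_budget_ms {
        MoveStrategy::ColdRehydrate
    } else {
        MoveStrategy::SnapshotMigration
    }
}
```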
### 5) Metrics for Placement Decisions
Aggregate SHOULD expose:
- Per-tenant command rate, error rate
- In-flight commands by tenant
- Rehydrate time / snapshot hit ratio
- Storage size per tenant (if feasible)
---
## **Projection: Required Changes**
### 1) Expose Query API Upstream for Gateway
Projection has a working `QueryService` with tenant-scoped prefix scans ([uqf.rs](file:///Users/vlad/Developer/cloudlysis/projection/src/query/uqf.rs#L121-L162)) but it is not exposed via HTTP/gRPC (current HTTP routes are health/ready/metrics/info only: [projection/http/mod.rs](file:///Users/vlad/Developer/cloudlysis/projection/src/http/mod.rs#L102-L109)).
Projection MUST add one upstream API the Gateway can route to:
- `POST /query/{view_type}` (HTTP) accepting `x-tenant-id` and a UQF payload, returning `QueryResponse`.
- Or a gRPC query service (new proto) if gRPC is preferred end-to-end.
### 2) Tenant Placement Filtering (Independent Scaling)
Projection MUST support running in one of these modes:
- **Multi-tenant shard**: consumes all tenants (simple, less isolated).
- **Tenant-filtered shard (required for rebalancing)**:
  - only consumes/serves queries for the tenants assigned to that shard
  - rejects queries for unassigned tenants (consistent error semantics)
Implementation direction:
- Add a placement watcher similar to Runner's tenant filter ([runner/tenant_placement.rs](file:///Users/vlad/Developer/cloudlysis/runner/src/tenant_placement.rs#L8-L100)).
- Apply tenant filter to:
  - event consumption subject filters (preferred), and
  - query serving validation (always).
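
A minimal sketch of applying one tenant filter in both places follows. The `TenantFilter` type is hypothetical, and the `events.<tenant>.>` subject layout is an assumption; the actual JetStream subject scheme is not described in this document.

```rust
use std::collections::BTreeSet;

/// Tenants assigned to this shard. A BTreeSet keeps the output deterministic.
pub struct TenantFilter {
    assigned: BTreeSet<String>,
}

impl TenantFilter {
    pub fn new<I: IntoIterator<Item = String>>(tenants: I) -> Self {
        Self { assigned: tenants.into_iter().collect() }
    }

    /// One subject filter per assigned tenant.
    /// The `events.<tenant>.>` layout is hypothetical.
    pub fn subject_filters(&self) -> Vec<String> {
        self.assigned
            .iter()
            .map(|t| format!("events.{}.>", t))
            .collect()
    }

    /// Query-side validation, applied on every request.
    pub fn allows(&self, tenant_id: &str) -> bool {
        self.assigned.contains(tenant_id)
    }
}
```

Keeping the query-side check even when subject filters are in place guards against serving stale keys left over from a tenant that has since moved.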
### 3) Drain + Warmup Endpoints
Projection SHOULD add:
- `/admin/drain?tenant_id=...` (stop consuming new events for that tenant, finish in-flight, flush checkpoints)
- `/admin/reload` (apply latest placement/config)
- Optional warmup status: whether the shard has caught up to JetStream tail for that tenant/view_types
### 4) Rebalancing Strategy for Projection
Projection can rebalance safely with “warm then cut over”:
- Assign tenant to the new projection shard while old shard still serves.
- New shard catches up (replay from JetStream, build view KV).
- Switch Gateway placement for query routing to new shard.
- Drain old shard for that tenant and optionally delete old tenant KV keys.
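
The sequence above can be read as a small state machine. The sketch below is one way to encode it, assuming the rebalancer can observe whether the new shard has caught up and whether the old shard has drained; `Phase`, `Observed`, and `next_phase` are illustrative names.

```rust
/// Phases of a projection tenant move, in order.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    /// New shard assigned and replaying; old shard still serves queries.
    Warming,
    /// Gateway routes queries to the new shard; old shard not yet drained.
    CutOver,
    /// Old shard drained for the tenant.
    Done,
}

/// Observations the rebalancer would collect. Names are illustrative.
pub struct Observed {
    pub new_shard_caught_up: bool,
    pub old_shard_drained: bool,
}

/// Advance at most one phase per call, never skipping a step.
pub fn next_phase(current: Phase, obs: &Observed) -> Phase {
    match current {
        Phase::Warming if obs.new_shard_caught_up => Phase::CutOver,
        Phase::CutOver if obs.old_shard_drained => Phase::Done,
        other => other,
    }
}
```

Advancing one phase at a time means queries are never routed to a shard that has not yet reported catch-up.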
### 5) Metrics for Placement Decisions
Projection SHOULD expose:
- JetStream lag per tenant/view_type (tail minus checkpoint)
- Query latency and scan counts
- Storage size per tenant (if feasible)
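
The lag metric ("tail minus checkpoint") is simple arithmetic over sequence numbers. A sketch, assuming both values are `u64` stream sequences and that `max_lag` is a tuning knob chosen by the operator:

```rust
/// Lag for one tenant/view_type: stream tail sequence minus checkpoint.
/// Saturating subtraction avoids underflow if the tail reading is stale.
pub fn lag(tail_seq: u64, checkpoint_seq: u64) -> u64 {
    tail_seq.saturating_sub(checkpoint_seq)
}

/// Warmup check with a tolerance; `max_lag` is an assumed tuning knob.
pub fn caught_up(tail_seq: u64, checkpoint_seq: u64, max_lag: u64) -> bool {
    lag(tail_seq, checkpoint_seq) <= max_lag
}
```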
---
## **Runner: Required Changes**
Runner already has:
- A tenant placement watcher capable of producing an allowlist ([tenant_placement.rs](file:///Users/vlad/Developer/cloudlysis/runner/src/tenant_placement.rs#L8-L100))
- Admin endpoints including drain/reload/config ([runner/http/mod.rs](file:///Users/vlad/Developer/cloudlysis/runner/src/http/mod.rs#L69-L86))
- Gateway client integration for aggregate command submission ([runner/gateway/mod.rs](file:///Users/vlad/Developer/cloudlysis/runner/src/gateway/mod.rs#L1-L47))
To support independent scalability + rebalancing, Runner needs the following.
### 1) Per-Tenant Drain (Not Only Global)
Runner's current drain is global (`/admin/drain` toggles a single draining flag). Runner MUST support draining a specific tenant:
- Stop acquiring new saga/effect work for the tenant.
- Allow in-flight work for the tenant to finish (bounded).
- Flush outbox for the tenant (or guarantee idempotency on handoff).
- Persist final checkpoints so another shard can continue without duplication beyond at-least-once bounds.
### 2) Placement-Enforced Work Acquisition
Runner MUST validate tenant assignment at the boundary where it:
- consumes JetStream messages (saga triggers, effect commands), and
- dispatches outbox work.
If a tenant is not assigned to the shard, Runner MUST NOT process its work.
### 3) Handoff Safety Rules for Rebalancing
Runner rebalancing should follow:
- New shard begins processing only after it is assigned the tenant.
- Old shard stops acquiring new work for that tenant, then drains.
- Idempotency remains correct across handoff using checkpoints and dedupe markers.
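
To illustrate the dedupe-marker idea, the sketch below skips any message whose marker already exists, regardless of which shard recorded it. An in-memory set stands in for what would need to be shared durable storage, and `DedupeStore` and `process_once` are hypothetical names. Whether the marker is written before or after the effect is a design decision this sketch does not settle; here it is written first.

```rust
use std::collections::HashSet;

/// Dedupe markers that survive a handoff. In a real system these would live
/// in shared durable storage; an in-memory set stands in for it here.
#[derive(Default)]
pub struct DedupeStore {
    seen: HashSet<(String, u64)>,
}

impl DedupeStore {
    /// Record `(tenant, message_id)`; returns true only the first time.
    pub fn mark(&mut self, tenant: &str, message_id: u64) -> bool {
        self.seen.insert((tenant.to_string(), message_id))
    }
}

/// Run `effect` only if no marker exists yet for this message, so a
/// redelivery after rebalancing is skipped. Returns whether the effect ran.
pub fn process_once<F: FnOnce()>(
    store: &mut DedupeStore,
    tenant: &str,
    message_id: u64,
    effect: F,
) -> bool {
    if store.mark(tenant, message_id) {
        effect();
        true
    } else {
        false
    }
}
```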
### 4) Metrics for Placement Decisions
Runner SHOULD expose:
- Outbox depth by tenant
- Work processing latency and retries by tenant/effect
- Schedule due items by tenant
- Consumer lag by tenant (if the consumption model supports per-tenant lag)
### 5) Auth Delivery Side Effects (Email/SMS/Push)
If the platform's AuthN flows require out-of-band delivery (password reset links, email verification, MFA codes), the Runner SHOULD be the standard place to execute those side effects:
- Define a stable effect interface for sending transactional emails (reset links, verification links, security alerts).
- Optionally add SMS/push providers later under the same effect contract.
This keeps the Gateway free of long-lived provider credentials and aligns with the existing “effects are executed by workers” pattern.
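
A stable effect interface might be expressed as a trait that each delivery channel implements. Everything below is hypothetical (the `DeliveryEffect` trait, the `Notification` variants, and the recording test double); it only illustrates how email, and later SMS/push, could share one contract.

```rust
/// Kinds of transactional message named above.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Notification {
    PasswordReset { link: String },
    EmailVerification { link: String },
    SecurityAlert { summary: String },
}

/// A delivery channel. Email comes first; SMS/push could implement the same
/// trait later. The trait and its method are hypothetical.
pub trait DeliveryEffect {
    fn deliver(
        &mut self,
        tenant_id: &str,
        recipient: &str,
        n: &Notification,
    ) -> Result<(), String>;
}

/// Test double that records what would have been sent.
#[derive(Default)]
pub struct RecordingEmail {
    pub sent: Vec<(String, String, Notification)>,
}

impl DeliveryEffect for RecordingEmail {
    fn deliver(
        &mut self,
        tenant_id: &str,
        recipient: &str,
        n: &Notification,
    ) -> Result<(), String> {
        // Deliberately crude check; real address validation is out of scope.
        if !recipient.contains('@') {
            return Err(format!("invalid email recipient: {}", recipient));
        }
        self.sent
            .push((tenant_id.to_string(), recipient.to_string(), n.clone()));
        Ok(())
    }
}
```

Provider credentials would live only inside the concrete implementations, which run in the Runner rather than the Gateway.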
---
## **Gateway Integration Notes**
Once the above changes exist:
- Gateway routes per `(tenant_id, service_kind)` using independent placement maps.
- Gateway can implement “warm then cut over” rebalancing for Projection and Runner by switching only query/workflow routing after readiness conditions are met.
- Gateway can enforce consistent tenant validation, authn/authz, and error semantics at the edge even as placements move.
---
## **Gaps / Opportunities**
- **KV schema + ownership**: define the exact NATS KV bucket layout, key naming, revisioning rules, and who is allowed to write placement updates.
- **Rebalancer API**: define operator workflows (plan/apply/rollback), status reporting, and audit log requirements for placement changes.
- **Shard discovery**: define how shard endpoints are registered (static config vs KV directory entries) and how health is represented.
- **Consistency boundaries**: define rebalancing guarantees per service kind (projection can be warm-cutover; runner requires checkpoint handoff; aggregate requires single-writer and state availability).