### External PRD: Changes Required in Aggregate, Projection, Runner

This document captures the work needed outside the Gateway to support:

- Tenant-aware routing via `x-tenant-id`
- Independent horizontal scalability of Aggregate, Projection, Runner
- A safe mechanism for tenant rebalancing per service kind

---

## **Target State**

### Independent Placements

Each service kind has its own placement map:

- `aggregate_placement[tenant_id] -> aggregate_shard_id`
- `projection_placement[tenant_id] -> projection_shard_id`
- `runner_placement[tenant_id] -> runner_shard_id`

Each shard is a replica set that can scale independently.
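
The three independent maps above can be sketched as ordinary per-kind lookup tables. The type and field names below are illustrative, not existing code:

```rust
use std::collections::HashMap;

/// Which service a placement entry applies to.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum ServiceKind {
    Aggregate,
    Projection,
    Runner,
}

/// One independent placement map per service kind, so each kind can be
/// scaled and rebalanced without touching the others.
#[derive(Default)]
struct Placements {
    // kind -> (tenant_id -> shard_id)
    maps: HashMap<ServiceKind, HashMap<String, String>>,
}

impl Placements {
    fn assign(&mut self, kind: ServiceKind, tenant: &str, shard: &str) {
        self.maps
            .entry(kind)
            .or_default()
            .insert(tenant.to_string(), shard.to_string());
    }

    /// Resolve the shard for (tenant, kind); `None` means unplaced.
    fn shard_for(&self, kind: ServiceKind, tenant: &str) -> Option<&str> {
        self.maps.get(&kind)?.get(tenant).map(String::as_str)
    }
}

fn main() {
    let mut p = Placements::default();
    p.assign(ServiceKind::Aggregate, "tenant-a", "agg-shard-1");
    p.assign(ServiceKind::Projection, "tenant-a", "proj-shard-3");
    // The same tenant can live on different shards per service kind.
    assert_eq!(p.shard_for(ServiceKind::Aggregate, "tenant-a"), Some("agg-shard-1"));
    assert_eq!(p.shard_for(ServiceKind::Runner, "tenant-a"), None);
}
```

The point of the separate maps is that moving a tenant's projection shard never implies moving its aggregate or runner shard.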

### Rebalancing Contract (Per Service Kind)

All nodes MUST support:

- Dynamic placement updates (watch NATS KV or reload config)
- A drain mechanism that can target a specific tenant (stop acquiring new work for that tenant, finish in-flight, report status)
- Clear readiness semantics that reflect whether the node will accept work for a tenant

Additionally, all nodes SHOULD converge on the same operational contract:

- A per-tenant “accepting” gate (can this shard accept new work/queries/commands for tenant X?)
- A per-tenant “drained” signal (no in-flight work remains for tenant X)
- A per-tenant warmup/catchup signal where relevant (projection lag, aggregate snapshot availability)
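
A minimal in-memory sketch of the “accepting”/“drained” contract, assuming a shard tracks in-flight work per tenant (all names hypothetical):

```rust
use std::collections::HashMap;

/// Lifecycle states in the shared operational contract.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum TenantState {
    Accepting,
    Draining,
    Drained,
}

/// Tracks the "accepting" gate and "drained" signal per tenant.
struct TenantGate {
    states: HashMap<String, TenantState>,
    in_flight: HashMap<String, usize>,
}

impl TenantGate {
    fn new() -> Self {
        TenantGate { states: HashMap::new(), in_flight: HashMap::new() }
    }

    /// Placement assigned this tenant to the shard: open the gate.
    fn assign(&mut self, tenant: &str) {
        self.states.insert(tenant.to_string(), TenantState::Accepting);
    }

    /// The "accepting" gate: may the shard take new work for the tenant?
    fn accepting(&self, tenant: &str) -> bool {
        matches!(self.states.get(tenant), Some(TenantState::Accepting))
    }

    fn begin_work(&mut self, tenant: &str) -> bool {
        if !self.accepting(tenant) {
            return false;
        }
        *self.in_flight.entry(tenant.to_string()).or_insert(0) += 1;
        true
    }

    fn finish_work(&mut self, tenant: &str) {
        if let Some(n) = self.in_flight.get_mut(tenant) {
            *n = n.saturating_sub(1);
        }
        self.maybe_drained(tenant);
    }

    /// Close the gate; "drained" fires once in-flight work hits zero.
    fn start_drain(&mut self, tenant: &str) {
        self.states.insert(tenant.to_string(), TenantState::Draining);
        self.maybe_drained(tenant);
    }

    fn maybe_drained(&mut self, tenant: &str) {
        let idle = self.in_flight.get(tenant).copied().unwrap_or(0) == 0;
        if idle && self.states.get(tenant) == Some(&TenantState::Draining) {
            self.states.insert(tenant.to_string(), TenantState::Drained);
        }
    }

    fn drained(&self, tenant: &str) -> bool {
        self.states.get(tenant) == Some(&TenantState::Drained)
    }
}

fn main() {
    let mut gate = TenantGate::new();
    gate.assign("tenant-a");
    assert!(gate.begin_work("tenant-a"));
    gate.start_drain("tenant-a"); // gate closes; one unit still in flight
    assert!(!gate.accepting("tenant-a"));
    assert!(!gate.drained("tenant-a"));
    gate.finish_work("tenant-a"); // last in-flight unit completes
    assert!(gate.drained("tenant-a"));
}
```

The warmup/catchup signal would be a third, service-specific predicate layered on top (projection lag below a threshold, snapshot present, etc.).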

---

## **Aggregate: Required Changes**

### 1) Expose a Real Command API (Gateway Upstream)

Today, Aggregate has internal command-handling types (e.g., `CommandServer`), but its running HTTP server only exposes health/metrics/admin endpoints ([aggregate/http_server.rs](file:///Users/vlad/Developer/cloudlysis/aggregate/src/http_server.rs#L15-L82), [aggregate/server/mod.rs](file:///Users/vlad/Developer/cloudlysis/aggregate/src/server/mod.rs#L81-L213)).

Aggregate MUST expose one of the following upstream APIs for the Gateway to call:

- **Option A (Recommended)**: a gRPC server implementing `aggregate.gateway.v1.CommandService/SubmitCommand`, compatible with [aggregate.proto](file:///Users/vlad/Developer/cloudlysis/aggregate/proto/aggregate.proto#L1-L31).
- **Option B**: an HTTP (REST) endpoint for command submission, with a stable request/response shape that the Gateway can proxy.
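
For Option B, one possible request/response shape is sketched below. Every field name here is an assumption for illustration, not a confirmed contract:

```rust
/// Hypothetical request shape for REST command submission (Option B).
/// Field names are illustrative, not a confirmed contract.
#[derive(Debug, PartialEq)]
struct SubmitCommandRequest {
    tenant_id: String,       // mirrors the x-tenant-id header
    aggregate_type: String,  // e.g. "account" (example value)
    aggregate_id: String,
    command_type: String,
    payload_json: String,    // opaque command body, proxied as-is
    idempotency_key: String, // lets the Gateway retry safely
}

/// Hypothetical response shape.
#[derive(Debug, PartialEq)]
struct SubmitCommandResponse {
    accepted: bool,
    event_seq: Option<u64>,     // sequence of the resulting event, if accepted
    error_code: Option<String>, // machine-readable rejection reason otherwise
}

/// Minimal structural validation the upstream would run before
/// dispatching to the command handler.
fn validate(req: &SubmitCommandRequest) -> Result<(), &'static str> {
    if req.tenant_id.is_empty() {
        return Err("missing tenant_id");
    }
    if req.idempotency_key.is_empty() {
        return Err("missing idempotency_key");
    }
    Ok(())
}

fn main() {
    let req = SubmitCommandRequest {
        tenant_id: "tenant-a".into(),
        aggregate_type: "account".into(),
        aggregate_id: "acc-1".into(),
        command_type: "open_account".into(),
        payload_json: "{}".into(),
        idempotency_key: "key-123".into(),
    };
    assert_eq!(validate(&req), Ok(()));
    let bad = SubmitCommandRequest { tenant_id: String::new(), ..req };
    assert!(validate(&bad).is_err());
}
```

Whichever option is chosen, the idempotency key matters: the Gateway will retry on shard moves, so duplicate submissions must be safe.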

### 2) Tenant Placement Enforcement

Aggregate MUST enforce “hosted tenants” so independent scaling is safe:

- If an Aggregate shard/node is not assigned a tenant, it MUST reject commands for that tenant (e.g., `403` or `503` with a retriable hint, depending on whether the issue is authorization vs placement).
- Aggregate SHOULD maintain an in-memory allowlist of hosted tenants that is driven by:
  - NATS KV placement watcher (preferred), or
  - Hot-reloaded config pushed via `/admin/reload`

Aggregate already has admin hooks for drain/reload, but they are currently generic and/or illustrative ([aggregate/http_server.rs](file:///Users/vlad/Developer/cloudlysis/aggregate/src/http_server.rs#L15-L72), [aggregate/server/mod.rs](file:///Users/vlad/Developer/cloudlysis/aggregate/src/server/mod.rs#L402-L442)). These need to become placement-aware.
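
The 403-vs-503 distinction can be sketched as below. The separate `known` set (used to tell "unauthorized tenant" from "wrong shard") is an assumption for illustration:

```rust
use std::collections::HashSet;

/// Rejection semantics: authorization failures are final (403),
/// placement misses are retriable elsewhere (503 + retriable hint).
#[derive(Debug, PartialEq)]
enum Rejection {
    Forbidden,     // maps to 403: tenant not authorized at all
    NotPlacedHere, // maps to 503: tenant exists but lives on another shard
}

struct HostedTenants {
    /// Tenants this shard currently hosts; refreshed by the NATS KV
    /// placement watcher or an /admin/reload push.
    hosted: HashSet<String>,
    /// Tenants known to the platform (assumed here, to distinguish
    /// authorization failures from placement misses).
    known: HashSet<String>,
}

impl HostedTenants {
    fn check(&self, tenant: &str) -> Result<(), Rejection> {
        if !self.known.contains(tenant) {
            return Err(Rejection::Forbidden);
        }
        if !self.hosted.contains(tenant) {
            return Err(Rejection::NotPlacedHere);
        }
        Ok(())
    }
}

fn main() {
    let shard = HostedTenants {
        hosted: ["tenant-a"].iter().map(|s| s.to_string()).collect(),
        known: ["tenant-a", "tenant-b"].iter().map(|s| s.to_string()).collect(),
    };
    assert_eq!(shard.check("tenant-a"), Ok(()));
    assert_eq!(shard.check("tenant-b"), Err(Rejection::NotPlacedHere)); // retriable
    assert_eq!(shard.check("tenant-x"), Err(Rejection::Forbidden));     // final
}
```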

### 3) Tenant Drain (Per Tenant)

Aggregate MUST provide a per-tenant drain mechanism to support rebalancing:

- Stop accepting new commands for the tenant.
- Allow in-flight commands to finish (bounded wait), then report drained.
- Expose drain status per tenant (admin endpoint).
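
The bounded-wait part can be modeled as a small status function that an admin endpoint polls; the names and the 30-second bound are illustrative assumptions:

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum DrainStatus {
    Draining { in_flight: usize },
    Drained,
    TimedOut { in_flight: usize },
}

/// One drain attempt for a single tenant, with a bounded wait.
struct TenantDrain {
    started: Instant,
    max_wait: Duration,
}

impl TenantDrain {
    /// Report status given the tenant's current in-flight command count.
    /// A TimedOut result means the rebalancer must decide whether to
    /// force the move or abort it.
    fn status(&self, now: Instant, in_flight: usize) -> DrainStatus {
        if in_flight == 0 {
            DrainStatus::Drained
        } else if now.duration_since(self.started) >= self.max_wait {
            DrainStatus::TimedOut { in_flight }
        } else {
            DrainStatus::Draining { in_flight }
        }
    }
}

fn main() {
    let start = Instant::now();
    let drain = TenantDrain { started: start, max_wait: Duration::from_secs(30) };
    assert_eq!(drain.status(start, 2), DrainStatus::Draining { in_flight: 2 });
    assert_eq!(drain.status(start, 0), DrainStatus::Drained);
    let late = start + Duration::from_secs(31);
    assert_eq!(drain.status(late, 1), DrainStatus::TimedOut { in_flight: 1 });
}
```

Passing `now` explicitly keeps the state machine deterministic and easy to test; the admin endpoint would supply `Instant::now()`.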

### 4) Rebalancing State Strategy

Aggregate persists snapshots locally (MDBX) and uses JetStream for events. To move a tenant:

- **Approach 1 (Snapshot migration)**: copy tenant snapshot DB/state to the target shard, then switch placement.
- **Approach 2 (Cold rehydrate)**: switch placement and let the target shard rebuild state by replaying events from JetStream; expect higher latency during warmup.

The system should support both, with the rebalancer selecting the strategy based on tenant size/SLO.
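
The selection rule can be as simple as a threshold policy. The inputs and thresholds below are assumptions for illustration, not tuned values:

```rust
#[derive(Debug, PartialEq)]
enum MoveStrategy {
    SnapshotMigration, // Approach 1: copy snapshot state, then switch
    ColdRehydrate,     // Approach 2: switch, then replay from JetStream
}

/// Illustrative policy only: small tenants with a relaxed latency SLO
/// can tolerate a cold rehydrate; large or latency-sensitive tenants
/// get a snapshot copy. Thresholds are placeholders.
fn pick_strategy(event_count: u64, latency_slo_ms: u64) -> MoveStrategy {
    const MAX_CHEAP_REPLAY_EVENTS: u64 = 100_000;
    const TIGHT_SLO_MS: u64 = 250;
    if event_count > MAX_CHEAP_REPLAY_EVENTS || latency_slo_ms < TIGHT_SLO_MS {
        MoveStrategy::SnapshotMigration
    } else {
        MoveStrategy::ColdRehydrate
    }
}

fn main() {
    assert_eq!(pick_strategy(5_000, 1_000), MoveStrategy::ColdRehydrate);
    assert_eq!(pick_strategy(5_000_000, 1_000), MoveStrategy::SnapshotMigration);
    assert_eq!(pick_strategy(5_000, 100), MoveStrategy::SnapshotMigration);
}
```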

### 5) Metrics for Placement Decisions

Aggregate SHOULD expose:

- Per-tenant command rate, error rate
- In-flight commands by tenant
- Rehydrate time / snapshot hit ratio
- Storage size per tenant (if feasible)

---

## **Projection: Required Changes**

### 1) Expose Query API Upstream for Gateway

Projection has a working `QueryService` with tenant-scoped prefix scans ([uqf.rs](file:///Users/vlad/Developer/cloudlysis/projection/src/query/uqf.rs#L121-L162)), but it is not exposed via HTTP/gRPC (the current HTTP routes are health/ready/metrics/info only: [projection/http/mod.rs](file:///Users/vlad/Developer/cloudlysis/projection/src/http/mod.rs#L102-L109)).

Projection MUST add one upstream API the Gateway can route to:

- `POST /query/{view_type}` (HTTP) accepting `x-tenant-id` and a UQF payload, returning `QueryResponse`.
- Or a gRPC query service (new proto) if gRPC is preferred end-to-end.
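
For the HTTP option, the parse-and-validate step at the edge of the handler might look like this sketch (framework-free; the real service would get path and headers from its HTTP stack):

```rust
use std::collections::HashMap;

/// Minimal parse/validate step for a hypothetical
/// `POST /query/{view_type}` endpoint: extract the view type from the
/// path and require a non-empty x-tenant-id header.
fn parse_query_request<'a>(
    path: &'a str,
    headers: &'a HashMap<String, String>,
) -> Result<(&'a str, &'a str), &'static str> {
    let view_type = path
        .strip_prefix("/query/")
        .filter(|v| !v.is_empty() && !v.contains('/'))
        .ok_or("bad path: expected /query/{view_type}")?;
    let tenant = headers
        .get("x-tenant-id")
        .map(String::as_str)
        .filter(|t| !t.is_empty())
        .ok_or("missing x-tenant-id header")?;
    Ok((view_type, tenant))
}

fn main() {
    let mut headers = HashMap::new();
    headers.insert("x-tenant-id".to_string(), "tenant-a".to_string());
    assert_eq!(
        parse_query_request("/query/orders_by_day", &headers),
        Ok(("orders_by_day", "tenant-a"))
    );
    assert!(parse_query_request("/query/", &headers).is_err());
    assert!(parse_query_request("/query/x", &HashMap::new()).is_err());
}
```

The `orders_by_day` view name is a made-up example; the UQF payload itself would be deserialized after this step and handed to the existing `QueryService`.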

### 2) Tenant Placement Filtering (Independent Scaling)

Projection MUST support running in one of these modes:

- **Multi-tenant shard**: consumes all tenants (simple, less isolated).
- **Tenant-filtered shard (required for rebalancing)**:
  - only consumes/serves queries for the tenants assigned to that shard
  - rejects queries for unassigned tenants (consistent error semantics)

Implementation direction:

- Add a placement watcher similar to Runner’s tenant filter ([runner/tenant_placement.rs](file:///Users/vlad/Developer/cloudlysis/runner/src/tenant_placement.rs#L8-L100)).
- Apply the tenant filter to:
  - event consumption subject filters (preferred), and
  - query serving validation (always).
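
Both filter points can derive from the same allowlist. The `events.{tenant_id}.>` subject scheme below is an assumption for illustration; the real subject layout may differ:

```rust
/// Derive per-tenant JetStream consumer subject filters from the
/// shard's allowlist. The `events.{tenant_id}.>` scheme is assumed
/// here for illustration.
fn subject_filters(assigned_tenants: &[&str]) -> Vec<String> {
    assigned_tenants
        .iter()
        .map(|t| format!("events.{}.>", t))
        .collect()
}

/// Serving-side validation is applied even when subject filters are in
/// place, so a stale consumer config cannot leak cross-tenant reads.
fn may_serve(assigned_tenants: &[&str], tenant: &str) -> bool {
    assigned_tenants.contains(&tenant)
}

fn main() {
    let assigned = ["tenant-a", "tenant-b"];
    assert_eq!(
        subject_filters(&assigned),
        vec!["events.tenant-a.>".to_string(), "events.tenant-b.>".to_string()]
    );
    assert!(may_serve(&assigned, "tenant-a"));
    assert!(!may_serve(&assigned, "tenant-z"));
}
```

Filtering at the subject level is preferred because unassigned tenants' events are never delivered at all; the serving check is the backstop.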

### 3) Drain + Warmup Endpoints

Projection SHOULD add:

- `/admin/drain?tenant_id=...` (stop consuming new events for that tenant, finish in-flight, flush checkpoints)
- `/admin/reload` (apply latest placement/config)
- Optional warmup status: whether the shard has caught up to the JetStream tail for that tenant/view_types

### 4) Rebalancing Strategy for Projection

Projection can rebalance safely with “warm then cut over”:

- Assign the tenant to the new projection shard while the old shard still serves.
- New shard catches up (replay from JetStream, build view KV).
- Switch Gateway placement for query routing to the new shard.
- Drain the old shard for that tenant and optionally delete the old tenant KV keys.
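
The four steps above form a small state machine; this sketch (names hypothetical) makes the gating conditions explicit:

```rust
/// Phases of a "warm then cut over" move for one tenant's projection.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum CutoverPhase {
    WarmingNewShard,  // new shard assigned, replaying from JetStream
    ServingFromNew,   // Gateway query routing switched to the new shard
    DrainingOldShard, // old shard drains; old KV keys may then be deleted
    Done,
}

/// Advance the cutover only when the gating condition for the current
/// phase holds; `caught_up` would come from the new shard's warmup
/// signal and `old_drained` from the old shard's drain status.
fn advance(phase: CutoverPhase, caught_up: bool, old_drained: bool) -> CutoverPhase {
    match phase {
        CutoverPhase::WarmingNewShard if caught_up => CutoverPhase::ServingFromNew,
        CutoverPhase::ServingFromNew => CutoverPhase::DrainingOldShard,
        CutoverPhase::DrainingOldShard if old_drained => CutoverPhase::Done,
        other => other,
    }
}

fn main() {
    let mut phase = CutoverPhase::WarmingNewShard;
    phase = advance(phase, false, false); // not caught up: stay warming
    assert_eq!(phase, CutoverPhase::WarmingNewShard);
    phase = advance(phase, true, false); // caught up: switch routing
    phase = advance(phase, true, false); // start draining the old shard
    assert_eq!(phase, CutoverPhase::DrainingOldShard);
    phase = advance(phase, true, true);
    assert_eq!(phase, CutoverPhase::Done);
}
```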

### 5) Metrics for Placement Decisions

Projection SHOULD expose:

- JetStream lag per tenant/view_type (tail minus checkpoint)
- Query latency and scan counts
- Storage size per tenant (if feasible)
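
The lag metric is just the tail-minus-checkpoint difference, guarded against a checkpoint that briefly reads ahead of a stale tail observation:

```rust
/// Per-tenant/view lag as "stream tail minus committed checkpoint".
/// saturating_sub guards against a stale tail reading that is briefly
/// behind the checkpoint.
fn jetstream_lag(tail_seq: u64, checkpoint_seq: u64) -> u64 {
    tail_seq.saturating_sub(checkpoint_seq)
}

fn main() {
    assert_eq!(jetstream_lag(1_500, 1_200), 300); // 300 events behind
    assert_eq!(jetstream_lag(1_200, 1_200), 0);   // fully caught up
    assert_eq!(jetstream_lag(1_199, 1_200), 0);   // stale tail reading
}
```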

---

## **Runner: Required Changes**

Runner already has:

- A tenant placement watcher capable of producing an allowlist ([tenant_placement.rs](file:///Users/vlad/Developer/cloudlysis/runner/src/tenant_placement.rs#L8-L100))
- Admin endpoints including drain/reload/config ([runner/http/mod.rs](file:///Users/vlad/Developer/cloudlysis/runner/src/http/mod.rs#L69-L86))
- Gateway client integration for aggregate command submission ([runner/gateway/mod.rs](file:///Users/vlad/Developer/cloudlysis/runner/src/gateway/mod.rs#L1-L47))

To support independent scalability and rebalancing, Runner needs the following.

### 1) Per-Tenant Drain (Not Only Global)

Runner’s current drain is global (`/admin/drain` toggles a single draining flag). Runner MUST support draining a specific tenant:

- Stop acquiring new saga/effect work for the tenant.
- Allow in-flight work for the tenant to finish (bounded).
- Flush the outbox for the tenant (or guarantee idempotency on handoff).
- Persist final checkpoints so another shard can continue without duplication beyond at-least-once bounds.
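
The acquisition-side change can be sketched as the existing global flag plus a per-tenant set, consulted before any new work is taken (names hypothetical):

```rust
use std::collections::HashSet;

/// Extends Runner's single global draining flag with per-tenant drains.
struct DrainState {
    global: bool,
    tenants: HashSet<String>,
}

impl DrainState {
    fn new() -> Self {
        DrainState { global: false, tenants: HashSet::new() }
    }

    /// Existing behavior: drain the whole node.
    fn drain_all(&mut self) {
        self.global = true;
    }

    /// New behavior: `/admin/drain?tenant_id=...` flips exactly one tenant.
    fn drain_tenant(&mut self, tenant: &str) {
        self.tenants.insert(tenant.to_string());
    }

    /// Checked before acquiring any new saga/effect work.
    fn may_acquire(&self, tenant: &str) -> bool {
        !self.global && !self.tenants.contains(tenant)
    }
}

fn main() {
    let mut drain = DrainState::new();
    assert!(drain.may_acquire("tenant-a"));
    drain.drain_tenant("tenant-a"); // only tenant-a stops acquiring
    assert!(!drain.may_acquire("tenant-a"));
    assert!(drain.may_acquire("tenant-b"));
    drain.drain_all(); // global drain still works as before
    assert!(!drain.may_acquire("tenant-b"));
}
```

The in-flight bookkeeping, outbox flush, and checkpoint persistence would hang off the same per-tenant entry.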

### 2) Placement-Enforced Work Acquisition

Runner MUST validate tenant assignment at the boundary where it:

- consumes JetStream messages (saga triggers, effect commands), and
- dispatches outbox work.

If a tenant is not assigned to the shard, Runner must not process its work.

### 3) Handoff Safety Rules for Rebalancing

Runner rebalancing should follow:

- New shard begins processing only after it is assigned the tenant.
- Old shard stops acquiring new work for that tenant, then drains.
- Idempotency remains correct across handoff using checkpoints and dedupe markers.
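
The dedupe-marker idea can be sketched as a set keyed by (tenant, stream sequence). In the real system this set would be persisted alongside checkpoints, since with at-least-once delivery both shards may see the same message around a handoff:

```rust
use std::collections::HashSet;

/// Dedupe markers keyed by (tenant, stream sequence); persisted
/// alongside checkpoints in the real system. In-memory here so the
/// sketch is runnable.
struct DedupeLog {
    seen: HashSet<(String, u64)>,
}

impl DedupeLog {
    fn new() -> Self {
        DedupeLog { seen: HashSet::new() }
    }

    /// Returns true only the first time a (tenant, seq) pair is offered;
    /// a redelivery (e.g. on the new shard after handoff) is a no-op.
    fn first_delivery(&mut self, tenant: &str, seq: u64) -> bool {
        self.seen.insert((tenant.to_string(), seq))
    }
}

fn main() {
    let mut log = DedupeLog::new();
    assert!(log.first_delivery("tenant-a", 41));  // old shard processed it
    assert!(!log.first_delivery("tenant-a", 41)); // redelivery on new shard: skip
    assert!(log.first_delivery("tenant-a", 42));  // genuinely new work
}
```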

### 4) Metrics for Placement Decisions

Runner SHOULD expose:

- Outbox depth by tenant
- Work processing latency and retries by tenant/effect
- Scheduled due items by tenant
- Consumer lag by tenant (if the consumption model supports per-tenant lag)
### 5) Auth Delivery Side Effects (Email/SMS/Push)

If the platform’s AuthN flows require out-of-band delivery (password reset links, email verification, MFA codes), the Runner SHOULD be the standard place to execute those side effects:

- Define a stable effect interface for sending transactional emails (reset links, verification links, security alerts).
- Optionally add SMS/push providers later under the same effect contract.

This keeps the Gateway free of long-lived provider credentials and aligns with the existing “effects are executed by workers” pattern.
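
One possible shape for that stable effect interface is sketched below; the trait and type names are illustrative, not existing Runner APIs:

```rust
/// Hypothetical message taxonomy for auth delivery effects.
#[derive(Debug, Clone, PartialEq)]
enum AuthMessage {
    PasswordReset { link: String },
    EmailVerification { link: String },
    SecurityAlert { summary: String },
}

/// Hypothetical stable effect contract: SMS/push providers would later
/// implement the same trait.
trait DeliveryEffect {
    /// Execute one delivery; errors feed the Runner's normal retry path.
    fn deliver(&self, tenant_id: &str, recipient: &str, msg: &AuthMessage) -> Result<(), String>;
}

/// Stand-in provider so the sketch is runnable; a real implementation
/// would call an email/SMS/push provider API with its own credentials.
struct LoggingProvider;

impl DeliveryEffect for LoggingProvider {
    fn deliver(&self, tenant_id: &str, recipient: &str, msg: &AuthMessage) -> Result<(), String> {
        if recipient.is_empty() {
            return Err("missing recipient".to_string());
        }
        println!("[{tenant_id}] -> {recipient}: {msg:?}");
        Ok(())
    }
}

fn main() {
    let provider = LoggingProvider;
    let msg = AuthMessage::PasswordReset { link: "https://example.test/reset".into() };
    assert!(provider.deliver("tenant-a", "user@example.test", &msg).is_ok());
    assert!(provider.deliver("tenant-a", "", &msg).is_err());
}
```

Keeping provider credentials behind this trait is exactly what lets the Gateway stay credential-free.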

---

## **Gateway Integration Notes**

Once the above changes exist:

- Gateway routes per `(tenant_id, service_kind)` using independent placement maps.
- Gateway can implement “warm then cut over” rebalancing for Projection and Runner by switching only query/workflow routing after readiness conditions are met.
- Gateway can enforce consistent tenant validation, authn/authz, and error semantics at the edge even as placements move.

---

## **Gaps / Opportunities**

- **KV schema + ownership**: define the exact NATS KV bucket layout, key naming, revisioning rules, and who is allowed to write placement updates.
- **Rebalancer API**: define operator workflows (plan/apply/rollback), status reporting, and audit log requirements for placement changes.
- **Shard discovery**: define how shard endpoints are registered (static config vs KV directory entries) and how health is represented.
- **Consistency boundaries**: define rebalancing guarantees per service kind (projection can be warm-cutover; runner requires checkpoint handoff; aggregate requires single-writer and state availability).