External PRD: Changes Required in Aggregate, Projection, Runner
This document captures the work needed outside the Gateway to support:
- Tenant-aware routing via `x-tenant-id`
- Independent horizontal scalability of Aggregate, Projection, Runner
- A safe mechanism for tenant rebalancing per service kind
Target State
Independent Placements
Each service kind has its own placement map:
- `aggregate_placement[tenant_id] -> aggregate_shard_id`
- `projection_placement[tenant_id] -> projection_shard_id`
- `runner_placement[tenant_id] -> runner_shard_id`
Each shard is a replica set that can scale independently.
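The three per-kind placement maps can be sketched as plain lookups; `ServiceKind` and `PlacementMaps` below are illustrative names, not an existing schema:

```rust
use std::collections::HashMap;

// Illustrative sketch: one placement map per service kind, keyed by tenant.
// A tenant may map to different shards for different kinds, which is what
// makes independent scaling possible.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum ServiceKind { Aggregate, Projection, Runner }

#[derive(Default)]
struct PlacementMaps {
    // kind -> (tenant_id -> shard_id), each map maintained independently
    maps: HashMap<ServiceKind, HashMap<String, String>>,
}

impl PlacementMaps {
    fn assign(&mut self, kind: ServiceKind, tenant: &str, shard: &str) {
        self.maps.entry(kind).or_default().insert(tenant.into(), shard.into());
    }

    // Routing lookup: None means the tenant has no placement for that kind.
    fn shard_for(&self, kind: ServiceKind, tenant: &str) -> Option<&str> {
        self.maps.get(&kind)?.get(tenant).map(String::as_str)
    }
}
```

The key property is that moving a tenant's Projection shard never touches its Aggregate or Runner placement.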
Rebalancing Contract (Per Service Kind)
All nodes MUST support:
- Dynamic placement updates (watch NATS KV or reload config)
- A drain mechanism that can target a specific tenant (stop acquiring new work for that tenant, finish in-flight, report status)
- Clear readiness semantics that reflect whether the node will accept work for a tenant
Additionally, all nodes SHOULD converge on the same operational contract:
- A per-tenant “accepting” gate (can this shard accept new work/queries/commands for tenant X?)
- A per-tenant “drained” signal (no in-flight work remains for tenant X)
- A per-tenant warmup/catchup signal where relevant (projection lag, aggregate snapshot availability)
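A minimal sketch of that shared contract, assuming an in-memory gate per node; `TenantGate` and its methods are hypothetical names, and the "drained" signal is derived here from an in-flight counter:

```rust
use std::collections::HashMap;

// Illustrative per-tenant contract: an "accepting" gate plus a "drained"
// signal. Real nodes would persist/report this via admin endpoints.
#[derive(Debug, Clone, Copy, PartialEq)]
enum TenantState { Accepting, Draining, Drained }

#[derive(Default)]
struct TenantGate {
    state: HashMap<String, TenantState>,
    in_flight: HashMap<String, usize>,
}

impl TenantGate {
    fn host(&mut self, t: &str) {
        self.state.insert(t.to_string(), TenantState::Accepting);
    }
    // Checked before admitting new work/queries/commands for a tenant.
    fn admit(&mut self, t: &str) -> bool {
        if !matches!(self.state.get(t), Some(TenantState::Accepting)) { return false; }
        *self.in_flight.entry(t.to_string()).or_insert(0) += 1;
        true
    }
    fn begin_drain(&mut self, t: &str) {
        let idle = self.in_flight.get(t).copied().unwrap_or(0) == 0;
        let next = if idle { TenantState::Drained } else { TenantState::Draining };
        self.state.insert(t.to_string(), next);
    }
    // Called when one in-flight unit of work finishes.
    fn complete(&mut self, t: &str) {
        if let Some(n) = self.in_flight.get_mut(t) { *n = n.saturating_sub(1); }
        if self.in_flight.get(t).copied().unwrap_or(0) == 0
            && self.state.get(t) == Some(&TenantState::Draining) {
            self.state.insert(t.to_string(), TenantState::Drained);
        }
    }
    fn drained(&self, t: &str) -> bool {
        self.state.get(t) == Some(&TenantState::Drained)
    }
}
```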
Aggregate: Required Changes
1) Expose a Real Command API (Gateway Upstream)
Today, Aggregate has internal command handling types (e.g., `CommandServer`) but its running HTTP server only exposes health/metrics/admin endpoints (`aggregate/http_server.rs`, `aggregate/server/mod.rs`).
Aggregate MUST expose one of the following upstream APIs for the Gateway to call:
- Option A (Recommended): gRPC server implementing `aggregate.gateway.v1.CommandService/SubmitCommand`, compatible with `aggregate.proto`.
- Option B: HTTP endpoint for command submission (REST), with a stable request/response shape that the Gateway can proxy.
2) Tenant Placement Enforcement
Aggregate MUST enforce “hosted tenants” so independent scaling is safe:
- If an Aggregate shard/node is not assigned a tenant, it MUST reject commands for that tenant (e.g., `403` or `503` with a retriable hint, depending on whether the issue is authorization vs placement).
- Aggregate SHOULD maintain an in-memory allowlist of hosted tenants that is driven by:
  - NATS KV placement watcher (preferred), or
  - Hot-reloaded config pushed via `/admin/reload`
Aggregate already has admin hooks for drain/reload, but they are currently generic and/or illustrative (`aggregate/http_server.rs`, `aggregate/server/mod.rs`). These need to become placement-aware.
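One way the allowlist-plus-rejection behavior could look, with the 403-vs-503 split from above (the type and signatures are assumptions, not existing code):

```rust
use std::collections::HashSet;

// Illustrative placement enforcement at the command boundary. Authorization
// failures are final (403); placement misses are retriable (503) once the
// Gateway re-reads the placement map.
#[derive(Default)]
struct HostedTenants { allow: HashSet<String> }

impl HostedTenants {
    // Called from the NATS KV placement watcher or the /admin/reload handler.
    fn replace(&mut self, tenants: &[&str]) {
        self.allow = tenants.iter().map(|t| t.to_string()).collect();
    }

    // Returns an HTTP-ish status code for rejected commands.
    fn check(&self, tenant: &str, authorized: bool) -> Result<(), u16> {
        if !authorized { return Err(403); }                  // not retriable
        if !self.allow.contains(tenant) { return Err(503); } // retriable: wrong shard
        Ok(())
    }
}
```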
3) Tenant Drain (Per Tenant)
Aggregate MUST provide a per-tenant drain mechanism to support rebalancing:
- Stop accepting new commands for the tenant.
- Allow in-flight commands to finish (bounded wait), then report drained.
- Expose drain status per tenant (admin endpoint).
4) Rebalancing State Strategy
Aggregate persists snapshots locally (MDBX) and uses JetStream for events. To move a tenant:
- Approach 1 (Snapshot migration): copy tenant snapshot DB/state to the target shard, then switch placement.
- Approach 2 (Cold rehydrate): switch placement and let the target shard rebuild state by replaying events from JetStream; expect higher latency during warmup.
The system should support both, with the rebalancer selecting the strategy based on tenant size/SLO.
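A hedged sketch of how the rebalancer might pick between the two approaches; the event-count criterion and its threshold are placeholders, not tuned values:

```rust
// Hypothetical strategy selection for the rebalancer.
#[derive(Debug, PartialEq)]
enum RebalanceStrategy { SnapshotMigration, ColdRehydrate }

fn choose_strategy(tenant_event_count: u64, max_replay_events: u64) -> RebalanceStrategy {
    // Small tenants can absorb replay latency during warmup; large tenants
    // get their snapshot state copied before the placement switch.
    if tenant_event_count <= max_replay_events {
        RebalanceStrategy::ColdRehydrate
    } else {
        RebalanceStrategy::SnapshotMigration
    }
}
```

In practice the input would likely be an SLO budget (acceptable warmup time) rather than a raw event count, but the shape of the decision is the same.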
5) Metrics for Placement Decisions
Aggregate SHOULD expose:
- Per-tenant command rate, error rate
- In-flight commands by tenant
- Rehydrate time / snapshot hit ratio
- Storage size per tenant (if feasible)
Projection: Required Changes
1) Expose Query API Upstream for Gateway
Projection has a working `QueryService` with tenant-scoped prefix scans (`uqf.rs`) but it is not exposed via HTTP/gRPC (current HTTP routes are health/ready/metrics/info only: `projection/http/mod.rs`).
Projection MUST add one upstream API the Gateway can route to:
- `POST /query/{view_type}` (HTTP) accepting `x-tenant-id` and a UQF payload, returning `QueryResponse`.
- Or a gRPC query service (new proto) if gRPC is preferred end-to-end.
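Assuming the HTTP option, the validation a shard might apply before executing the tenant-scoped scan could look like this; the field names and status codes are illustrative, not the actual UQF/`QueryResponse` definitions:

```rust
// Hypothetical wire shape for the upstream query call.
struct QueryRequest {
    tenant_id: String,   // taken from the x-tenant-id header
    view_type: String,   // from the /query/{view_type} path segment
    uqf_payload: String, // opaque UQF body, passed through to QueryService
}

// Checks applied before running the tenant-scoped prefix scan.
fn validate(req: &QueryRequest, hosted: &[&str], known_views: &[&str]) -> Result<(), u16> {
    if req.tenant_id.is_empty() { return Err(400); }                  // missing x-tenant-id
    if !known_views.contains(&req.view_type.as_str()) { return Err(404); }
    if !hosted.contains(&req.tenant_id.as_str()) { return Err(503); } // wrong shard, retriable
    Ok(())
}
```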
2) Tenant Placement Filtering (Independent Scaling)
Projection MUST support running in one of these modes:
- Multi-tenant shard: consumes all tenants (simple, less isolated).
- Tenant-filtered shard (required for rebalancing):
- only consumes/serves queries for the tenants assigned to that shard
- rejects queries for unassigned tenants (consistent error semantics)
Implementation direction:
- Add a placement watcher similar to Runner’s tenant filter (`runner/tenant_placement.rs`).
- Apply tenant filter to:
- event consumption subject filters (preferred), and
- query serving validation (always).
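If event subjects are tenant-prefixed, deriving the consumer filters from placement is small. The subject layout `events.<tenant_id>.>` below is an assumption; the real subject scheme may differ:

```rust
// Derive per-tenant JetStream subject filters from the assigned tenant list.
// Assumed layout: "events.<tenant_id>.>" (illustrative only).
fn subject_filters(assigned_tenants: &[&str]) -> Vec<String> {
    assigned_tenants.iter().map(|t| format!("events.{t}.>")).collect()
}
```

Filtering at the subject level keeps unassigned tenants' events off the shard entirely, while the query-side validation remains the backstop.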
3) Drain + Warmup Endpoints
Projection SHOULD add:
- `/admin/drain?tenant_id=...` (stop consuming new events for that tenant, finish in-flight, flush checkpoints)
- `/admin/reload` (apply latest placement/config)
- Optional warmup status: whether the shard has caught up to the JetStream tail for that tenant's view_types
4) Rebalancing Strategy for Projection
Projection can rebalance safely with “warm then cut over”:
- Assign tenant to the new projection shard while old shard still serves.
- New shard catches up (replay from JetStream, build view KV).
- Switch Gateway placement for query routing to new shard.
- Drain old shard for that tenant and optionally delete old tenant KV keys.
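The cutover step above reduces to a lag check against the JetStream tail; a minimal sketch, assuming sequence-number checkpoints and an operator-chosen lag budget:

```rust
// Switch query routing to the new shard only once its checkpoint is within
// a lag budget of the stream tail. max_lag is a policy knob, not a constant.
fn ready_to_cut_over(tail_seq: u64, checkpoint_seq: u64, max_lag: u64) -> bool {
    tail_seq.saturating_sub(checkpoint_seq) <= max_lag
}
```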
5) Metrics for Placement Decisions
Projection SHOULD expose:
- JetStream lag per tenant/view_type (tail minus checkpoint)
- Query latency and scan counts
- Storage size per tenant (if feasible)
Runner: Required Changes
Runner already has:
- A tenant placement watcher capable of producing an allowlist (`tenant_placement.rs`)
- Admin endpoints including drain/reload/config (`runner/http/mod.rs`)
- Gateway client integration for aggregate command submission (`runner/gateway/mod.rs`)
To support independent scalability + rebalancing, Runner needs the following.
1) Per-Tenant Drain (Not Only Global)
Runner’s current drain is global (`/admin/drain` toggles a single draining flag). Runner MUST support draining a specific tenant:
- Stop acquiring new saga/effect work for the tenant.
- Allow in-flight work for the tenant to finish (bounded).
- Flush outbox for the tenant (or guarantee idempotency on handoff).
- Persist final checkpoints so another shard can continue without duplication beyond at-least-once bounds.
2) Placement-Enforced Work Acquisition
Runner MUST validate tenant assignment at the boundary where it:
- consumes JetStream messages (saga triggers, effect commands), and
- dispatches outbox work.
If a tenant is not assigned to the shard, Runner must not process its work.
3) Handoff Safety Rules for Rebalancing
Runner rebalancing should follow:
- New shard begins processing only after it is assigned the tenant.
- Old shard stops acquiring new work for that tenant, then drains.
- Idempotency remains correct across handoff using checkpoints and dedupe markers.
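These rules can be sketched as a checkpoint-plus-dedupe gate on the new shard; `HandoffState`, the `seq`/`work_id` split, and the markers are illustrative, not the actual checkpoint format:

```rust
use std::collections::HashSet;

// Sketch of at-least-once handoff: the new shard resumes from the persisted
// checkpoint and uses dedupe markers to drop redelivered work.
struct HandoffState {
    checkpoint_seq: u64,    // last sequence the old shard reported as done
    done: HashSet<String>,  // dedupe markers for work ids seen on this shard
}

impl HandoffState {
    // Returns true if the work item should be processed on this shard.
    fn should_process(&mut self, seq: u64, work_id: &str) -> bool {
        if seq <= self.checkpoint_seq {
            return false; // already covered by the old shard's checkpoint
        }
        self.done.insert(work_id.to_string()) // insert returns false on duplicate
    }
}
```

Duplicates beyond the at-least-once bounds are expected; the gate only guarantees each work id is processed at most once per shard after the checkpoint.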
4) Metrics for Placement Decisions
Runner SHOULD expose:
- Outbox depth by tenant
- Work processing latency and retries by tenant/effect
- Schedule due items by tenant
- Consumer lag by tenant (if the consumption model supports per-tenant lag)
5) Auth Delivery Side Effects (Email/SMS/Push)
If the platform’s AuthN flows require out-of-band delivery (password reset links, email verification, MFA codes), the Runner SHOULD be the standard place to execute those side effects:
- Define a stable effect interface for sending transactional emails (reset links, verification links, security alerts).
- Optionally add SMS/push providers later under the same effect contract.
This keeps the Gateway free of long-lived provider credentials and aligns with the existing “effects are executed by workers” pattern.
Gateway Integration Notes
Once the above changes exist:
- Gateway routes per `(tenant_id, service_kind)` using independent placement maps.
- Gateway can implement “warm then cut over” rebalancing for Projection and Runner by switching only query/workflow routing after readiness conditions are met.
- Gateway can enforce consistent tenant validation, authn/authz, and error semantics at the edge even as placements move.
Gaps / Opportunities
- KV schema + ownership: define the exact NATS KV bucket layout, key naming, revisioning rules, and who is allowed to write placement updates.
- Rebalancer API: define operator workflows (plan/apply/rollback), status reporting, and audit log requirements for placement changes.
- Shard discovery: define how shard endpoints are registered (static config vs KV directory entries) and how health is represented.
- Consistency boundaries: define rebalancing guarantees per service kind (projection can be warm-cutover; runner requires checkpoint handoff; aggregate requires single-writer and state availability).