cloudlysis/runner/external_prd.md
Vlad Durnea 1298d9a3df
Monorepo consolidation: workspace, shared types, transport plans, docker/swam assets
2026-03-30 11:40:42 +03:00


External PRD: Changes Required in Aggregate, Projection, Runner

This document captures the work needed outside the Gateway to support:

  • Tenant-aware routing via x-tenant-id
  • Independent horizontal scalability of Aggregate, Projection, Runner
  • A safe mechanism for tenant rebalancing per service kind

Target State

Independent Placements

Each service kind has its own placement map:

  • aggregate_placement[tenant_id] -> aggregate_shard_id
  • projection_placement[tenant_id] -> projection_shard_id
  • runner_placement[tenant_id] -> runner_shard_id

Each shard is a replica set that can scale independently.
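The three independent maps can be sketched as one structure keyed by service kind. This is an illustrative in-memory shape, not the codebase's actual types; `ServiceKind`, `Placements`, and the string IDs are invented for the sketch:

```rust
use std::collections::HashMap;

/// Service kinds that each carry an independent placement map.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ServiceKind {
    Aggregate,
    Projection,
    Runner,
}

/// One placement map per service kind: tenant_id -> shard_id.
#[derive(Default)]
pub struct Placements {
    maps: HashMap<ServiceKind, HashMap<String, String>>,
}

impl Placements {
    /// Assign a tenant to a shard for one service kind only; the other
    /// kinds are unaffected, which is what allows independent scaling
    /// and rebalancing per kind.
    pub fn assign(&mut self, kind: ServiceKind, tenant: &str, shard: &str) {
        self.maps
            .entry(kind)
            .or_default()
            .insert(tenant.to_string(), shard.to_string());
    }

    pub fn shard_for(&self, kind: ServiceKind, tenant: &str) -> Option<&str> {
        self.maps.get(&kind)?.get(tenant).map(String::as_str)
    }
}
```

Moving a tenant's Runner placement leaves its Aggregate and Projection placements untouched, which is the property the rest of this document relies on.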

Rebalancing Contract (Per Service Kind)

All nodes MUST support:

  • Dynamic placement updates (watch NATS KV or reload config)
  • A drain mechanism that can target a specific tenant (stop acquiring new work for that tenant, finish in-flight, report status)
  • Clear readiness semantics that reflect whether the node will accept work for a tenant

Additionally, all nodes SHOULD converge on the same operational contract:

  • A per-tenant “accepting” gate (can this shard accept new work/queries/commands for tenant X?)
  • A per-tenant “drained” signal (no in-flight work remains for tenant X)
  • A per-tenant warmup/catchup signal where relevant (projection lag, aggregate snapshot availability)
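The three signals above could be expressed as one trait that every service kind implements. The trait and the toy `ShardState` below are hypothetical names for the sketch, assuming simple per-tenant counters:

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical shared operational contract for all service kinds.
pub trait TenantLifecycle {
    /// Can this shard accept new work/queries/commands for the tenant?
    fn is_accepting(&self, tenant_id: &str) -> bool;
    /// Has all in-flight work for the tenant completed?
    fn is_drained(&self, tenant_id: &str) -> bool;
    /// Has the shard caught up for the tenant, where warmup applies?
    fn is_warm(&self, tenant_id: &str) -> bool;
}

/// Minimal in-memory shard state illustrating the contract.
#[derive(Default)]
pub struct ShardState {
    accepting: HashSet<String>,
    in_flight: HashMap<String, usize>,
    caught_up: HashSet<String>,
}

impl ShardState {
    pub fn open(&mut self, tenant: &str) { self.accepting.insert(tenant.into()); }
    pub fn gate(&mut self, tenant: &str) { self.accepting.remove(tenant); }
    pub fn start_work(&mut self, tenant: &str) {
        *self.in_flight.entry(tenant.into()).or_insert(0) += 1;
    }
    pub fn finish_work(&mut self, tenant: &str) {
        if let Some(n) = self.in_flight.get_mut(tenant) { *n = n.saturating_sub(1); }
    }
    pub fn mark_warm(&mut self, tenant: &str) { self.caught_up.insert(tenant.into()); }
}

impl TenantLifecycle for ShardState {
    fn is_accepting(&self, t: &str) -> bool { self.accepting.contains(t) }
    fn is_drained(&self, t: &str) -> bool { self.in_flight.get(t).copied().unwrap_or(0) == 0 }
    fn is_warm(&self, t: &str) -> bool { self.caught_up.contains(t) }
}
```

Readiness endpoints and the rebalancer would both read these three signals rather than service-specific ad hoc flags.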

Aggregate: Required Changes

1) Expose a Real Command API (Gateway Upstream)

Today, Aggregate has internal command handling types (e.g., CommandServer) but its running HTTP server only exposes health/metrics/admin endpoints (aggregate/http_server.rs, aggregate/server/mod.rs).

Aggregate MUST expose one of the following upstream APIs for the Gateway to call:

  • Option A (Recommended): gRPC server implementing aggregate.gateway.v1.CommandService/SubmitCommand compatible with aggregate.proto.
  • Option B: HTTP endpoint for command submission (REST), with a stable request/response shape that the Gateway can proxy.

2) Tenant Placement Enforcement

Aggregate MUST enforce “hosted tenants” so independent scaling is safe:

  • If an Aggregate shard/node is not assigned a tenant, it MUST reject commands for that tenant (e.g., 403 when the caller is not authorized for the tenant at all, or 503 with a retriable hint when the tenant is merely placed on another shard).
  • Aggregate SHOULD maintain an in-memory allowlist of hosted tenants that is driven by:
    • NATS KV placement watcher (preferred), or
    • Hot-reloaded config pushed via /admin/reload

Aggregate already has admin hooks for drain/reload, but they are currently generic and/or illustrative (aggregate/http_server.rs, aggregate/server/mod.rs). These need to become placement-aware.
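A minimal sketch of the placement-aware admission check, assuming the allowlist is replaced wholesale whenever the (hypothetical) KV watcher or /admin/reload delivers a new placement:

```rust
use std::collections::HashSet;

/// Illustrative outcome of the placement check at command ingress.
#[derive(Debug, PartialEq)]
pub enum Admission {
    Accept,
    /// Tenant is placed on another shard: retriable, maps to 503.
    WrongShard,
}

/// In-memory allowlist of hosted tenants, refreshed by a placement
/// watcher or a config reload (both outside this sketch).
#[derive(Default)]
pub struct HostedTenants {
    allow: HashSet<String>,
}

impl HostedTenants {
    /// Replace the allowlist wholesale, as a KV watcher update would.
    pub fn apply_placement(&mut self, tenants: &[&str]) {
        self.allow = tenants.iter().map(|t| t.to_string()).collect();
    }

    pub fn admit(&self, tenant: &str) -> Admission {
        if self.allow.contains(tenant) { Admission::Accept } else { Admission::WrongShard }
    }
}
```

The important property is that admission flips immediately when a new placement is applied, with no restart.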

3) Tenant Drain (Per Tenant)

Aggregate MUST provide a per-tenant drain mechanism to support rebalancing:

  • Stop accepting new commands for the tenant.
  • Allow in-flight commands to finish (bounded wait), then report drained.
  • Expose drain status per tenant (admin endpoint).
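The three steps above can be sketched as a gate-then-bounded-wait loop. This is a simulation, not real polling: ticks stand in for elapsed wall-clock time, and one in-flight command is assumed to complete per tick:

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug, PartialEq)]
pub enum DrainStatus { Drained, TimedOut }

/// Sketch of a per-tenant drain: gate new commands, then poll in-flight
/// counts up to a bounded number of ticks.
#[derive(Default)]
pub struct AggregateShard {
    gated: HashSet<String>,
    in_flight: HashMap<String, usize>,
}

impl AggregateShard {
    pub fn begin(&mut self, tenant: &str, n: usize) {
        self.in_flight.insert(tenant.into(), n);
    }

    pub fn accepts(&self, tenant: &str) -> bool { !self.gated.contains(tenant) }

    pub fn drain_tenant(&mut self, tenant: &str, max_ticks: usize) -> DrainStatus {
        self.gated.insert(tenant.to_string()); // 1) stop accepting new commands
        for _ in 0..=max_ticks {
            let remaining = self.in_flight.get(tenant).copied().unwrap_or(0);
            if remaining == 0 {
                return DrainStatus::Drained; // 2) in-flight finished in bound
            }
            // simulate one in-flight command completing per tick
            self.in_flight.insert(tenant.to_string(), remaining - 1);
        }
        DrainStatus::TimedOut // 3) admin endpoint would report this status
    }
}
```

Note that the gate stays closed even on timeout, so the rebalancer can decide whether to wait longer or force the move.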

4) Rebalancing State Strategy

Aggregate persists snapshots locally (MDBX) and uses JetStream for events. To move a tenant:

  • Approach 1 (Snapshot migration): copy tenant snapshot DB/state to the target shard, then switch placement.
  • Approach 2 (Cold rehydrate): switch placement and let the target shard rebuild state by replaying events from JetStream; expect higher latency during warmup.

The system should support both, with the rebalancer selecting the strategy based on tenant size/SLO.
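One possible shape for that selection, with the decision rule invented for the sketch: compare the estimated replay time against the tenant's warmup SLO budget, and copy the snapshot only when replay would blow the budget:

```rust
#[derive(Debug, PartialEq)]
pub enum MoveStrategy { SnapshotMigration, ColdRehydrate }

/// Illustrative policy only; real inputs would come from the metrics in
/// the next section (rehydrate time, storage size per tenant).
pub fn choose_strategy(est_replay_secs: u64, warmup_budget_secs: u64) -> MoveStrategy {
    if est_replay_secs > warmup_budget_secs {
        MoveStrategy::SnapshotMigration
    } else {
        MoveStrategy::ColdRehydrate
    }
}
```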

5) Metrics for Placement Decisions

Aggregate SHOULD expose:

  • Per-tenant command rate, error rate
  • In-flight commands by tenant
  • Rehydrate time / snapshot hit ratio
  • Storage size per tenant (if feasible)

Projection: Required Changes

1) Expose Query API Upstream for Gateway

Projection has a working QueryService with tenant-scoped prefix scans (uqf.rs) but it is not exposed via HTTP/gRPC (current HTTP routes are health/ready/metrics/info only: projection/http/mod.rs).

Projection MUST add one upstream API the Gateway can route to:

  • POST /query/{view_type} (HTTP) accepting x-tenant-id and a UQF payload, returning QueryResponse.
  • Or a gRPC query service (new proto) if gRPC is preferred end-to-end.

2) Tenant Placement Filtering (Independent Scaling)

Projection MUST support running in one of these modes:

  • Multi-tenant shard: consumes all tenants (simple, less isolated).
  • Tenant-filtered shard (required for rebalancing):
    • only consumes/serves queries for the tenants assigned to that shard
    • rejects queries for unassigned tenants (consistent error semantics)

Implementation direction:

  • Add a placement watcher similar to Runner's tenant filter (runner/tenant_placement.rs).
  • Apply tenant filter to:
    • event consumption subject filters (preferred), and
    • query serving validation (always).
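A sketch of both filter points, assuming a per-tenant subject scheme of the form "events.<tenant_id>.>" (the scheme is an assumption, not taken from the codebase); the key point is that query-side validation runs unconditionally, even when subject filtering already narrows consumption:

```rust
/// Build consumer subject filters limited to the assigned tenants.
pub fn subject_filters(assigned: &[&str]) -> Vec<String> {
    assigned.iter().map(|t| format!("events.{t}.>")).collect()
}

/// Query serving validation, applied on every request regardless of
/// how consumption was filtered.
pub fn validate_query(assigned: &[&str], tenant: &str) -> Result<(), String> {
    if assigned.contains(&tenant) {
        Ok(())
    } else {
        Err(format!("tenant {tenant} is not placed on this shard"))
    }
}
```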

3) Drain + Warmup Endpoints

Projection SHOULD add:

  • /admin/drain?tenant_id=... (stop consuming new events for that tenant, finish in-flight, flush checkpoints)
  • /admin/reload (apply latest placement/config)
  • Optional warmup status: whether the shard has caught up to JetStream tail for that tenant/view_types

4) Rebalancing Strategy for Projection

Projection can rebalance safely with “warm then cut over”:

  • Assign tenant to the new projection shard while old shard still serves.
  • New shard catches up (replay from JetStream, build view KV).
  • Switch Gateway placement for query routing to new shard.
  • Drain old shard for that tenant and optionally delete old tenant KV keys.
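The sequence above can be modeled as a small per-tenant state machine; the phase names and the lag counter are invented for the sketch. Routing is only switched once the new shard's JetStream lag reaches zero:

```rust
/// Phases of a "warm then cut over" move for one tenant.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Cutover { Warming, Serving, DrainingOld, Done }

pub struct TenantMove {
    pub phase: Cutover,
    /// New shard's remaining JetStream lag for the tenant.
    pub lag: u64,
}

impl TenantMove {
    /// Advance one step; the cutover to Serving (i.e., switching Gateway
    /// query routing) happens only when lag has reached zero.
    pub fn step(&mut self) {
        self.phase = match self.phase {
            Cutover::Warming if self.lag == 0 => Cutover::Serving,
            Cutover::Warming => { self.lag -= 1; Cutover::Warming }
            Cutover::Serving => Cutover::DrainingOld,
            Cutover::DrainingOld => Cutover::Done,
            Cutover::Done => Cutover::Done,
        };
    }
}
```

Because the old shard keeps serving through the Warming phase, queries never hit a cold shard.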

5) Metrics for Placement Decisions

Projection SHOULD expose:

  • JetStream lag per tenant/view_type (tail minus checkpoint)
  • Query latency and scan counts
  • Storage size per tenant (if feasible)

Runner: Required Changes

Runner already has a tenant placement watcher (runner/tenant_placement.rs) and a global drain toggle (/admin/drain).

To support independent scalability + rebalancing, Runner needs the following.

1) Per-Tenant Drain (Not Only Global)

Runner's current drain is global (/admin/drain toggles a single draining flag). Runner MUST support draining a specific tenant:

  • Stop acquiring new saga/effect work for the tenant.
  • Allow in-flight work for the tenant to finish (bounded).
  • Flush outbox for the tenant (or guarantee idempotency on handoff).
  • Persist final checkpoints so another shard can continue without duplication beyond at-least-once bounds.

2) Placement-Enforced Work Acquisition

Runner MUST validate tenant assignment at the boundary where it:

  • consumes JetStream messages (saga triggers, effect commands), and
  • dispatches outbox work.

If a tenant is not assigned to the shard, Runner must not process its work.
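A sketch of that boundary check, assuming a subject layout of "saga.<tenant_id>.<rest>" (the layout is an assumption for the example): extract the tenant from the message subject and drop anything not assigned to this shard before doing any work:

```rust
use std::collections::HashSet;

/// Returns true only when the subject's tenant segment is assigned to
/// this shard; unparseable subjects are rejected rather than processed.
pub fn should_process(assigned: &HashSet<String>, subject: &str) -> bool {
    subject
        .split('.')
        .nth(1)
        .map_or(false, |tenant| assigned.contains(tenant))
}
```

The same predicate guards both consumption points (JetStream messages and outbox dispatch), so a placement change cuts off all work acquisition for a moved tenant at once.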

3) Handoff Safety Rules for Rebalancing

Runner rebalancing should follow:

  • New shard begins processing only after it is assigned the tenant.
  • Old shard stops acquiring new work for that tenant, then drains.
  • Idempotency remains correct across handoff using checkpoints and dedupe markers.
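The third rule can be sketched with a persisted checkpoint plus dedupe markers for out-of-order redelivery; the structure below is illustrative, assuming monotonically increasing per-tenant sequence numbers:

```rust
use std::collections::HashSet;

/// Sketch of at-least-once processing that stays correct across a
/// handoff: the new shard resumes from the persisted checkpoint, and a
/// dedupe set suppresses redeliveries it has already applied.
#[derive(Default)]
pub struct TenantProcessor {
    /// Last sequence durably applied (this is what a handoff transfers).
    pub checkpoint: u64,
    /// Dedupe markers for sequences applied ahead of the checkpoint.
    seen: HashSet<u64>,
}

impl TenantProcessor {
    /// Returns true if the message should be applied, false if it is a
    /// duplicate (including duplicates redelivered after a handoff).
    pub fn apply(&mut self, seq: u64) -> bool {
        if seq <= self.checkpoint || !self.seen.insert(seq) {
            return false;
        }
        // Advance the checkpoint over any contiguous prefix so the
        // dedupe set stays small and the handoff state is compact.
        while self.seen.contains(&(self.checkpoint + 1)) {
            self.checkpoint += 1;
            self.seen.remove(&self.checkpoint);
        }
        true
    }
}
```

On handoff, only the checkpoint (and any sparse markers) needs to reach the new shard; everything at or below it is dropped as a duplicate, keeping duplication within at-least-once bounds.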

4) Metrics for Placement Decisions

Runner SHOULD expose:

  • Outbox depth by tenant
  • Work processing latency and retries by tenant/effect
  • Schedule due items by tenant
  • Consumer lag by tenant (if the consumption model supports per-tenant lag)

5) Auth Delivery Side Effects (Email/SMS/Push)

If the platform's AuthN flows require out-of-band delivery (password reset links, email verification, MFA codes), the Runner SHOULD be the standard place to execute those side effects:

  • Define a stable effect interface for sending transactional emails (reset links, verification links, security alerts).
  • Optionally add SMS/push providers later under the same effect contract.
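One possible shape for that stable effect interface; the trait, its methods, and the stub provider are all hypothetical names for the sketch, not existing code:

```rust
/// Hypothetical effect contract for transactional delivery. SMS/push
/// providers would later implement the same trait.
pub trait DeliveryEffect {
    /// Channel identifier, e.g. "email", "sms", "push".
    fn kind(&self) -> &'static str;
    /// Execute the delivery; returns a provider receipt or an error.
    fn execute(&self, tenant_id: &str, recipient: &str, template: &str) -> Result<String, String>;
}

/// A stub standing in for a real email provider integration.
pub struct StubEmail;

impl DeliveryEffect for StubEmail {
    fn kind(&self) -> &'static str { "email" }
    fn execute(&self, tenant_id: &str, recipient: &str, template: &str) -> Result<String, String> {
        Ok(format!("queued {template} for {recipient} (tenant {tenant_id})"))
    }
}
```

Because the Runner holds the provider credentials behind this trait, the Gateway only ever emits the effect command and never touches the provider itself.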

This keeps the Gateway free of long-lived provider credentials and aligns with the existing “effects are executed by workers” pattern.


Gateway Integration Notes

Once the above changes exist:

  • Gateway routes per (tenant_id, service_kind) using independent placement maps.
  • Gateway can implement “warm then cut over” rebalancing for Projection and Runner by switching only query/workflow routing after readiness conditions are met.
  • Gateway can enforce consistent tenant validation, authn/authz, and error semantics at the edge even as placements move.

Gaps / Opportunities

  • KV schema + ownership: define the exact NATS KV bucket layout, key naming, revisioning rules, and who is allowed to write placement updates.
  • Rebalancer API: define operator workflows (plan/apply/rollback), status reporting, and audit log requirements for placement changes.
  • Shard discovery: define how shard endpoints are registered (static config vs KV directory entries) and how health is represented.
  • Consistency boundaries: define rebalancing guarantees per service kind (projection can be warm-cutover; runner requires checkpoint handoff; aggregate requires single-writer and state availability).