# Development Plan: Projection Node

## Overview

This plan breaks down the Projection node implementation into milestones ordered by dependency. Each milestone includes:

- **Tasks** with clear deliverables
- **Test Requirements** (unit tests + tautological tests + integration tests where applicable)
- **Dependencies** on previous milestones

**Development Approach:**

1. Complete one milestone at a time
2. Write tests before implementation (TDD where applicable)
3. All tests must pass before moving to the next milestone
4. Mark tasks complete with `[x]` as you progress

---

## Milestone 1: Project Foundation

**Goal:** Set up the Rust project with proper structure, dependencies, and basic tooling.

### Tasks

- [x] **1.1** Initialize Cargo project
  - Create `src/lib.rs` and `src/main.rs`
  - Configure `Cargo.toml` with madapes registry
- [x] **1.2** Configure dependencies
  - `edge-storage` (KvStore)
  - `runtime-function` (DAG program execution for `project`)
  - `query-engine` (UQF query support)
  - `edge-logger-client` (structured logs client)
  - `async-nats` (JetStream consumption)
  - `tokio`, `serde`, `serde_json`, `thiserror`, `anyhow`, `tracing`
- [x] **1.3** Establish initial module layout
  ```
  src/
  ├── lib.rs
  ├── main.rs
  ├── types/
  │   ├── mod.rs
  │   ├── id.rs
  │   ├── event.rs
  │   ├── view.rs
  │   ├── checkpoint.rs
  │   └── error.rs
  ├── config/
  │   ├── mod.rs
  │   └── settings.rs
  ├── storage/
  │   ├── mod.rs
  │   └── kv.rs
  ├── stream/
  │   ├── mod.rs
  │   └── jetstream.rs
  ├── project/
  │   ├── mod.rs
  │   ├── runtime.rs
  │   └── manifest.rs
  ├── query/
  │   ├── mod.rs
  │   └── uqf.rs
  └── observability/
      └── mod.rs
  ```
- [x] **1.4** Configure clippy and rustfmt

### Tests

- [x] **T1.1** Project compiles successfully
- [x] **T1.2** Dependencies resolve from madapes registry
- [x] **T1.3** Clippy passes with no warnings

---

## Milestone 2: Core Types (Envelopes, View Keys, Checkpoints)

**Goal:** Define all core types required for event consumption, view persistence, and idempotency.
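
The key layouts in tasks 2.3 and 2.4 can be sketched with plain `String` newtypes. This is a minimal illustration assuming tenant ids and view types never contain `:`; `ViewKey::parse` and `checkpoint_key` are hypothetical helper names, and the `FromStr`/serde impls the plan calls for are omitted to keep the block dependency-free.

```rust
use std::fmt;

/// Tenant identifier; the empty string stands for "no tenant" (task 2.1).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TenantId(pub String);

/// View type name, e.g. "orders" (no validation in the wrapper, per task 2.2).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ViewType(pub String);

/// Identifier of a single view instance.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ViewId(pub String);

/// Composite key rendered as `view:{tenant_id}:{view_type}:{view_id}`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ViewKey {
    pub tenant_id: TenantId,
    pub view_type: ViewType,
    pub view_id: ViewId,
}

impl fmt::Display for ViewKey {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "view:{}:{}:{}", self.tenant_id.0, self.view_type.0, self.view_id.0)
    }
}

impl ViewKey {
    /// Parse a key produced by `Display`. `splitn(4, ':')` lets `view_id`
    /// contain `:`; tenant ids and view types are assumed to be colon-free.
    pub fn parse(s: &str) -> Option<ViewKey> {
        let mut parts = s.splitn(4, ':');
        if parts.next()? != "view" {
            return None;
        }
        let tenant_id = TenantId(parts.next()?.to_string());
        let view_type = ViewType(parts.next()?.to_string());
        let view_id = ViewId(parts.next()?.to_string());
        Some(ViewKey { tenant_id, view_type, view_id })
    }
}

/// Checkpoint key rendered as `checkpoint:{tenant_id}:{view_type}`.
pub fn checkpoint_key(tenant_id: &TenantId, view_type: &ViewType) -> String {
    format!("checkpoint:{}:{}", tenant_id.0, view_type.0)
}
```

With the default empty tenant, keys take the form `view::orders:o-1`. The double colon is harmless as long as formatting and parsing always go through the same helpers.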

### Dependencies

- Milestone 1 (project foundation)

### Tasks

- [x] **2.1** Implement `TenantId` type
  - Optional, defaulting to an empty string for non-multi-tenant setups
  - Display, FromStr, Serialize, Deserialize
- [x] **2.2** Implement `ViewType` and `ViewId` types
  - String wrappers (consistent with `../aggregate` style: no validation in the type wrapper)
  - Display, FromStr, Serialize, Deserialize
- [x] **2.3** Implement `ViewKey` composition
  - `view:{tenant_id}:{view_type}:{view_id}`
  - Centralize formatting/parsing in one place
- [x] **2.4** Implement `CheckpointKey` composition
  - `checkpoint:{tenant_id}:{view_type}`
- [x] **2.5** Define the event envelope type consumed from JetStream
  - `tenant_id`, `aggregate_id`, `aggregate_type`, `event_type`, `payload`, `timestamp`
  - Support forward-compatible decoding (unknown fields ignored)
- [x] **2.6** Define checkpoint representation
  - Persisted value holds the JetStream stream sequence (u64) and optional metadata
- [x] **2.7** Implement Projection error model
  - Storage errors, stream errors, decode errors, project errors, tenant access errors
- [x] **2.8** Implement `ProjectionManifest` type
  - Defines projections (`view_type`) and the `project` program reference for each
  - Validates that referenced programs exist
- [ ] **2.9** Add correlation/trace context fields to the event envelope (forward compatible)
  - Support optional `correlation_id` and `traceparent` (or `trace_id`) so logs/traces can be stitched back to Gateway flows
  - Preserve unknown fields for forward compatibility where practical

### Tests

- [x] **T2.1** `TenantId` round-trips serialization and defaults to empty
- [x] **T2.2** `ViewType`, `ViewId` and key composition produce stable strings
- [x] **T2.3** Checkpoint encoding/decoding round-trips
- [x] **T2.4** Envelope decoding handles unknown fields
- [x] **T2.5** Tautological test: core types are Send + Sync

---

## Milestone 3: Configuration

**Goal:** Implement configuration loading and validation for the Projection node.

### Dependencies

- Milestone 2 (core types)

### Tasks

- [x] **3.1** Define `Settings` struct
  - NATS URL, stream name, subject filter(s)
  - Durable consumer name strategy (per tenant/view)
  - Storage path
  - Multi-tenancy enabled flag + default tenant behavior
  - Backpressure configuration (max in-flight, batching, ack timeout)
  - Manifest path (projection definitions and project program refs)
- [x] **3.2** Implement config loading from environment
- [x] **3.3** Implement config loading from file (YAML/TOML/JSON)
  - Environment variables override file values
- [x] **3.4** Implement config validation
  - Required fields present
  - Manifest loads and validates at startup (not inside `Settings::validate`)

### Tests

- [x] **T3.1** Settings loads from environment variables
- [x] **T3.2** Settings validation catches missing/invalid values
- [x] **T3.3** Tautological test: Settings is Clone + Debug

---

## Milestone 4: Storage Layer (KvStore Views + Atomic Checkpoints)

**Goal:** Integrate `edge-storage` `KvStore` with transactionally correct view + checkpoint updates.
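
The atomic commit in task 4.4 is easiest to keep correct when the API itself bundles view writes with the checkpoint. The sketch below models that with an in-memory `BTreeMap`; `MemKv`, `ProjectionCommit`, and `fail_next_commit` are hypothetical names, not the `edge-storage` API, and the copy-then-swap stands in for a single MDBX write transaction.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for the MDBX-backed KvStore (hypothetical, NOT the
/// `edge-storage` API); it only models all-or-nothing commits.
#[derive(Default)]
pub struct MemKv {
    data: BTreeMap<String, Vec<u8>>,
    /// Test hook: when set, the next commit fails before anything is written.
    pub fail_next_commit: bool,
}

#[derive(Debug, PartialEq)]
pub struct CommitError;

/// The only way to write views: a batch that always carries the checkpoint,
/// so view updates and checkpoint advances cannot be persisted separately.
pub struct ProjectionCommit {
    views: Vec<(String, Vec<u8>)>,
    checkpoint_key: String,
    sequence: u64,
}

impl ProjectionCommit {
    pub fn new(checkpoint_key: &str, sequence: u64) -> Self {
        ProjectionCommit { views: Vec::new(), checkpoint_key: checkpoint_key.to_string(), sequence }
    }

    pub fn put_view(mut self, view_key: &str, value: &[u8]) -> Self {
        self.views.push((view_key.to_string(), value.to_vec()));
        self
    }
}

impl MemKv {
    pub fn get(&self, key: &str) -> Option<&[u8]> {
        self.data.get(key).map(|v| v.as_slice())
    }

    /// Checkpoints are stored as 8 big-endian bytes.
    pub fn get_checkpoint(&self, key: &str) -> Option<u64> {
        let bytes = self.get(key)?;
        if bytes.len() != 8 {
            return None;
        }
        let mut buf = [0u8; 8];
        buf.copy_from_slice(bytes);
        Some(u64::from_be_bytes(buf))
    }

    /// Apply every view write plus the checkpoint, or nothing at all.
    pub fn commit(&mut self, batch: ProjectionCommit) -> Result<(), CommitError> {
        if self.fail_next_commit {
            self.fail_next_commit = false;
            return Err(CommitError);
        }
        let mut staged = self.data.clone(); // writes go to a private copy first
        for (key, value) in batch.views {
            staged.insert(key, value);
        }
        staged.insert(batch.checkpoint_key, batch.sequence.to_be_bytes().to_vec());
        self.data = staged; // publish all writes in one step
        Ok(())
    }

    /// Remove every key starting with `prefix`, e.g. `view:{tenant}:{view_type}:`.
    pub fn delete_prefix(&mut self, prefix: &str) -> usize {
        let doomed: Vec<String> = self
            .data
            .range(prefix.to_string()..)
            .take_while(|(k, _)| k.starts_with(prefix))
            .map(|(k, _)| k.clone())
            .collect();
        for key in &doomed {
            self.data.remove(key);
        }
        doomed.len()
    }
}
```

Because `ProjectionCommit::new` requires a checkpoint key and sequence, no code path writes a view without also advancing the checkpoint, which is the property test T4.3 checks.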

### Dependencies

- Milestone 2 (core types)
- Milestone 3 (configuration)

### Tasks

- [x] **4.1** Create `KvClient` wrapper
  - Opens MDBX-backed KvStore at configured path
  - Tenant-aware key composition helpers
- [x] **4.2** Implement view CRUD primitives
  - `get_view(view_key) -> Option`
  - `put_view(view_key, value)`
  - `delete_view_prefix(tenant_id, view_type)` for rebuilds
- [x] **4.3** Implement checkpoint primitives
  - `get_checkpoint(checkpoint_key) -> Option`
  - `put_checkpoint(checkpoint_key, sequence)`
- [x] **4.4** Implement atomic commit primitive
  - `txn { put_view(s); put_checkpoint }` as one MDBX transaction
  - Expose an API that makes it hard to update one without the other
- [x] **4.5** Optional: storage circuit breaker
  - Protects the node from tight retry loops when storage is degraded

### Tests

- [x] **T4.1** View round-trip: put/get returns identical JSON
- [x] **T4.2** Checkpoint round-trip: put/get returns identical sequence
- [x] **T4.3** Atomicity: if the transaction fails, neither view nor checkpoint is committed
- [x] **T4.4** Prefix delete removes all keys for tenant/view_type

---

## Milestone 5: JetStream Consumption (Durable Consumer + Idempotency)

**Goal:** Consume events from NATS JetStream with correct delivery semantics and checkpoint-based idempotency.
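
Tasks 5.3 and 5.4 reduce to two decisions per delivered message: is the sequence already covered by the checkpoint, and did the commit succeed? A sketch, assuming the checkpoint is `None` when absent; `gate`, `handle`, and the closure standing in for the projection-plus-commit step are hypothetical, and no `async-nats` calls are shown.

```rust
/// What to do with a delivered message before any projection work.
#[derive(Debug, PartialEq, Eq)]
pub enum Gate {
    /// Already covered by the checkpoint: ack without reprocessing.
    AckDuplicate,
    /// Newer than the checkpoint: run the projection.
    Process,
}

/// Idempotency gate (task 5.3): skip sequences `<= checkpoint`.
pub fn gate(stream_sequence: u64, checkpoint: Option<u64>) -> Gate {
    match checkpoint {
        Some(cp) if stream_sequence <= cp => Gate::AckDuplicate,
        _ => Gate::Process,
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum AckAction {
    Ack,
    /// Leave unacked so JetStream redelivers after the ack wait.
    NoAck,
}

/// Ack discipline (task 5.4): ack only after the commit succeeds.
/// `commit` stands in for "run projection + atomic view/checkpoint write".
pub fn handle<F>(stream_sequence: u64, checkpoint: Option<u64>, commit: F) -> AckAction
where
    F: FnOnce(u64) -> Result<(), String>,
{
    match gate(stream_sequence, checkpoint) {
        Gate::AckDuplicate => AckAction::Ack,
        Gate::Process => match commit(stream_sequence) {
            Ok(()) => AckAction::Ack,
            Err(_) => AckAction::NoAck,
        },
    }
}
```

A redelivered sequence that the checkpoint already covers is acked without touching storage, which is what makes at-least-once delivery safe for the views.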

### Dependencies

- Milestone 4 (storage layer)

### Tasks

- [x] **5.1** Implement JetStream client wrapper
  - Connect to NATS and bind to the configured stream
  - Create/bind durable consumer (single filter subject for now)
- [x] **5.2** Decode messages into event envelopes
  - Extract the JetStream stream sequence from message metadata
  - Decode payload into the envelope type
- [x] **5.3** Implement idempotency gate
  - Load checkpoint for `(tenant_id, view_type)`
  - Skip/ack messages with sequence `<= checkpoint`
- [x] **5.4** Implement ack discipline
  - Ack only after the MDBX transaction commits
  - Define behavior for transient errors (no ack, allow redelivery)
- [x] **5.5** Implement poison-message policy
  - Enforced via the consumer's max-deliver limit, plus a TERM ack and a KV quarantine record for messages with excessive deliveries

### Tests

- [x] **T5.1** Unit test: checkpoint gate skips sequences `<= checkpoint`
- [x] **T5.2** Unit test: ack is not called when storage commit fails
- [x] **T5.3** Integration test: JetStream redelivery re-processes an unacked message, and the checkpoint keeps the result idempotent (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)

---

## Milestone 6: Projection Execution (runtime-function `project` Program)

**Goal:** Apply deterministic projection logic to turn `(current_view, event)` into `new_view`.
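
The `(current_view, event) -> new_view` contract from task 6.2 and the pipeline order from task 6.3 can be illustrated with a plain Rust function standing in for the `runtime-function` program. The event fields, the `OrderPlaced` type, and the counter view are invented for this example; in the node the logic is a manifest-referenced program, not Rust code.

```rust
use std::collections::HashMap;

/// Hypothetical subset of the decoded envelope used by this example.
pub struct Event {
    pub aggregate_id: String,
    pub event_type: String,
    pub amount: i64,
}

/// Hypothetical view shape.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct View {
    pub total: i64,
    pub count: u64,
}

/// Output half of the contract in task 6.2: `{ new_view, view_id }`.
pub struct ProjectOutput {
    pub view_id: String,
    pub new_view: View,
}

/// Deterministic: no clock, randomness, or I/O, so equal inputs give equal output.
pub fn project(current_view: Option<&View>, event: &Event) -> ProjectOutput {
    let mut view = current_view.cloned().unwrap_or_default();
    if event.event_type == "OrderPlaced" {
        view.total += event.amount;
        view.count += 1;
    }
    ProjectOutput { view_id: event.aggregate_id.clone(), new_view: view }
}

/// Per-message pipeline (task 6.3) over in-memory state.
/// Returns true if the view was written, false for an already-applied sequence;
/// in both cases the caller may ack because state now reflects `seq`.
pub fn process(views: &mut HashMap<String, View>, checkpoint: &mut u64, seq: u64, event: &Event) -> bool {
    if seq <= *checkpoint {
        return false;
    }
    let out = project(views.get(&event.aggregate_id), event);
    // In the node, these two writes are a single MDBX transaction.
    views.insert(out.view_id, out.new_view);
    *checkpoint = seq;
    true
}
```

A checkpoint of `0` means "no checkpoint" in this sketch, which works because JetStream stream sequences start at 1.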

### Dependencies

- Milestone 5 (stream consumption)

### Tasks

- [x] **6.1** Implement `project` execution wrapper
  - Loads the program referenced by the manifest
  - Executes deterministically with gas limits + timeouts via `runtime-function`
- [x] **6.2** Define projection invocation contract
  - Input: `{ current_view, event }`
  - Output: `{ new_view, view_id }` (or equivalent)
- [x] **6.3** Implement per-message processing pipeline
  - Load current view
  - Execute the `project` program
  - Atomically write new view + checkpoint
  - Ack message
- [x] **6.4** Enforce per-entity ordering where required
  - Serialize updates per `(tenant_id, view_type, view_id)` if correctness depends on ordering

### Tests

- [x] **T6.1** Unit test: project program transforms input deterministically (same input → same output)
- [x] **T6.2** Unit test: pipeline writes checkpoint only when view write succeeds
- [ ] **T6.3** Integration test: concurrent processing does not violate per-key ordering when enabled

---

## Milestone 7: Query Support (query-engine UQF)

**Goal:** Provide query access to stored views using `query-engine` UQF over `KvStore::query()` prefix scans.

### Dependencies

- Milestone 4 (storage layer)

### Tasks

- [x] **7.1** Implement query wrapper around `KvStore` prefix scans
  - Tenant-scoped prefix scanning
  - UQF filtering/sorting support
- [x] **7.2** Define query interface (library API and/or service endpoint)
  - Ensure tenant isolation for all query paths
- [x] **7.3** Add query-time safeguards
  - Limits on result size and scan cost
  - Stable pagination strategy (if required)

### Tests

- [x] **T7.1** Unit test: tenant-scoped queries never return other-tenant keys
- [x] **T7.2** Unit test: UQF filter works on a small in-memory fixture dataset

---

## Milestone 8: Replay, Rebuild, and Hot Provisioning

**Goal:** Implement operational workflows: rebuilds, backfills, rolling upgrades, and safety checks.
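
Rebuild (task 8.2) and catch-up (task 8.1) can share one consumption loop; the rebuild only clears state first. The sketch below uses an in-memory log and store to show why test T8.1 should hold: replaying from sequence 1 into empty state reproduces what uninterrupted consumption built. `Store`, `Log`, `consume`, and `rebuild` are hypothetical, and each event is reduced to a numeric delta.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory model of the KV contents touched by one projection.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Store {
    pub views: BTreeMap<String, i64>,
    pub checkpoints: BTreeMap<String, u64>,
}

/// Stand-in for the JetStream stream: `(sequence, view_id, delta)`.
pub type Log = Vec<(u64, String, i64)>;

/// Catch-up consumption: resume after the checkpoint, or start from
/// sequence 1 when none exists, and process until the tail.
pub fn consume(store: &mut Store, log: &Log, tenant: &str, view_type: &str) {
    let cp_key = format!("checkpoint:{}:{}", tenant, view_type);
    let start = store.checkpoints.get(&cp_key).cloned().unwrap_or(0);
    for (seq, view_id, delta) in log.iter().filter(|e| e.0 > start) {
        let key = format!("view:{}:{}:{}", tenant, view_type, view_id);
        *store.views.entry(key).or_insert(0) += *delta;
        // Written in the same step as the view, mirroring the atomic commit.
        store.checkpoints.insert(cp_key.clone(), *seq);
    }
}

/// Rebuild: delete the view prefix and checkpoint, then replay from sequence 1.
pub fn rebuild(store: &mut Store, log: &Log, tenant: &str, view_type: &str) {
    let prefix = format!("view:{}:{}:", tenant, view_type);
    store.views.retain(|k, _| !k.starts_with(&prefix));
    store.checkpoints.remove(&format!("checkpoint:{}:{}", tenant, view_type));
    consume(store, log, tenant, view_type);
}
```

Running `consume` twice over the same log is also a no-op the second time, which is the restart behavior test T8.2 relies on.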

### Dependencies

- Milestone 5 (JetStream consumption)
- Milestone 6 (projection execution)

### Tasks

- [x] **8.1** Implement catch-up mode
  - When no checkpoint exists, start from sequence 1 and process until reaching the stream tail
- [x] **8.2** Implement rebuild workflow
  - Delete view prefix + checkpoint for `(tenant_id, view_type)`
  - Re-consume from sequence 1 (or a chosen seed sequence)
- [x] **8.3** Implement hot upgrade workflow (versioned views)
  - New `view_type` (or suffix) + independent checkpoint
  - Backfill, then cut over routing, then retire the old `view_type`
- [x] **8.4** Add health/readiness signals
  - Storage reachable
  - NATS reachable
  - Consumer lag below threshold (optional)

### Tests

- [x] **T8.1** Integration test: rebuild from scratch produces the same view as uninterrupted consumption (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
- [x] **T8.2** Integration test: rolling restart resumes from checkpoint without duplicating results (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)

---

## Milestone 9: Container & Deployment

**Goal:** Package as a container and enable predictable local and production deployment.
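
For task 9.4, resolving environment variables through an injected lookup keeps the defaults testable without mutating the process environment. Only `PROJECTION_NATS_URL` appears in this plan; the other variable names, the `EnvSettings` struct, and every default value below are placeholders, not the node's actual settings.

```rust
/// Hypothetical env-derived settings (names and defaults are placeholders).
#[derive(Debug, Clone, PartialEq)]
pub struct EnvSettings {
    pub nats_url: String,
    pub stream_name: String,
    pub storage_path: String,
    pub multi_tenant: bool,
}

/// `lookup` abstracts `std::env::var` so defaults can be tested in isolation.
pub fn from_lookup<F: Fn(&str) -> Option<String>>(lookup: F) -> Result<EnvSettings, String> {
    // Fall back to a default when the variable is unset.
    let get = |name: &str, default: &str| lookup(name).unwrap_or_else(|| default.to_string());
    // Reject malformed booleans instead of silently using the default.
    let multi_tenant = match get("PROJECTION_MULTI_TENANT", "false").as_str() {
        "true" | "1" => true,
        "false" | "0" => false,
        other => return Err(format!("PROJECTION_MULTI_TENANT: invalid bool {:?}", other)),
    };
    Ok(EnvSettings {
        nats_url: get("PROJECTION_NATS_URL", "nats://127.0.0.1:4222"),
        stream_name: get("PROJECTION_STREAM_NAME", "events"),
        storage_path: get("PROJECTION_STORAGE_PATH", "/data"),
        multi_tenant,
    })
}

/// Entry point for the container: reads the real process environment.
pub fn from_env() -> Result<EnvSettings, String> {
    from_lookup(|name| std::env::var(name).ok())
}
```

Failing fast on an invalid value surfaces misconfiguration at container start rather than at the first message.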

### Dependencies

- Milestone 8 (replay/rebuild + health/readiness)

### Tasks

- [x] **9.1** Create `docker/Dockerfile.rust`
  - Multi-stage build
  - Minimal runtime image
  - Health check integration
- [x] **9.2** Create `docker-compose.yml` for local dev
  - Projection container
  - NATS server with JetStream enabled
  - Optional: Grafana, Victoria Metrics, Loki
- [x] **9.3** Define container entrypoint behavior
  - Config loading
  - Graceful shutdown on SIGTERM
    - Stop pulling new JetStream messages
    - Complete or safely abandon in-flight processing without acking early
- [x] **9.4** Define environment variables and defaults
  - NATS URL, stream name, subject filters
  - Storage path
  - Consumer naming strategy (durable)
  - Multi-tenancy enabled flag
- [x] **9.5** Configure release build optimizations
  - LTO, strip, single codegen unit

### Tests

- [x] **T9.1** Container builds successfully
  ```bash
  docker build -f docker/Dockerfile.rust --build-arg PACKAGE=projection --build-arg BIN=projection -t cloudlysis/projection:local .
  docker run cloudlysis/projection:local --help
  ```
- [x] **T9.2** Container starts with valid config
  ```bash
  docker run -e PROJECTION_NATS_URL=nats://nats:4222 cloudlysis/projection:local
  ```

---

## Milestone 10: Provisioning, Scalability, and Docker Swarm Deployment

**Goal:** Support horizontal scaling and safe rollouts in Docker Swarm with clear provisioning semantics for JetStream consumers.
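
One way to meet the per-key serialization in tasks 6.4 and 10.2 within a single replica is to hash `(tenant_id, view_type, view_id)` to a fixed worker lane, so updates to one view run in stream order while different views proceed in parallel. This is a sketch with hypothetical `lane_for` and `partition` helpers; it does not order work across replicas sharing a durable consumer, which depends on how subjects and consumers are sharded.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a view key to one of `lanes` workers. All updates for the same
/// `(tenant_id, view_type, view_id)` land on the same lane.
pub fn lane_for(tenant_id: &str, view_type: &str, view_id: &str, lanes: usize) -> usize {
    assert!(lanes > 0, "at least one lane is required");
    let mut hasher = DefaultHasher::new();
    (tenant_id, view_type, view_id).hash(&mut hasher);
    (hasher.finish() % lanes as u64) as usize
}

/// Split a pulled batch of `(stream_sequence, view_id)` pairs into per-lane
/// queues. Each lane keeps the stream order of its messages, so a worker that
/// drains one lane sequentially applies each view's updates in order.
pub fn partition(batch: &[(u64, &str)], tenant_id: &str, view_type: &str, lanes: usize) -> Vec<Vec<u64>> {
    let mut queues: Vec<Vec<u64>> = vec![Vec::new(); lanes];
    for &(seq, view_id) in batch {
        queues[lane_for(tenant_id, view_type, view_id, lanes)].push(seq);
    }
    queues
}
```

`DefaultHasher` output is only relied on within one process here, so the standard library's lack of a cross-version stability guarantee for it does not matter.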

### Dependencies

- Milestone 9 (container & deployment)

### Tasks

- [x] **10.1** Define the scaling model for JetStream consumption
  - Use a durable consumer per `(view_type, shard)` so multiple replicas can share the same consumer workload
  - Define the subject filter(s) used by each consumer (tenant wildcard vs tenant-range sharded)
  - Document consumer configuration requirements (ack policy, max in-flight, replay policy)
- [x] **10.2** Implement replica-safe consumption
  - Multiple replicas pulling from the same durable consumer distribute work
  - Enforce per-key serialization if required for correctness
- [x] **10.3** Add tenant-aware provisioning option (sharding)
  - Optional tenant-range sharding by subject filters (e.g., `tenant..*`)
  - Placement constraints for Swarm nodes (e.g., `node.labels.tenant_range==`)
  - Strategy for adding/removing shards
- [x] **10.4** Create Swarm stack definition (`swarm/stacks/platform.yml`)
  - Service definition
  - Replicas configuration
  - Resource limits (CPU, memory)
  - Health check integration
  - Storage volume mapping for the `edge-storage` data directory
- [x] **10.5** Define rollout strategy
  - Rolling update parameters
  - Backfill/cutover strategy for versioned `view_type` upgrades
  - Safe rollback path (the old view is still present, so routing can switch back)

### Tests

- [x] **T10.1** Stack file is valid
  ```bash
  docker stack config -c swarm/stacks/platform.yml
  ```
- [x] **T10.2** Scale-out does not duplicate work (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
  - Start 2+ replicas pulling from the same durable consumer
  - Verify the checkpoint is monotonic and view updates are not applied twice for the same sequence
- [x] **T10.3** Rolling restart preserves correctness (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
  - Restart replicas during active consumption
  - Verify idempotency holds (no view corruption, checkpoint monotonic)

---

## Milestone 11: Operational Endpoints & Observability

**Goal:** Provide the minimum operational surface required to provision, scale, and monitor Projection nodes in production.

### Dependencies

- Milestone 9 (container & deployment)
- Milestone 10 (provisioning & scaling semantics)

### Tasks

- [x] **11.1** Implement `/health` endpoint
  - Process is up
  - Storage opened successfully
- [x] **11.2** Implement `/ready` endpoint
  - NATS connection established
  - JetStream consumer bound
  - Storage writable
- [x] **11.3** Implement `/metrics` endpoint (Prometheus)
  - Consumer lag (latest stream sequence minus checkpoint)
  - Processing throughput and latency
  - Redelivery count / ack failures
  - Storage commit failures
- [x] **11.4** Add build/runtime identity
  - Version, commit hash (if available), configured `view_type` set
- [x] **11.5** Add graceful drain behavior for rollouts
  - Report “not ready” before shutdown
  - Stop pulling, wait for in-flight work up to a timeout
- [ ] **11.6** Correlation-aware logging for investigations
  - When processing messages, include `tenant_id`, `correlation_id`, and `trace_id` in structured logs/spans whenever they are present in the envelope or message headers

### Tests

- [x] **T11.1** Readiness fails when NATS is unavailable
- [x] **T11.2** Metrics include lag gauge and counters
- [x] **T11.3** Drain transitions ready → not ready before exit
- [ ] **T11.4** Unit test: envelope decoding accepts optional correlation/trace fields and exposes them for logging
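
The lag gauge in task 11.3 and the drain sequence in task 11.5 can be modeled with a saturating subtraction and a pair of atomics. This is a synchronous sketch under stated assumptions: `consumer_lag`, `Lifecycle`, and its methods are hypothetical, the polling `thread::sleep` would be an async wait in a `tokio` service, and the HTTP handlers are omitted.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};

/// Consumer lag: latest stream sequence minus the stored checkpoint.
/// Saturates at 0 in case the stream info is older than the checkpoint.
pub fn consumer_lag(stream_last_seq: u64, checkpoint: Option<u64>) -> u64 {
    stream_last_seq.saturating_sub(checkpoint.unwrap_or(0))
}

/// Readiness and drain state shared by `/ready` and the consumer loop.
#[derive(Default)]
pub struct Lifecycle {
    draining: AtomicBool,
    in_flight: AtomicUsize,
}

impl Lifecycle {
    /// `/ready` reports not ready as soon as draining starts.
    pub fn is_ready(&self) -> bool {
        !self.draining.load(Ordering::SeqCst)
    }

    /// The consumer loop checks this before pulling another batch.
    pub fn may_pull(&self) -> bool {
        !self.draining.load(Ordering::SeqCst)
    }

    pub fn begin_message(&self) {
        self.in_flight.fetch_add(1, Ordering::SeqCst);
    }

    pub fn end_message(&self) {
        self.in_flight.fetch_sub(1, Ordering::SeqCst);
    }

    /// Flip to not-ready first, then wait for in-flight work up to `timeout`.
    /// Returns false if work was still running at the deadline.
    pub fn drain(&self, timeout: Duration) -> bool {
        self.draining.store(true, Ordering::SeqCst);
        let deadline = Instant::now() + timeout;
        while self.in_flight.load(Ordering::SeqCst) > 0 {
            if Instant::now() >= deadline {
                return false;
            }
            thread::sleep(Duration::from_millis(5));
        }
        true
    }
}
```

Returning `false` from `drain` is still safe: unfinished messages were never acked, so JetStream redelivers them after the ack wait and the checkpoint gate keeps reprocessing idempotent.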