cloudlysis/projection/DEVELOPMENT_PLAN.md
Vlad Durnea 1298d9a3df
Monorepo consolidation: workspace, shared types, transport plans, docker/swam assets
2026-03-30 11:40:42 +03:00


# Development Plan: Projection Node
## Overview
This plan breaks down the Projection node implementation into milestones ordered by dependency. Each milestone includes:
- **Tasks** with clear deliverables
- **Test Requirements** (unit tests + tautological tests + integration tests where applicable)
- **Dependencies** on previous milestones

**Development Approach:**
1. Complete one milestone at a time
2. Write tests before implementation (TDD where applicable)
3. All tests must pass before moving to the next milestone
4. Mark tasks complete with `[x]` as you progress
---
## Milestone 1: Project Foundation
**Goal:** Set up the Rust project with proper structure, dependencies, and basic tooling.
### Tasks
- [x] **1.1** Initialize Cargo project
  - Create `src/lib.rs` and `src/main.rs`
  - Configure `Cargo.toml` with madapes registry
- [x] **1.2** Configure dependencies
  - `edge-storage` (KvStore)
  - `runtime-function` (DAG program execution for `project`)
  - `query-engine` (UQF query support)
  - `edge-logger-client` (structured logs client)
  - `async-nats` (JetStream consumption)
  - `tokio`, `serde`, `serde_json`, `thiserror`, `anyhow`, `tracing`
- [x] **1.3** Establish initial module layout
  ```
  src/
  ├── lib.rs
  ├── main.rs
  ├── types/
  │   ├── mod.rs
  │   ├── id.rs
  │   ├── event.rs
  │   ├── view.rs
  │   ├── checkpoint.rs
  │   └── error.rs
  ├── config/
  │   ├── mod.rs
  │   └── settings.rs
  ├── storage/
  │   ├── mod.rs
  │   └── kv.rs
  ├── stream/
  │   ├── mod.rs
  │   └── jetstream.rs
  ├── project/
  │   ├── mod.rs
  │   ├── runtime.rs
  │   └── manifest.rs
  ├── query/
  │   ├── mod.rs
  │   └── uqf.rs
  └── observability/
      └── mod.rs
  ```
- [x] **1.4** Configure clippy and rustfmt
### Tests
- [x] **T1.1** Project compiles successfully
- [x] **T1.2** Dependencies resolve from madapes registry
- [x] **T1.3** Clippy passes with no warnings
---
## Milestone 2: Core Types (Envelopes, View Keys, Checkpoints)
**Goal:** Define all core types required for event consumption, view persistence, and idempotency.
### Dependencies
- Milestone 1 (project foundation)
### Tasks
- [x] **2.1** Implement `TenantId` type
  - Optional, defaulting to an empty string for single-tenant setups
  - Display, FromStr, Serialize, Deserialize
- [x] **2.2** Implement `ViewType` and `ViewId` types
  - String wrappers (consistent with `../aggregate` style: no validation in the type wrapper)
  - Display, FromStr, Serialize, Deserialize
- [x] **2.3** Implement `ViewKey` composition
  - `view:{tenant_id}:{view_type}:{view_id}`
  - Centralize formatting/parsing in one place
- [x] **2.4** Implement `CheckpointKey` composition
  - `checkpoint:{tenant_id}:{view_type}`
- [x] **2.5** Define the event envelope type consumed from JetStream
  - `tenant_id`, `aggregate_id`, `aggregate_type`, `event_type`, `payload`, `timestamp`
  - Support forward-compatible decoding (unknown fields ignored)
- [x] **2.6** Define checkpoint representation
  - Persisted value holds the JetStream stream sequence (u64) and optional metadata
- [x] **2.7** Implement Projection error model
  - Storage errors, stream errors, decode errors, project errors, tenant access errors
- [x] **2.8** Implement `ProjectionManifest` type
  - Defines projections (`view_type`) and the `project` program reference for each
  - Validates that referenced programs exist
- [ ] **2.9** Add correlation/trace context fields to the event envelope (forward compatible)
  - Support optional `correlation_id` and `traceparent` (or `trace_id`) so logs/traces can be stitched back to Gateway flows
  - Preserve unknown fields for forward compatibility where practical
### Tests
- [x] **T2.1** `TenantId` round-trips serialization and defaults to empty
- [x] **T2.2** `ViewType`, `ViewId` and key composition produce stable strings
- [x] **T2.3** Checkpoint encoding/decoding round-trips
- [x] **T2.4** Envelope decoding handles unknown fields
- [x] **T2.5** Tautological test: core types are Send + Sync
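
The sketch below illustrates the centralized key formatting from tasks 2.3 and 2.4, using only `std`. The `ViewKey` fields, the `parse` method, and the `checkpoint_key` and `assert_send_sync` helpers are illustrative assumptions, not the crate's actual API. Parsing assumes `tenant_id` and `view_type` never contain `:`, so everything after the third separator is treated as the `view_id`. Keeping `Display` and `parse` side by side is what gives the T2.2 stability test its teeth: any format change breaks the round-trip immediately. The last helper shows one common shape for the T2.5 tautological check, since it only compiles when the type is `Send + Sync`.

```rust
use std::fmt;

// Sketch only: names below are hypothetical, not the crate's actual API.

/// Storage key for a view: `view:{tenant_id}:{view_type}:{view_id}`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ViewKey {
    pub tenant_id: String,
    pub view_type: String,
    pub view_id: String,
}

impl fmt::Display for ViewKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "view:{}:{}:{}", self.tenant_id, self.view_type, self.view_id)
    }
}

impl ViewKey {
    /// Inverse of `Display`. Assumes `tenant_id` and `view_type` contain no
    /// `:`; the remainder after the third separator is the `view_id`.
    pub fn parse(s: &str) -> Option<Self> {
        let rest = s.strip_prefix("view:")?;
        let mut parts = rest.splitn(3, ':');
        let tenant_id = parts.next()?.to_string();
        let view_type = parts.next()?.to_string();
        let view_id = parts.next()?.to_string();
        Some(Self { tenant_id, view_type, view_id })
    }
}

/// Checkpoint key: `checkpoint:{tenant_id}:{view_type}`.
pub fn checkpoint_key(tenant_id: &str, view_type: &str) -> String {
    format!("checkpoint:{tenant_id}:{view_type}")
}

/// Tautological check (T2.5): fails to compile unless `T: Send + Sync`.
pub fn assert_send_sync<T: Send + Sync>() {}
```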
---
## Milestone 3: Configuration
**Goal:** Implement configuration loading and validation for the Projection node.
### Dependencies
- Milestone 2 (core types)
### Tasks
- [x] **3.1** Define `Settings` struct
  - NATS URL, stream name, subject filter(s)
  - Durable consumer name strategy (per tenant/view)
  - Storage path
  - Multi-tenancy enabled flag + default tenant behavior
  - Backpressure configuration (max in-flight, batching, ack timeout)
  - Manifest path (projection definitions and project program refs)
- [x] **3.2** Implement config loading from environment
- [x] **3.3** Implement config loading from file (YAML/TOML/JSON)
  - Environment overrides file
- [x] **3.4** Implement config validation
  - Required fields present
  - Manifest loads and validates at startup (not inside `Settings::validate`)
### Tests
- [x] **T3.1** Settings loads from environment variables
- [x] **T3.2** Settings validation catches missing/invalid values
- [x] **T3.3** Tautological test: Settings is Clone + Debug
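
Here is one way to express "environment overrides file" together with required-field validation, sketched with `std` only. `FileSettings`, `load`, `ConfigError`, and the `PROJECTION_STREAM_NAME` / `PROJECTION_STORAGE_PATH` variable names are hypothetical. Only `PROJECTION_NATS_URL` appears elsewhere in this plan. The file is assumed to be already deserialized. The environment is passed in as a map so tests do not mutate process state, and a real loader could build that map from `std::env::vars()`.

```rust
use std::collections::HashMap;

// Sketch only: struct, function, and most variable names are hypothetical.

/// Values already deserialized from the config file. Every field is
/// optional so the environment can fill it in or override it.
#[derive(Debug, Clone, Default)]
pub struct FileSettings {
    pub nats_url: Option<String>,
    pub stream_name: Option<String>,
    pub storage_path: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Settings {
    pub nats_url: String,
    pub stream_name: String,
    pub storage_path: String,
}

#[derive(Debug, PartialEq)]
pub enum ConfigError {
    Missing(&'static str),
    Empty(&'static str),
}

/// Rejects absent or blank values for a required field.
fn require(name: &'static str, value: Option<String>) -> Result<String, ConfigError> {
    match value {
        Some(v) if !v.trim().is_empty() => Ok(v),
        Some(_) => Err(ConfigError::Empty(name)),
        None => Err(ConfigError::Missing(name)),
    }
}

/// Layers environment values over file values, then validates.
pub fn load(file: FileSettings, env: &HashMap<String, String>) -> Result<Settings, ConfigError> {
    // An environment variable, when set, takes precedence over the file value.
    let pick = |var: &str, from_file: Option<String>| env.get(var).cloned().or(from_file);
    Ok(Settings {
        nats_url: require("nats_url", pick("PROJECTION_NATS_URL", file.nats_url))?,
        stream_name: require("stream_name", pick("PROJECTION_STREAM_NAME", file.stream_name))?,
        storage_path: require("storage_path", pick("PROJECTION_STORAGE_PATH", file.storage_path))?,
    })
}
```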
---
## Milestone 4: Storage Layer (KvStore Views + Atomic Checkpoints)
**Goal:** Integrate `edge-storage` `KvStore` with transactionally correct view + checkpoint updates.
### Dependencies
- Milestone 2 (core types)
- Milestone 3 (configuration)
### Tasks
- [x] **4.1** Create `KvClient` wrapper
  - Opens MDBX-backed KvStore at configured path
  - Tenant-aware key composition helpers
- [x] **4.2** Implement view CRUD primitives
  - `get_view(view_key) -> Option<Value>`
  - `put_view(view_key, value)`
  - `delete_view_prefix(tenant_id, view_type)` for rebuilds
- [x] **4.3** Implement checkpoint primitives
  - `get_checkpoint(checkpoint_key) -> Option<Sequence>`
  - `put_checkpoint(checkpoint_key, sequence)`
- [x] **4.4** Implement atomic commit primitive
  - `txn { put_view(s); put_checkpoint }` as one MDBX transaction
  - Expose API that makes it hard to update one without the other
- [x] **4.5** Optional: storage circuit breaker
  - Protects node from tight retry loops when storage is degraded
### Tests
- [x] **T4.1** View round-trip: put/get returns identical JSON
- [x] **T4.2** Checkpoint round-trip: put/get returns identical sequence
- [x] **T4.3** Atomicity: if transaction fails, neither view nor checkpoint is committed
- [x] **T4.4** Prefix delete removes all keys for tenant/view_type
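
The intent of task 4.4 can be illustrated with an in-memory stand-in for the store. This sketch assumes nothing about the real `edge-storage` API. `MemKv`, `Commit`, and the size limit used to simulate a failing write are hypothetical. A `Commit` can only be built with a checkpoint. `commit` stages every write on a copy that replaces the live map only if all writes succeed. That mirrors the all-or-nothing behavior an MDBX write transaction provides natively, without the O(n) copy.

```rust
use std::collections::BTreeMap;

// Sketch only: an in-memory stand-in, not the edge-storage KvStore API.

#[derive(Debug)]
pub struct MemKv {
    data: BTreeMap<String, Vec<u8>>,
    /// Artificial limit used to simulate a write that fails mid-transaction.
    max_value_len: usize,
}

/// A unit of work that cannot exist without a checkpoint, so views cannot
/// be persisted without also advancing the checkpoint.
pub struct Commit {
    views: Vec<(String, Vec<u8>)>,
    checkpoint_key: String,
    sequence: u64,
}

impl Commit {
    pub fn new(checkpoint_key: impl Into<String>, sequence: u64) -> Self {
        Self { views: Vec::new(), checkpoint_key: checkpoint_key.into(), sequence }
    }

    pub fn put_view(mut self, key: impl Into<String>, value: Vec<u8>) -> Self {
        self.views.push((key.into(), value));
        self
    }
}

#[derive(Debug, PartialEq)]
pub struct CommitError(pub String);

impl MemKv {
    pub fn new(max_value_len: usize) -> Self {
        Self { data: BTreeMap::new(), max_value_len }
    }

    /// All-or-nothing: writes go to a staged copy that replaces the live
    /// map only after every write (views + checkpoint) has succeeded.
    pub fn commit(&mut self, c: Commit) -> Result<(), CommitError> {
        let mut staged = self.data.clone();
        for (key, value) in c.views {
            if value.len() > self.max_value_len {
                return Err(CommitError(format!("value too large for {key}")));
            }
            staged.insert(key, value);
        }
        staged.insert(c.checkpoint_key, c.sequence.to_be_bytes().to_vec());
        self.data = staged;
        Ok(())
    }

    pub fn get(&self, key: &str) -> Option<&Vec<u8>> {
        self.data.get(key)
    }

    /// Removes every key that starts with `prefix` (used by rebuilds).
    pub fn delete_prefix(&mut self, prefix: &str) {
        self.data.retain(|k, _| !k.starts_with(prefix));
    }
}
```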
---
## Milestone 5: JetStream Consumption (Durable Consumer + Idempotency)
**Goal:** Consume events from NATS JetStream with correct delivery semantics and checkpoint-based idempotency.
### Dependencies
- Milestone 4 (storage layer)
### Tasks
- [x] **5.1** Implement JetStream client wrapper
  - Connect to NATS and bind to configured stream
  - Create/bind durable consumer (single filter subject for now)
- [x] **5.2** Decode messages into event envelopes
  - Extract JetStream stream sequence from message metadata
  - Decode payload into the envelope type
- [x] **5.3** Implement idempotency gate
  - Load checkpoint for `(tenant_id, view_type)`
  - Ack and skip messages whose sequence is `<= checkpoint`
- [x] **5.4** Implement ack discipline
  - Ack only after the MDBX transaction commits
  - Define behavior for transient errors (no ack, allow redelivery)
- [x] **5.5** Implement poison-message policy
  - Bound redeliveries with the consumer's max-deliver setting; on excessive deliveries, TERM-ack the message and write a quarantine record to KV
### Tests
- [x] **T5.1** Unit test: checkpoint gate skips sequences `<= checkpoint`
- [x] **T5.2** Unit test: ack is not called when storage commit fails
- [x] **T5.3** Integration test: JetStream redelivery re-processes unacked message and is made idempotent by checkpoint (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
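
The gate and ack rules from tasks 5.3 to 5.5 reduce to a small decision function, sketched here without any NATS client. The `Disposition` variants are hypothetical names. They would map onto JetStream's ack, leaving the message un-acked (redelivery), and a TERM ack. `delivered` is assumed to be the 1-based delivery count from message metadata, and writing the quarantine record is left out. The duplicate check runs before `commit`, so a redelivered message that was already committed is acked without re-running the projection. That is the property T5.3 exercises.

```rust
// Sketch only: `Disposition` and `handle` are hypothetical names.

/// Outcome for one JetStream message.
#[derive(Debug, PartialEq)]
pub enum Disposition {
    /// Sequence already covered by the checkpoint: ack, do not re-apply.
    AckDuplicate,
    /// View + checkpoint committed atomically: safe to ack.
    AckCommitted,
    /// Transient failure: leave un-acked so JetStream redelivers it.
    NoAck,
    /// Failed on its last allowed delivery: TERM ack (and quarantine).
    Term,
}

/// `commit` performs the atomic view + checkpoint write. It is only invoked
/// for sequences beyond the checkpoint, and the ack depends on its result.
pub fn handle(
    stream_seq: u64,
    checkpoint: Option<u64>,
    delivered: u64,
    max_deliver: u64,
    commit: impl FnOnce() -> Result<(), String>,
) -> Disposition {
    if checkpoint.map_or(false, |cp| stream_seq <= cp) {
        return Disposition::AckDuplicate;
    }
    match commit() {
        Ok(()) => Disposition::AckCommitted,
        Err(_) if delivered >= max_deliver => Disposition::Term,
        Err(_) => Disposition::NoAck,
    }
}
```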
---
## Milestone 6: Projection Execution (runtime-function `project` Program)
**Goal:** Apply deterministic projection logic to turn `(current_view, event)` into `new_view`.
### Dependencies
- Milestone 5 (stream consumption)
### Tasks
- [x] **6.1** Implement `project` execution wrapper
  - Loads program referenced by manifest
  - Executes deterministically with gas limits + timeouts via `runtime-function`
- [x] **6.2** Define projection invocation contract
  - Input: `{ current_view, event }`
  - Output: `{ new_view, view_id }` (or equivalent)
- [x] **6.3** Implement per-message processing pipeline
  - Load current view
  - Execute project program
  - Atomically write new view + checkpoint
  - Ack message
- [x] **6.4** Enforce per-entity ordering where required
  - Serialize updates per `(tenant_id, view_type, view_id)` if correctness depends on ordering
### Tests
- [x] **T6.1** Unit test: project program transforms input deterministically (same input → same output)
- [x] **T6.2** Unit test: pipeline writes checkpoint only when view write succeeds
- [ ] **T6.3** Integration test: concurrent processing does not violate per-key ordering when enabled
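
Task 6.4's per-key serialization can be achieved by hashing each `(tenant_id, view_type, view_id)` to a fixed worker lane. Messages for the same view always land on the same lane and are applied in arrival order, while different views proceed in parallel. This is a std-only sketch using threads and channels. `lane_for` and `run_lanes` are hypothetical. `DefaultHasher` is acceptable here only because routing happens inside one process: its output is not guaranteed stable across Rust releases.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Sketch only: lane routing with hypothetical function names.

/// Maps a view key to one of `lanes` workers. The same key always maps to
/// the same lane, so its updates are applied in the order they were routed.
pub fn lane_for(tenant_id: &str, view_type: &str, view_id: &str, lanes: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    (tenant_id, view_type, view_id).hash(&mut hasher);
    (hasher.finish() % lanes as u64) as usize
}

/// Routes `(view_id, sequence)` messages to lanes, processes each lane on
/// its own thread, and returns the global order in which they were applied.
pub fn run_lanes(
    tenant_id: &str,
    view_type: &str,
    messages: Vec<(String, u64)>,
    lanes: usize,
) -> Vec<(String, u64)> {
    let applied = Arc::new(Mutex::new(Vec::new()));
    let mut senders = Vec::new();
    let mut workers = Vec::new();
    for _ in 0..lanes {
        let (tx, rx) = mpsc::channel::<(String, u64)>();
        let applied = Arc::clone(&applied);
        senders.push(tx);
        workers.push(thread::spawn(move || {
            // Each lane drains its queue strictly in FIFO order.
            for msg in rx {
                applied.lock().unwrap().push(msg);
            }
        }));
    }
    for (view_id, seq) in messages {
        let lane = lane_for(tenant_id, view_type, &view_id, lanes);
        senders[lane].send((view_id, seq)).unwrap();
    }
    drop(senders); // closing the channels lets every lane finish
    for worker in workers {
        worker.join().unwrap();
    }
    Arc::try_unwrap(applied).unwrap().into_inner().unwrap()
}
```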
---
## Milestone 7: Query Support (query-engine UQF)
**Goal:** Provide query access to stored views using `query-engine` UQF over `KvStore::query()` prefix scans.
### Dependencies
- Milestone 4 (storage layer)
### Tasks
- [x] **7.1** Implement query wrapper around `KvStore` prefix scans
  - Tenant-scoped prefix scanning
  - UQF filtering/sorting support
- [x] **7.2** Define query interface (library API and/or service endpoint)
  - Ensure tenant isolation for all query paths
- [x] **7.3** Add query-time safeguards
  - Limits on result size and scan cost
  - Stable pagination strategy (if required)
### Tests
- [x] **T7.1** Unit test: tenant-scoped queries never return other-tenant keys
- [x] **T7.2** Unit test: UQF filter works on a small in-memory fixture dataset
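
Below is a sketch of tenant-scoped prefix scanning with a result limit and cursor-based pagination (tasks 7.1 and 7.3). A `BTreeMap` stands in for ordered `KvStore` keys, and `view_prefix` and `scan_views` are hypothetical names. Two details carry the tenant-isolation guarantee. First, the prefix ends with `:`, so tenant `acme` cannot match `acme2`. Second, a cursor outside the caller's prefix is ignored rather than trusted.

```rust
use std::collections::BTreeMap;

// Sketch only: BTreeMap stands in for KvStore; names are hypothetical.

/// Scan prefix for one tenant's views of one type. The trailing `:` keeps
/// tenant `acme` from matching `acme2` (and `orders` from `orders_v2`).
pub fn view_prefix(tenant_id: &str, view_type: &str) -> String {
    format!("view:{tenant_id}:{view_type}:")
}

/// Returns at most `limit` views for the tenant/view_type in key order,
/// starting strictly after `after` (the last key of the previous page).
/// A cursor outside the caller's prefix is ignored, never trusted.
pub fn scan_views<'a>(
    store: &'a BTreeMap<String, String>,
    tenant_id: &str,
    view_type: &str,
    after: Option<&str>,
    limit: usize,
) -> Vec<(&'a str, &'a str)> {
    let prefix = view_prefix(tenant_id, view_type);
    let cursor = after.filter(|c| c.starts_with(&prefix));
    let start = cursor.unwrap_or(&prefix).to_string();
    store
        .range(start..)
        .take_while(|(k, _)| k.starts_with(&prefix))
        .filter(|(k, _)| Some(k.as_str()) != cursor)
        .take(limit)
        .map(|(k, v)| (k.as_str(), v.as_str()))
        .collect()
}
```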
---
## Milestone 8: Replay, Rebuild, and Hot Provisioning
**Goal:** Implement operational workflows: rebuilds, backfills, rolling upgrades, and safety checks.
### Dependencies
- Milestone 5 (JetStream consumption)
- Milestone 6 (projection execution)
### Tasks
- [x] **8.1** Implement catch-up mode
  - When no checkpoint exists, start from sequence 1 and process until reaching the stream tail
- [x] **8.2** Implement rebuild workflow
  - Delete view prefix + checkpoint for `(tenant_id, view_type)`
  - Re-consume from sequence 1 (or a chosen seed sequence)
- [x] **8.3** Implement hot upgrade workflow (versioned views)
  - New `view_type` (or suffix) + independent checkpoint
  - Backfill, then cut routing over to the new view, then retire the old one
- [x] **8.4** Add health/readiness signals
  - Storage reachable
  - NATS reachable
  - Consumer lag below threshold (optional)
### Tests
- [x] **T8.1** Integration test: rebuild from scratch produces identical view as uninterrupted consumption (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
- [x] **T8.2** Integration test: rolling restart resumes from checkpoint without duplicating results (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
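
The catch-up, rebuild, and lag rules from this milestone are simple enough to state as functions. This sketch uses hypothetical names and a `BTreeMap` stand-in. In the real store, deleting the view prefix and the checkpoint would belong in a single transaction, so a crash cannot leave views without a checkpoint or vice versa. The prefix includes the trailing `:`, so rebuilding `orders` leaves a versioned `orders_v2` view and its checkpoint untouched, which the hot-upgrade workflow in 8.3 relies on.

```rust
use std::collections::BTreeMap;

// Sketch only: hypothetical helpers over a BTreeMap stand-in.

/// First stream sequence to request: catch-up from 1 when there is no
/// checkpoint (8.1), otherwise resume just after the checkpoint.
pub fn start_sequence(checkpoint: Option<u64>) -> u64 {
    checkpoint.map_or(1, |cp| cp + 1)
}

/// Consumer lag for readiness/metrics: stream tail minus checkpoint.
pub fn lag(stream_last_seq: u64, checkpoint: Option<u64>) -> u64 {
    stream_last_seq.saturating_sub(checkpoint.unwrap_or(0))
}

/// Rebuild (8.2): drop every view for `(tenant_id, view_type)` plus its
/// checkpoint, so the next run catches up from sequence 1 again.
pub fn reset_for_rebuild(store: &mut BTreeMap<String, String>, tenant_id: &str, view_type: &str) {
    let view_prefix = format!("view:{tenant_id}:{view_type}:");
    let checkpoint_key = format!("checkpoint:{tenant_id}:{view_type}");
    store.retain(|k, _| !k.starts_with(&view_prefix) && *k != checkpoint_key);
}
```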
---
## Milestone 9: Container & Deployment
**Goal:** Package as a container and enable predictable local and production deployment.
### Dependencies
- Milestone 8 (replay/rebuild + health/readiness)
### Tasks
- [x] **9.1** Create `docker/Dockerfile.rust`
  - Multi-stage build
  - Minimal runtime image
  - Health check integration
- [x] **9.2** Create `docker-compose.yml` for local dev
  - Projection container
  - NATS server with JetStream enabled
  - Optional: Grafana, VictoriaMetrics, Loki
- [x] **9.3** Implement container entrypoint behavior
  - Config loading
  - Graceful shutdown on SIGTERM
    - Stop pulling new JetStream messages
    - Complete or safely abandon in-flight processing without acking early
- [x] **9.4** Define environment variables and defaults
  - NATS URL, stream name, subject filters
  - Storage path
  - Consumer naming strategy (durable)
  - Multi-tenancy enabled flag
- [x] **9.5** Configure release build optimizations
  - LTO, strip, single codegen unit
### Tests
- [x] **T9.1** Container builds successfully
```bash
docker build -f docker/Dockerfile.rust --build-arg PACKAGE=projection --build-arg BIN=projection -t cloudlysis/projection:local .
docker run cloudlysis/projection:local --help
```
- [x] **T9.2** Container starts with valid config
```bash
docker run -e PROJECTION_NATS_URL=nats://nats:4222 cloudlysis/projection:local
```
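
The drain ordering in 9.3 (and 11.5) can be sketched with `std` threads and atomics: report not ready, stop pulling, then wait a bounded time for in-flight work. `Lifecycle`, `run_worker`, and `drain` are hypothetical. The actual SIGTERM wiring (for example via `tokio::signal`) and the JetStream pull loop are replaced by a flag and an in-process channel. Acks are sent only after processing, so anything abandoned at the timeout is simply redelivered.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

// Sketch only: hypothetical lifecycle flags; signal handling is omitted.

/// Flags shared by a SIGTERM handler, the `/ready` handler, and the worker.
pub struct Lifecycle {
    pub ready: AtomicBool,
    pub shutdown: AtomicBool,
}

/// Pulls until shutdown is requested. Each message is "acked" (sent on
/// `acked`) only after it has been processed.
pub fn run_worker(life: Arc<Lifecycle>, inbox: mpsc::Receiver<u64>, acked: mpsc::Sender<u64>) {
    while !life.shutdown.load(Ordering::SeqCst) {
        match inbox.recv_timeout(Duration::from_millis(10)) {
            // Processing and the atomic view + checkpoint commit would go here.
            Ok(seq) => acked.send(seq).unwrap(),
            // Nothing pulled; loop around and re-check the shutdown flag.
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
}

/// Reports not-ready first, then stops pulling and waits up to `timeout`.
/// Returns false if the worker was still busy when the timeout expired.
pub fn drain(life: &Lifecycle, worker: thread::JoinHandle<()>, timeout: Duration) -> bool {
    life.ready.store(false, Ordering::SeqCst);
    life.shutdown.store(true, Ordering::SeqCst);
    let deadline = Instant::now() + timeout;
    while !worker.is_finished() {
        if Instant::now() >= deadline {
            return false; // un-acked messages will be redelivered
        }
        thread::sleep(Duration::from_millis(5));
    }
    worker.join().unwrap();
    true
}
```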
---
## Milestone 10: Provisioning, Scalability, and Docker Swarm Deployment
**Goal:** Support horizontal scaling and safe rollouts in Docker Swarm with clear provisioning semantics for JetStream consumers.
### Dependencies
- Milestone 9 (container & deployment)
### Tasks
- [x] **10.1** Define the scaling model for JetStream consumption
  - Use a durable consumer per `(view_type, shard)` so multiple replicas can share the same consumer workload
  - Define the subject filter(s) used by each consumer (tenant wildcard vs tenant-range sharded)
  - Document consumer configuration requirements (ack policy, max in-flight, replay policy)
- [x] **10.2** Implement replica-safe consumption
  - Multiple replicas pulling from the same durable consumer distribute work
  - Enforce per-key serialization if required for correctness
- [x] **10.3** Add tenant-aware provisioning option (sharding)
  - Optional tenant-range sharding by subject filters (e.g., `tenant.<range>.*`)
  - Placement constraints for Swarm nodes (e.g., `node.labels.tenant_range==<range>`)
  - Strategy for adding/removing shards
- [x] **10.4** Create Swarm stack definition (`swarm/stacks/platform.yml`)
  - Service definition
  - Replicas configuration
  - Resource limits (CPU, memory)
  - Health check integration
  - Storage volume mapping for the `edge-storage` data directory
- [x] **10.5** Define rollout strategy
  - Rolling update parameters
  - Backfill/cutover strategy for versioned `view_type` upgrades
  - Safe rollback path (old view remains in place; routing switches back to it)
### Tests
- [x] **T10.1** Stack file valid
```bash
docker stack config -c swarm/stacks/platform.yml
```
- [x] **T10.2** Scale-out does not duplicate work (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
  - Start 2+ replicas pulling from the same durable consumer
  - Verify the checkpoint is monotonic and view updates are not applied twice for the same sequence
- [x] **T10.3** Rolling restart preserves correctness (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
  - Restart replicas during active consumption
  - Verify idempotency holds (no view corruption, checkpoint monotonic)
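
Tenant-range sharding (10.1, 10.3) needs publishers and consumers to agree on each tenant's shard. The shard filters must also be disjoint so that scale-out does not duplicate work (T10.2). The sketch below is speculative and rests on three assumptions: the `tenant.<shard>.<event_type>` subject layout, the `projection-{view_type}-{shard}` durable name, and 64-bit FNV-1a as a build-stable hash. `subject_matches` follows NATS wildcard rules, where `*` matches exactly one token and `>` matches one or more trailing tokens. That lets a test check that every subject matches exactly one shard filter.

```rust
// Sketch only: subject layout, naming, and hash choice are assumptions.

/// 64-bit FNV-1a. Unlike `DefaultHasher`, its output is stable across
/// builds, so every publisher and replica agrees on a tenant's shard.
pub fn fnv1a(bytes: &[u8]) -> u64 {
    let mut h: u64 = 0xcbf29ce484222325;
    for b in bytes {
        h ^= *b as u64;
        h = h.wrapping_mul(0x100000001b3);
    }
    h
}

pub fn shard_for(tenant_id: &str, shards: u64) -> u64 {
    fnv1a(tenant_id.as_bytes()) % shards
}

/// Hypothetical publish subject: `tenant.<shard>.<event_type>`.
pub fn subject_for(tenant_id: &str, event_type: &str, shards: u64) -> String {
    format!("tenant.{}.{}", shard_for(tenant_id, shards), event_type)
}

/// Filter subject for the durable consumer that owns one shard.
pub fn filter_for_shard(shard: u64) -> String {
    format!("tenant.{shard}.*")
}

/// Hypothetical durable consumer name per `(view_type, shard)`.
pub fn durable_name(view_type: &str, shard: u64) -> String {
    format!("projection-{view_type}-{shard}")
}

/// NATS wildcard matching: `*` = one token, `>` = one or more trailing tokens.
pub fn subject_matches(filter: &str, subject: &str) -> bool {
    let mut f = filter.split('.');
    let mut s = subject.split('.');
    loop {
        match (f.next(), s.next()) {
            (Some(">"), Some(_)) => return true,
            (Some("*"), Some(_)) => {}
            (Some(a), Some(b)) if a == b => {}
            (None, None) => return true,
            _ => return false,
        }
    }
}
```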
---
## Milestone 11: Operational Endpoints & Observability
**Goal:** Provide the minimum operational surface required to provision, scale, and monitor Projection nodes in production.
### Dependencies
- Milestone 9 (container & deployment)
- Milestone 10 (provisioning & scaling semantics)
### Tasks
- [x] **11.1** Implement `/health` endpoint
  - Process is up
  - Storage opened successfully
- [x] **11.2** Implement `/ready` endpoint
  - NATS connection established
  - JetStream consumer bound
  - Storage writable
- [x] **11.3** Implement `/metrics` endpoint (Prometheus)
  - Consumer lag (last stream sequence minus checkpoint)
  - Processing throughput and latency
  - Redelivery count / ack failures
  - Storage commit failures
- [x] **11.4** Add build/runtime identity
  - Version, commit hash (if available), configured `view_type` set
- [x] **11.5** Add graceful drain behavior for rollouts
  - Report “not ready” before shutdown
  - Stop pulling, then wait up to a timeout for in-flight work to finish
- [ ] **11.6** Correlation-aware logging for investigations
  - When processing messages, include `tenant_id`, `correlation_id`, and `trace_id` in structured logs/spans when present in the envelope/headers
### Tests
- [x] **T11.1** Readiness fails when NATS is unavailable
- [x] **T11.2** Metrics include lag gauge and counters
- [x] **T11.3** Drain transitions ready → not ready before exit
- [ ] **T11.4** Unit test: envelope decoding accepts optional correlation/trace fields and exposes them for logging
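
For 2.9, 11.6, and T11.4, the `trace_id` can be recovered from a W3C Trace Context `traceparent` value of the form `00-<trace-id>-<parent-id>-<flags>`. This sketch handles version `00` only. It rejects uppercase hex and all-zero IDs, as the spec requires. `TraceIds` and `parse_traceparent` are hypothetical names, and attaching the result to `tracing` spans is left out.

```rust
// Sketch only: `TraceIds` and `parse_traceparent` are hypothetical names.

/// IDs a log span could carry, borrowed from the header value.
#[derive(Debug, PartialEq)]
pub struct TraceIds<'a> {
    pub trace_id: &'a str,
    pub parent_id: &'a str,
    pub sampled: bool,
}

/// True if `s` is exactly `len` lowercase hex digits.
fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| b.is_ascii_digit() || (b'a'..=b'f').contains(&b))
}

/// Parses a version-00 `traceparent`:
/// `00-<32 hex trace-id>-<16 hex parent-id>-<2 hex flags>`.
pub fn parse_traceparent(value: &str) -> Option<TraceIds<'_>> {
    let mut parts = value.trim().split('-');
    let (version, trace_id, parent_id, flags) =
        (parts.next()?, parts.next()?, parts.next()?, parts.next()?);
    // Version 00 has exactly four fields.
    if parts.next().is_some() || version != "00" {
        return None;
    }
    if !is_lower_hex(trace_id, 32) || !is_lower_hex(parent_id, 16) || !is_lower_hex(flags, 2) {
        return None;
    }
    // All-zero IDs are invalid per the spec.
    if trace_id.bytes().all(|b| b == b'0') || parent_id.bytes().all(|b| b == b'0') {
        return None;
    }
    let flags = u8::from_str_radix(flags, 16).ok()?;
    Some(TraceIds { trace_id, parent_id, sampled: (flags & 0x01) == 1 })
}
```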