# Development Plan: Projection Node

## Overview

This plan breaks down the Projection node implementation into milestones ordered by dependency. Each milestone includes:
- **Tasks** with clear deliverables
- **Test Requirements** (unit tests + tautological tests + integration tests where applicable)
- **Dependencies** on previous milestones

**Development Approach:**
1. Complete one milestone at a time
2. Write tests before implementation (TDD where applicable)
3. All tests must pass before moving to the next milestone
4. Mark tasks complete with `[x]` as you progress

---

## Milestone 1: Project Foundation

**Goal:** Set up the Rust project with proper structure, dependencies, and basic tooling.

### Tasks
- [x] **1.1** Initialize Cargo project
  - Create `src/lib.rs` and `src/main.rs`
  - Configure `Cargo.toml` with madapes registry
- [x] **1.2** Configure dependencies
  - `edge-storage` (KvStore)
  - `runtime-function` (DAG program execution for `project`)
  - `query-engine` (UQF query support)
  - `edge-logger-client` (structured logs client)
  - `async-nats` (JetStream consumption)
  - `tokio`, `serde`, `serde_json`, `thiserror`, `anyhow`, `tracing`
- [x] **1.3** Establish initial module layout
  ```
  src/
  ├── lib.rs
  ├── main.rs
  ├── types/
  │   ├── mod.rs
  │   ├── id.rs
  │   ├── event.rs
  │   ├── view.rs
  │   ├── checkpoint.rs
  │   └── error.rs
  ├── config/
  │   ├── mod.rs
  │   └── settings.rs
  ├── storage/
  │   ├── mod.rs
  │   └── kv.rs
  ├── stream/
  │   ├── mod.rs
  │   └── jetstream.rs
  ├── project/
  │   ├── mod.rs
  │   ├── runtime.rs
  │   └── manifest.rs
  ├── query/
  │   ├── mod.rs
  │   └── uqf.rs
  └── observability/
      └── mod.rs
  ```
- [x] **1.4** Configure clippy and rustfmt

### Tests
- [x] **T1.1** Project compiles successfully
- [x] **T1.2** Dependencies resolve from madapes registry
- [x] **T1.3** Clippy passes with no warnings

---

## Milestone 2: Core Types (Envelopes, View Keys, Checkpoints)

**Goal:** Define all core types required for event consumption, view persistence, and idempotency.

### Dependencies
- Milestone 1 (project foundation)

### Tasks
- [x] **2.1** Implement `TenantId` type
  - Optional with default empty string for non-multi-tenant setups
  - Display, FromStr, Serialize, Deserialize
- [x] **2.2** Implement `ViewType` and `ViewId` types
  - String wrappers (consistent with `../aggregate` style: no validation in the type wrapper)
  - Display, FromStr, Serialize, Deserialize
- [x] **2.3** Implement `ViewKey` composition
  - `view:{tenant_id}:{view_type}:{view_id}`
  - Centralize formatting/parsing in one place
- [x] **2.4** Implement `CheckpointKey` composition
  - `checkpoint:{tenant_id}:{view_type}`
- [x] **2.5** Define event envelope type consumed from JetStream
  - `tenant_id`, `aggregate_id`, `aggregate_type`, `event_type`, `payload`, `timestamp`
  - Support forward-compatible decoding (unknown fields ignored)
- [x] **2.6** Define checkpoint representation
  - Persisted value holds JetStream stream sequence (u64) and optional metadata
- [x] **2.7** Implement Projection error model
  - Storage errors, stream errors, decode errors, project errors, tenant access errors
- [x] **2.8** Implement `ProjectionManifest` type
  - Defines projections (`view_type`) and the `project` program reference for each
  - Validates referenced programs exist
- [ ] **2.9** Add correlation/trace context fields to the event envelope (forward compatible)
  - Support optional `correlation_id` and `traceparent` (or `trace_id`) so logs/traces can be stitched back to Gateway flows
  - Preserve unknown fields for forward compatibility where practical

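
Tasks 2.3 and 2.4 ask for key formatting and parsing to live in one place. A minimal sketch of that idea follows, using only the standard library. The struct layout, the `String` error type, and the rule that only `view_id` may contain `:` are assumptions made for illustration, not the project's actual types.

```rust
use std::fmt;
use std::str::FromStr;

/// Hypothetical key type; the fields mirror the documented layout
/// `view:{tenant_id}:{view_type}:{view_id}`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ViewKey {
    pub tenant_id: String, // empty string when multi-tenancy is disabled
    pub view_type: String,
    pub view_id: String,
}

impl fmt::Display for ViewKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "view:{}:{}:{}", self.tenant_id, self.view_type, self.view_id)
    }
}

impl FromStr for ViewKey {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // splitn(4) keeps any further ':' inside view_id (assumption:
        // tenant_id and view_type never contain ':').
        let mut parts = s.splitn(4, ':');
        match (parts.next(), parts.next(), parts.next(), parts.next()) {
            (Some("view"), Some(tenant), Some(view_type), Some(view_id)) => Ok(ViewKey {
                tenant_id: tenant.to_string(),
                view_type: view_type.to_string(),
                view_id: view_id.to_string(),
            }),
            _ => Err(format!("not a view key: {s}")),
        }
    }
}

/// Checkpoint keys follow `checkpoint:{tenant_id}:{view_type}`.
pub fn checkpoint_key(tenant_id: &str, view_type: &str) -> String {
    format!("checkpoint:{tenant_id}:{view_type}")
}
```

Keeping `Display` and `FromStr` on the same type means a change to the key layout touches one file, which is what T2.2 ("stable strings") then pins down.
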
### Tests
- [x] **T2.1** `TenantId` round-trips serialization and defaults to empty
- [x] **T2.2** `ViewType`, `ViewId` and key composition produce stable strings
- [x] **T2.3** Checkpoint encoding/decoding round-trips
- [x] **T2.4** Envelope decoding handles unknown fields
- [x] **T2.5** Tautological test: core types are Send + Sync

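
A tautological test such as T2.5 is usually a compile-time check: the test body does nothing at runtime and fails to build if a type loses `Send` or `Sync`. A small sketch, where `Checkpoint` is a hypothetical stand-in for the real core types:

```rust
/// Compiles only if `T` is both `Send` and `Sync`; there is nothing to check at runtime.
fn assert_send_sync<T: Send + Sync>() {}

/// Hypothetical stand-in for a core type such as the persisted checkpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Checkpoint {
    pub sequence: u64, // JetStream stream sequence
}

#[test]
fn core_types_are_send_and_sync() {
    assert_send_sync::<Checkpoint>();
    assert_send_sync::<String>();
}
```
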
---

## Milestone 3: Configuration

**Goal:** Implement configuration loading and validation for the Projection node.

### Dependencies
- Milestone 2 (core types)

### Tasks
- [x] **3.1** Define `Settings` struct
  - NATS URL, stream name, subject filter(s)
  - Durable consumer name strategy (per tenant/view)
  - Storage path
  - Multi-tenancy enabled flag + default tenant behavior
  - Backpressure configuration (max in-flight, batching, ack timeout)
  - Manifest path (projection definitions and project program refs)
- [x] **3.2** Implement config loading from environment
- [x] **3.3** Implement config loading from file (YAML/TOML/JSON)
  - Environment overrides file
- [x] **3.4** Implement config validation
  - Required fields present
  - Manifest loads and validates at startup (not inside `Settings::validate`)

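
The "environment overrides file" rule from 3.3 and the required-field check from 3.4 can be sketched as a single lookup helper. This is an illustration under assumptions: both sources are already flattened into string maps, the struct holds only three of the fields from 3.1, and every name except `PROJECTION_NATS_URL` (which appears in T9.2) is hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical subset of `Settings`; field and env var names are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub struct Settings {
    pub nats_url: String,
    pub stream_name: String,
    pub storage_path: String,
}

impl Settings {
    /// Builds settings from file values, letting environment values win.
    pub fn from_sources(
        file: &HashMap<String, String>,
        env: &HashMap<String, String>,
    ) -> Result<Self, String> {
        // Prefer the env var, fall back to the file key, and reject the
        // chosen value if it is absent or blank.
        let pick = |env_key: &str, file_key: &str| -> Result<String, String> {
            env.get(env_key)
                .or_else(|| file.get(file_key))
                .map(|v| v.trim().to_string())
                .filter(|v| !v.is_empty())
                .ok_or_else(|| format!("missing required setting: {file_key}"))
        };
        Ok(Settings {
            nats_url: pick("PROJECTION_NATS_URL", "nats_url")?,
            stream_name: pick("PROJECTION_STREAM_NAME", "stream_name")?,
            storage_path: pick("PROJECTION_STORAGE_PATH", "storage_path")?,
        })
    }
}
```

Passing the environment in as a map rather than reading `std::env` directly keeps T3.1 and T3.2 free of process-global state.
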
### Tests
- [x] **T3.1** Settings loads from environment variables
- [x] **T3.2** Settings validation catches missing/invalid values
- [x] **T3.3** Tautological test: Settings is Clone + Debug

---

## Milestone 4: Storage Layer (KvStore Views + Atomic Checkpoints)

**Goal:** Integrate `edge-storage` `KvStore` with transactionally correct view + checkpoint updates.

### Dependencies
- Milestone 2 (core types)
- Milestone 3 (configuration)

### Tasks
- [x] **4.1** Create `KvClient` wrapper
  - Opens MDBX-backed KvStore at configured path
  - Tenant-aware key composition helpers
- [x] **4.2** Implement view CRUD primitives
  - `get_view(view_key) -> Option<Value>`
  - `put_view(view_key, value)`
  - `delete_view_prefix(tenant_id, view_type)` for rebuilds
- [x] **4.3** Implement checkpoint primitives
  - `get_checkpoint(checkpoint_key) -> Option<Sequence>`
  - `put_checkpoint(checkpoint_key, sequence)`
- [x] **4.4** Implement atomic commit primitive
  - `txn { put_view(s); put_checkpoint }` as one MDBX transaction
  - Expose API that makes it hard to update one without the other
- [x] **4.5** Optional: storage circuit breaker
  - Protects node from tight retry loops when storage is degraded

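
Task 4.4 asks for an API shape where a view write cannot be committed without its checkpoint. One way to sketch that is a single `commit` call that takes both. The store below is an in-memory `BTreeMap` standing in for `KvStore` (whose real API is not shown in this plan), and staging on a cloned map stands in for an MDBX write transaction; all type and method names are hypothetical.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for the KV store; a real implementation would wrap
/// one MDBX write transaction instead of cloning a map.
#[derive(Debug, Default)]
pub struct MemStore {
    data: BTreeMap<String, String>,
}

/// One unit of work: the view writes and the checkpoint they belong to.
pub struct Commit {
    pub views: Vec<(String, String)>, // (view_key, serialized view)
    pub checkpoint_key: String,
    pub sequence: u64,
}

impl MemStore {
    /// Applies every view write plus the checkpoint, or nothing at all.
    pub fn commit(&mut self, commit: Commit) -> Result<(), String> {
        // Stage changes on a copy so a failure leaves `self.data` untouched.
        let mut staged = self.data.clone();
        for (key, value) in commit.views {
            if key.is_empty() {
                return Err("empty view key".to_string()); // simulated write failure
            }
            staged.insert(key, value);
        }
        staged.insert(commit.checkpoint_key, commit.sequence.to_string());
        self.data = staged; // the "transaction commit"
        Ok(())
    }

    pub fn get(&self, key: &str) -> Option<&String> {
        self.data.get(key)
    }

    /// Removes every key starting with `prefix` (used for rebuilds) and returns the count.
    pub fn delete_prefix(&mut self, prefix: &str) -> usize {
        let keys: Vec<String> = self
            .data
            .range(prefix.to_string()..)
            .take_while(|(k, _)| k.starts_with(prefix))
            .map(|(k, _)| k.clone())
            .collect();
        for k in &keys {
            self.data.remove(k);
        }
        keys.len()
    }
}
```

Because there is no public `put_view` or `put_checkpoint` on this type, callers have no way to advance one without the other, which is the property T4.3 checks.
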
### Tests
- [x] **T4.1** View round-trip: put/get returns identical JSON
- [x] **T4.2** Checkpoint round-trip: put/get returns identical sequence
- [x] **T4.3** Atomicity: if transaction fails, neither view nor checkpoint is committed
- [x] **T4.4** Prefix delete removes all keys for tenant/view_type

---

## Milestone 5: JetStream Consumption (Durable Consumer + Idempotency)

**Goal:** Consume events from NATS JetStream with correct delivery semantics and checkpoint-based idempotency.

### Dependencies
- Milestone 4 (storage layer)

### Tasks
- [x] **5.1** Implement JetStream client wrapper
  - Connect to NATS and bind to configured stream
  - Create/bind durable consumer (single filter subject for now)
- [x] **5.2** Decode messages into event envelopes
  - Extract JetStream stream sequence from message metadata
  - Decode payload into the envelope type
- [x] **5.3** Implement idempotency gate
  - Load checkpoint for `(tenant_id, view_type)`
  - Skip/ack messages with sequence `<= checkpoint`
- [x] **5.4** Implement ack discipline
  - Ack only after the MDBX transaction commits
  - Define behavior for transient errors (no ack, allow redelivery)
- [x] **5.5** Implement poison-message policy
  - Enforced via consumer max-deliver, a TERM ack on excessive deliveries, and a KV quarantine record

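
Tasks 5.3 to 5.5 together define a small decision table: skip what the checkpoint already covers, terminate poison messages, and ack only after a successful commit. The sketch below expresses that table as a pure function so it can be unit tested without NATS. The enum names, the `delivered`/`max_deliver` parameters, and the choice to pass the commit as a closure are assumptions for illustration.

```rust
/// What to tell JetStream about a message once processing has been attempted.
#[derive(Debug, PartialEq, Eq)]
pub enum AckAction {
    Ack,       // processed (or already applied): safe to acknowledge
    Redeliver, // transient failure: leave unacked so JetStream redelivers
    Term,      // poison message: stop redelivery
}

/// Outcome of the storage commit for one message.
pub enum CommitOutcome {
    Committed,
    TransientError,
}

/// True when the message is at or below the checkpoint and was already applied.
pub fn already_applied(sequence: u64, checkpoint: Option<u64>) -> bool {
    checkpoint.map_or(false, |cp| sequence <= cp)
}

/// Decides the ack action. `delivered` is the delivery count reported for the
/// message; `max_deliver` is a hypothetical local limit mirroring the consumer's.
pub fn decide(
    sequence: u64,
    checkpoint: Option<u64>,
    delivered: u64,
    max_deliver: u64,
    commit: impl FnOnce() -> CommitOutcome,
) -> AckAction {
    if already_applied(sequence, checkpoint) {
        return AckAction::Ack; // skip without re-running the projection
    }
    if delivered > max_deliver {
        return AckAction::Term; // caller also writes the quarantine record
    }
    // The commit runs only for fresh, non-poison messages.
    match commit() {
        CommitOutcome::Committed => AckAction::Ack,
        CommitOutcome::TransientError => AckAction::Redeliver,
    }
}
```

T5.1 and T5.2 map directly onto this function: a sequence at or below the checkpoint never invokes `commit`, and a failed commit never yields `Ack`.
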
### Tests
- [x] **T5.1** Unit test: checkpoint gate skips sequences `<= checkpoint`
- [x] **T5.2** Unit test: ack is not called when storage commit fails
- [x] **T5.3** Integration test: JetStream redelivery re-processes unacked message and is made idempotent by checkpoint (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)

---

## Milestone 6: Projection Execution (runtime-function `project` Program)

**Goal:** Apply deterministic projection logic to turn `(current_view, event)` into `new_view`.

### Dependencies
- Milestone 5 (stream consumption)

### Tasks
- [x] **6.1** Implement `project` execution wrapper
  - Loads program referenced by manifest
  - Executes deterministically with gas limits + timeouts via `runtime-function`
- [x] **6.2** Define projection invocation contract
  - Input: `{ current_view, event }`
  - Output: `{ new_view, view_id }` (or equivalent)
- [x] **6.3** Implement per-message processing pipeline
  - Load current view
  - Execute project program
  - Atomically write new view + checkpoint
  - Ack message
- [x] **6.4** Enforce per-entity ordering where required
  - Serialize updates per `(tenant_id, view_type, view_id)` if correctness depends on ordering

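
The four pipeline steps in 6.3 can be sketched end to end with in-memory state. This is a simplified illustration, not the `runtime-function` integration: the projection is a plain closure, the view is a `u64` counter, and the `view_id` is assumed to equal the event's `aggregate_id` so that the current view can be loaded before the program runs.

```rust
use std::collections::HashMap;

/// Minimal event shape; only the fields this sketch needs.
pub struct Event {
    pub aggregate_id: String,
    pub sequence: u64, // JetStream stream sequence
}

/// Views and checkpoint kept together so they are updated as one step.
#[derive(Default)]
pub struct State {
    pub views: HashMap<String, u64>, // view_id -> view (here: an event count)
    pub checkpoint: Option<u64>,
}

/// Returns Ok(()) when the caller may ack, Err when it must not.
pub fn process<P>(state: &mut State, event: &Event, project: P) -> Result<(), String>
where
    P: Fn(Option<u64>, &Event) -> Result<u64, String>,
{
    // Idempotency gate: already-applied sequences are acked without re-projecting.
    if state.checkpoint.map_or(false, |cp| event.sequence <= cp) {
        return Ok(());
    }
    // 1. Load the current view (assumption: view_id is the aggregate id).
    let current = state.views.get(&event.aggregate_id).copied();
    // 2. Run the projection; an error returns early, leaving state untouched.
    let new_view = project(current, event)?;
    // 3. Write view and checkpoint together (stand-in for one transaction).
    state.views.insert(event.aggregate_id.clone(), new_view);
    state.checkpoint = Some(event.sequence);
    // 4. Only now is it safe to ack.
    Ok(())
}
```

A redelivered event passes through `process` a second time without changing the view, which is the behavior T6.1 and T6.2 rely on.
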
### Tests
- [x] **T6.1** Unit test: project program transforms input deterministically (same input → same output)
- [x] **T6.2** Unit test: pipeline writes checkpoint only when view write succeeds
- [ ] **T6.3** Integration test: concurrent processing does not violate per-key ordering when enabled

---

## Milestone 7: Query Support (query-engine UQF)

**Goal:** Provide query access to stored views using `query-engine` UQF over `KvStore::query()` prefix scans.

### Dependencies
- Milestone 4 (storage layer)

### Tasks
- [x] **7.1** Implement query wrapper around `KvStore` prefix scans
  - Tenant-scoped prefix scanning
  - UQF filtering/sorting support
- [x] **7.2** Define query interface (library API and/or service endpoint)
  - Ensure tenant isolation for all query paths
- [x] **7.3** Add query-time safeguards
  - Limits on result size and scan cost
  - Stable pagination strategy (if required)

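
Tenant isolation in 7.1 and 7.2 follows from building the scan prefix out of the tenant id, and the result-size limit in 7.3 is a cap on the iterator. The sketch below shows both over a sorted `BTreeMap`, which stands in for `KvStore::query()`; a plain closure stands in for a UQF filter, and the function name and signature are hypothetical.

```rust
use std::collections::BTreeMap;

/// Scans views for one tenant and view type, applying a filter and a hard limit.
/// `store` stands in for the KV store; keys follow `view:{tenant_id}:{view_type}:{view_id}`.
pub fn query_views<'a, F>(
    store: &'a BTreeMap<String, String>,
    tenant_id: &str,
    view_type: &str,
    filter: F,
    limit: usize,
) -> Vec<(&'a str, &'a str)>
where
    F: Fn(&str) -> bool,
{
    // The trailing ':' prevents "orders" from also matching "orders_v2".
    let prefix = format!("view:{tenant_id}:{view_type}:");
    store
        .range(prefix.clone()..)
        .take_while(|(k, _)| k.starts_with(&prefix))
        .filter(|(_, v)| filter(v.as_str()))
        .take(limit) // caps results returned, not keys scanned
        .map(|(k, v)| (k.as_str(), v.as_str()))
        .collect()
}
```

Note that `limit` here bounds only the result size. Bounding scan cost as well would need a second cap applied before the filter.
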
### Tests
- [x] **T7.1** Unit test: tenant-scoped queries never return other-tenant keys
- [x] **T7.2** Unit test: UQF filter works on a small in-memory fixture dataset

---

## Milestone 8: Replay, Rebuild, and Hot Provisioning

**Goal:** Implement operational workflows: rebuilds, backfills, rolling upgrades, and safety checks.

### Dependencies
- Milestone 5 (JetStream consumption)
- Milestone 6 (projection execution)

### Tasks
- [x] **8.1** Implement catch-up mode
  - When no checkpoint exists, start from sequence 1 and process until tail
- [x] **8.2** Implement rebuild workflow
  - Delete view prefix + checkpoint for `(tenant_id, view_type)`
  - Re-consume from sequence 1 (or chosen seed sequence)
- [x] **8.3** Implement hot upgrade workflow (versioned views)
  - New `view_type` (or suffix) + independent checkpoint
  - Backfill, then cut over routing, then retire the old view
- [x] **8.4** Add health/readiness signals
  - Storage reachable
  - NATS reachable
  - Consumer lag below threshold (optional)

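
Catch-up (8.1) and rebuild (8.2) both reduce to choosing a start sequence from the checkpoint and replaying from there. The sketch below shows that rule and why a restart in the middle of a stream yields the same view as an uninterrupted run. The running-total view and the `(sequence, amount)` event tuples are placeholders chosen for illustration.

```rust
/// First stream sequence to consume: resume after the checkpoint, or start at 1.
pub fn start_sequence(checkpoint: Option<u64>) -> u64 {
    match checkpoint {
        Some(seq) => seq.saturating_add(1),
        None => 1, // no checkpoint: catch up from the beginning of the stream
    }
}

/// Replays `(sequence, amount)` events into a running total, skipping anything
/// at or below the checkpoint. Returns the new view and checkpoint.
pub fn replay(view: i64, checkpoint: Option<u64>, events: &[(u64, i64)]) -> (i64, Option<u64>) {
    let from = start_sequence(checkpoint);
    events
        .iter()
        .filter(|(seq, _)| *seq >= from)
        .fold((view, checkpoint), |(v, _), (seq, amount)| (v + amount, Some(*seq)))
}
```

A rebuild is then `replay(0, None, events)` after the view prefix and checkpoint have been deleted, and a rolling restart is `replay(view, checkpoint, events)` with whatever was last committed.
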
### Tests
- [x] **T8.1** Integration test: rebuild from scratch produces identical view as uninterrupted consumption (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
- [x] **T8.2** Integration test: rolling restart resumes from checkpoint without duplicating results (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)

---

## Milestone 9: Container & Deployment

**Goal:** Package as a container and enable predictable local and production deployment.

### Dependencies
- Milestone 8 (replay/rebuild + health/readiness)

### Tasks
- [x] **9.1** Create `docker/Dockerfile.rust`
  - Multi-stage build
  - Minimal runtime image
  - Health check integration
- [x] **9.2** Create `docker-compose.yml` for local dev
  - Projection container
  - NATS server with JetStream enabled
  - Optional: Grafana, Victoria Metrics, Loki
- [x] **9.3** Create container entrypoint behavior
  - Config loading
  - Graceful shutdown on SIGTERM
    - Stop pulling new JetStream messages
    - Complete or safely abandon in-flight processing without acking early
- [x] **9.4** Define environment variables and defaults
  - NATS URL, stream name, subject filters
  - Storage path
  - Consumer naming strategy (durable)
  - Multi-tenancy enabled flag
- [x] **9.5** Create release build optimization
  - LTO, strip, single codegen unit

### Tests
- [x] **T9.1** Container builds successfully
  ```bash
  docker build -f docker/Dockerfile.rust --build-arg PACKAGE=projection --build-arg BIN=projection -t cloudlysis/projection:local .
  docker run cloudlysis/projection:local --help
  ```
- [x] **T9.2** Container starts with valid config
  ```bash
  docker run -e PROJECTION_NATS_URL=nats://nats:4222 cloudlysis/projection:local
  ```

---

## Milestone 10: Provisioning, Scalability, and Docker Swarm Deployment

**Goal:** Support horizontal scaling and safe rollouts in Docker Swarm with clear provisioning semantics for JetStream consumers.

### Dependencies
- Milestone 9 (container & deployment)

### Tasks
- [x] **10.1** Define the scaling model for JetStream consumption
  - Use a durable consumer per `(view_type, shard)` so multiple replicas can share the same consumer workload
  - Define the subject filter(s) used by each consumer (tenant wildcard vs tenant-range sharded)
  - Document consumer configuration requirements (ack policy, max in-flight, replay policy)
- [x] **10.2** Implement replica-safe consumption
  - Multiple replicas pulling from the same durable consumer distribute work
  - Enforce per-key serialization if required for correctness
- [x] **10.3** Add tenant-aware provisioning option (sharding)
  - Optional tenant-range sharding by subject filters (e.g., `tenant.<range>.*`)
  - Placement constraints for Swarm nodes (e.g., `node.labels.tenant_range==<range>`)
  - Strategy for adding/removing shards
- [x] **10.4** Create Swarm stack definition (`swarm/stacks/platform.yml`)
  - Service definition
  - Replicas configuration
  - Resource limits (CPU, memory)
  - Health check integration
  - Storage volume mapping for `edge-storage` data directory
- [x] **10.5** Define rollout strategy
  - Rolling update parameters
  - Backfill/cutover strategy for versioned `view_type` upgrades
  - Safe rollback story (old view still present + routing switch back)

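
Tasks 10.1 and 10.3 tie three strings to each shard: a subject filter, a durable consumer name, and a Swarm placement constraint. Deriving all three from one value keeps them from drifting apart. The sketch below assumes the range is an opaque token such as `a-f`; the `Shard` type, the token rules, and the durable naming scheme are hypothetical, while the filter and constraint shapes follow the examples in 10.3.

```rust
/// One tenant-range shard. `range` is an opaque token such as "a-f"; how
/// tenants map onto tokens is outside this sketch (assumption).
pub struct Shard {
    pub view_type: String,
    pub range: String,
}

/// Tokens end up in subjects and consumer names, so restrict them to a
/// conservative character set that excludes '.', '*', '>' and whitespace.
fn is_safe_token(s: &str) -> bool {
    !s.is_empty() && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
}

impl Shard {
    pub fn new(view_type: &str, range: &str) -> Result<Self, String> {
        if is_safe_token(view_type) && is_safe_token(range) {
            Ok(Shard { view_type: view_type.to_string(), range: range.to_string() })
        } else {
            Err(format!("unsafe shard token: {view_type:?} / {range:?}"))
        }
    }

    /// Subject filter in the documented `tenant.<range>.*` shape.
    pub fn subject_filter(&self) -> String {
        format!("tenant.{}.*", self.range)
    }

    /// Durable consumer shared by every replica of this shard (naming is hypothetical).
    pub fn durable_name(&self) -> String {
        format!("projection-{}-{}", self.view_type, self.range)
    }

    /// Swarm placement constraint in the documented label form.
    pub fn placement_constraint(&self) -> String {
        format!("node.labels.tenant_range=={}", self.range)
    }
}
```

Adding or removing a shard then becomes a change to the list of `Shard` values, with the consumer and placement settings following from it.
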
### Tests
- [x] **T10.1** Stack file valid
  ```bash
  docker stack config -c swarm/stacks/platform.yml
  ```
- [x] **T10.2** Scale-out does not duplicate work (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
  - Start 2+ replicas pulling from the same durable consumer
  - Verify the checkpoint is monotonic and view updates are not applied twice for the same sequence
- [x] **T10.3** Rolling restart preserves correctness (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
  - Restart replicas during active consumption
  - Verify idempotency holds (no view corruption, checkpoint monotonic)

---

## Milestone 11: Operational Endpoints & Observability

**Goal:** Provide the minimum operational surface required to provision, scale, and monitor Projection nodes in production.

### Dependencies
- Milestone 9 (container & deployment)
- Milestone 10 (provisioning & scaling semantics)

### Tasks
- [x] **11.1** Implement `/health` endpoint
  - Process is up
  - Storage opened successfully
- [x] **11.2** Implement `/ready` endpoint
  - NATS connection established
  - JetStream consumer bound
  - Storage writable
- [x] **11.3** Implement `/metrics` endpoint (Prometheus)
  - Consumer lag (stream sequence - checkpoint)
  - Processing throughput and latency
  - Redelivery count / ack failures
  - Storage commit failures
- [x] **11.4** Add build/runtime identity
  - Version, commit hash (if available), configured `view_type` set
- [x] **11.5** Add graceful drain behavior for rollouts
  - Report “not ready” before shutdown
  - Stop pulling, wait for in-flight work up to a timeout
- [ ] **11.6** Correlation-aware logging for investigations
  - When processing messages, include `tenant_id`, `correlation_id`, and `trace_id` in structured logs/spans when present in the envelope/headers

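
The lag metric in 11.3 is defined as stream sequence minus checkpoint. A minimal sketch of that calculation and of one gauge sample in the Prometheus text exposition format follows. The metric name, the label names, and the choice to count the whole stream as lag when no checkpoint exists are assumptions; label values are assumed to contain no quotes, backslashes, or newlines, so no escaping is done.

```rust
/// Consumer lag: last stream sequence minus checkpoint.
/// With no checkpoint yet, the whole stream counts as lag (assumption).
pub fn consumer_lag(last_stream_sequence: u64, checkpoint: Option<u64>) -> u64 {
    // saturating_sub avoids underflow if the stream sequence reading is
    // older than the checkpoint reading.
    last_stream_sequence.saturating_sub(checkpoint.unwrap_or(0))
}

/// Renders one gauge sample in the Prometheus text exposition format.
/// The metric and label names are assumptions, not taken from the plan.
pub fn render_lag(tenant_id: &str, view_type: &str, lag: u64) -> String {
    format!(
        "# TYPE projection_consumer_lag gauge\n\
         projection_consumer_lag{{tenant_id=\"{tenant_id}\",view_type=\"{view_type}\"}} {lag}\n"
    )
}
```

The same `consumer_lag` value can back the optional "consumer lag below threshold" readiness signal from 8.4.
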
### Tests
- [x] **T11.1** Readiness fails when NATS is unavailable
- [x] **T11.2** Metrics include lag gauge and counters
- [x] **T11.3** Drain transitions ready → not ready before exit
- [ ] **T11.4** Unit test: envelope decoding accepts optional correlation/trace fields and exposes them for logging