Development Plan: Projection Node
Overview
This plan breaks down the Projection node implementation into milestones ordered by dependency. Each milestone includes:
- Tasks with clear deliverables
- Test Requirements (unit tests + tautological tests + integration tests where applicable)
- Dependencies on previous milestones
Development Approach:
- Complete one milestone at a time
- Write tests before implementation (TDD where applicable)
- All tests must pass before moving to the next milestone
- Mark tasks complete with `[x]` as you progress
Milestone 1: Project Foundation
Goal: Set up the Rust project with proper structure, dependencies, and basic tooling.
Tasks
- 1.1 Initialize Cargo project
- Create `src/lib.rs` and `src/main.rs`
- Configure `Cargo.toml` with madapes registry
- 1.2 Configure dependencies
- `edge-storage` (KvStore)
- `runtime-function` (DAG program execution for `project`)
- `query-engine` (UQF query support)
- `edge-logger-client` (structured logs client)
- `async-nats` (JetStream consumption)
- `tokio`, `serde`, `serde_json`, `thiserror`, `anyhow`, `tracing`
- 1.3 Establish initial module layout

    src/
    ├── lib.rs
    ├── main.rs
    ├── types/
    │   ├── mod.rs
    │   ├── id.rs
    │   ├── event.rs
    │   ├── view.rs
    │   ├── checkpoint.rs
    │   └── error.rs
    ├── config/
    │   ├── mod.rs
    │   └── settings.rs
    ├── storage/
    │   ├── mod.rs
    │   └── kv.rs
    ├── stream/
    │   ├── mod.rs
    │   └── jetstream.rs
    ├── project/
    │   ├── mod.rs
    │   ├── runtime.rs
    │   └── manifest.rs
    ├── query/
    │   ├── mod.rs
    │   └── uqf.rs
    └── observability/
        └── mod.rs

- 1.4 Configure clippy and rustfmt
Tests
- T1.1 Project compiles successfully
- T1.2 Dependencies resolve from madapes registry
- T1.3 Clippy passes with no warnings
Milestone 2: Core Types (Envelopes, View Keys, Checkpoints)
Goal: Define all core types required for event consumption, view persistence, and idempotency.
Dependencies
- Milestone 1 (project foundation)
Tasks
- 2.1 Implement `TenantId` type
- Optional with default empty string for non-multi-tenant setups
- Display, FromStr, Serialize, Deserialize
- 2.2 Implement `ViewType` and `ViewId` types
- String wrappers (consistent with `../aggregate` style: no validation in the type wrapper)
- Display, FromStr, Serialize, Deserialize
- 2.3 Implement `ViewKey` composition
- `view:{tenant_id}:{view_type}:{view_id}`
- Centralize formatting/parsing in one place
- 2.4 Implement `CheckpointKey` composition
- `checkpoint:{tenant_id}:{view_type}`
- 2.5 Define event envelope type consumed from JetStream
- `tenant_id`, `aggregate_id`, `aggregate_type`, `event_type`, `payload`, `timestamp`
- Support forward-compatible decoding (unknown fields ignored)
- 2.6 Define checkpoint representation
- Persisted value holds JetStream stream sequence (u64) and optional metadata
- 2.7 Implement Projection error model
- Storage errors, stream errors, decode errors, project errors, tenant access errors
- 2.8 Implement `ProjectionManifest` type
- Defines projections (`view_type`) and the `project` program reference for each
- Validates referenced programs exist
- 2.9 Add correlation/trace context fields to the event envelope (forward compatible)
- Support optional `correlation_id` and `traceparent` (or `trace_id`) so logs/traces can be stitched back to Gateway flows
- Preserve unknown fields for forward compatibility where practical
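The key layouts in 2.3 and 2.4 can be sketched with the standard library alone. This is a minimal illustration, not the crate's actual types: the struct shape, the `checkpoint_key` helper, and the choice to let only `view_id` contain `:` are assumptions made for the example.

```rust
use std::fmt;
use std::str::FromStr;

/// Hypothetical key type mirroring `view:{tenant_id}:{view_type}:{view_id}`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ViewKey {
    pub tenant_id: String,
    pub view_type: String,
    pub view_id: String,
}

impl fmt::Display for ViewKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "view:{}:{}:{}", self.tenant_id, self.view_type, self.view_id)
    }
}

impl FromStr for ViewKey {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // splitn(4) leaves any ':' inside view_id intact
        // (assumption: tenant_id and view_type never contain ':').
        let mut parts = s.splitn(4, ':');
        match (parts.next(), parts.next(), parts.next(), parts.next()) {
            (Some("view"), Some(tenant), Some(view_type), Some(view_id)) => Ok(ViewKey {
                tenant_id: tenant.to_string(),
                view_type: view_type.to_string(),
                view_id: view_id.to_string(),
            }),
            _ => Err(format!("malformed view key: {}", s)),
        }
    }
}

/// Hypothetical helper for the `checkpoint:{tenant_id}:{view_type}` layout.
pub fn checkpoint_key(tenant_id: &str, view_type: &str) -> String {
    format!("checkpoint:{}:{}", tenant_id, view_type)
}
```

With an empty tenant (the non-multi-tenant default), the key keeps its shape and simply has an empty second segment, which keeps parsing uniform across both modes.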
Tests
- T2.1 `TenantId` round-trips serialization and defaults to empty
- T2.2 `ViewType`, `ViewId` and key composition produce stable strings
- T2.3 Checkpoint encoding/decoding round-trips
- T2.4 Envelope decoding handles unknown fields
- T2.5 Tautological test: core types are Send + Sync
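A tautological `Send + Sync` check like T2.5 is usually written as a compile-time assertion: the test body does nothing at runtime and fails to compile if a type loses either bound. The sketch below uses stand-in types; `TenantId` and `Checkpoint` here are simplified assumptions, not the crate's definitions.

```rust
/// Compiles only if `T` is both `Send` and `Sync`; does nothing at runtime.
fn assert_send_sync<T: Send + Sync>() {}

/// Stand-in for the real type: a string wrapper that defaults to empty.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct TenantId(pub String);

/// Stand-in for the persisted checkpoint: the JetStream stream sequence.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Checkpoint {
    pub sequence: u64,
}

/// Adding a non-thread-safe field (for example an `Rc`) to either type
/// would turn this function into a compile error.
pub fn core_types_are_send_sync() {
    assert_send_sync::<TenantId>();
    assert_send_sync::<Checkpoint>();
}
```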
Milestone 3: Configuration
Goal: Implement configuration loading and validation for the Projection node.
Dependencies
- Milestone 2 (core types)
Tasks
- 3.1 Define `Settings` struct
- NATS URL, stream name, subject filter(s)
- Durable consumer name strategy (per tenant/view)
- Storage path
- Multi-tenancy enabled flag + default tenant behavior
- Backpressure configuration (max in-flight, batching, ack timeout)
- Manifest path (projection definitions and project program refs)
- 3.2 Implement config loading from environment
- 3.3 Implement config loading from file (YAML/TOML/JSON)
- Environment overrides file
- 3.4 Implement config validation
- Required fields present
- Manifest loads and validates at startup (not inside `Settings::validate`)
Tests
- T3.1 Settings loads from environment variables
- T3.2 Settings validation catches missing/invalid values
- T3.3 Tautological test: Settings is Clone + Debug
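The "environment overrides file" rule from 3.3, together with the required-field check from 3.4, can be sketched as a pure function over two maps, which keeps it testable without touching the process environment. The field set is trimmed for brevity, and every variable name other than `PROJECTION_NATS_URL` is an assumption.

```rust
use std::collections::HashMap;

/// Trimmed, hypothetical subset of the real settings.
#[derive(Debug, Clone, PartialEq)]
pub struct Settings {
    pub nats_url: String,
    pub stream_name: String,
    pub storage_path: String,
    pub multi_tenancy: bool,
}

/// Merges file values with environment values; the environment wins.
/// `file` is keyed by field name, `env` by `PROJECTION_<FIELD>` (assumed prefix).
pub fn load_settings(
    file: &HashMap<String, String>,
    env: &HashMap<String, String>,
) -> Result<Settings, String> {
    let get = |key: &str| -> Option<String> {
        env.get(&format!("PROJECTION_{}", key.to_uppercase()))
            .or_else(|| file.get(key))
            .cloned()
    };
    // Required fields must be present and non-empty.
    let require = |key: &str| -> Result<String, String> {
        get(key)
            .filter(|v| !v.is_empty())
            .ok_or_else(|| format!("missing required setting: {}", key))
    };
    Ok(Settings {
        nats_url: require("nats_url")?,
        stream_name: require("stream_name")?,
        storage_path: require("storage_path")?,
        multi_tenancy: match get("multi_tenancy").as_deref() {
            None | Some("false") => false,
            Some("true") => true,
            Some(other) => return Err(format!("invalid multi_tenancy value: {}", other)),
        },
    })
}
```

In the real loader the `env` map would typically be built from `std::env::vars()` and the `file` map from the parsed YAML/TOML/JSON document.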
Milestone 4: Storage Layer (KvStore Views + Atomic Checkpoints)
Goal: Integrate edge-storage KvStore with transactionally correct view + checkpoint updates.
Dependencies
- Milestone 2 (core types)
- Milestone 3 (configuration)
Tasks
- 4.1 Create `KvClient` wrapper
- Opens MDBX-backed KvStore at configured path
- Tenant-aware key composition helpers
- 4.2 Implement view CRUD primitives
- `get_view(view_key) -> Option<Value>`
- `put_view(view_key, value)`
- `delete_view_prefix(tenant_id, view_type)` for rebuilds
- 4.3 Implement checkpoint primitives
- `get_checkpoint(checkpoint_key) -> Option<Sequence>`
- `put_checkpoint(checkpoint_key, sequence)`
- 4.4 Implement atomic commit primitive
- `txn { put_view(s); put_checkpoint }` as one MDBX transaction
- Expose API that makes it hard to update one without the other
- 4.5 Optional: storage circuit breaker
- Protects node from tight retry loops when storage is degraded
Tests
- T4.1 View round-trip: put/get returns identical JSON
- T4.2 Checkpoint round-trip: put/get returns identical sequence
- T4.3 Atomicity: if transaction fails, neither view nor checkpoint is committed
- T4.4 Prefix delete removes all keys for tenant/view_type
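One way to make it "hard to update one without the other" (4.4) is to have the only write path accept a batch that cannot be constructed without a checkpoint. The sketch below uses an in-memory map as a stand-in for the MDBX-backed store; it does not reflect the `edge-storage` API, and `MemStore`, `ViewCommit`, and the `fail` flag are hypothetical names used for illustration.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for the KvStore (not the edge-storage API).
#[derive(Debug, Default)]
pub struct MemStore {
    data: BTreeMap<String, Vec<u8>>,
}

/// A batch that always carries a checkpoint alongside its view writes.
pub struct ViewCommit {
    checkpoint_key: String,
    sequence: u64,
    views: Vec<(String, Vec<u8>)>,
}

impl ViewCommit {
    /// The checkpoint is mandatory: there is no way to build a batch without it.
    pub fn new(checkpoint_key: impl Into<String>, sequence: u64) -> Self {
        ViewCommit { checkpoint_key: checkpoint_key.into(), sequence, views: Vec::new() }
    }

    pub fn put_view(mut self, key: impl Into<String>, value: impl Into<Vec<u8>>) -> Self {
        self.views.push((key.into(), value.into()));
        self
    }
}

impl MemStore {
    pub fn get(&self, key: &str) -> Option<&[u8]> {
        self.data.get(key).map(|v| v.as_slice())
    }

    /// Applies everything or nothing. `fail` simulates a storage error
    /// that occurs before the transaction commits.
    pub fn commit(&mut self, batch: ViewCommit, fail: bool) -> Result<(), String> {
        // Stage on a copy so a failure leaves the live map untouched.
        let mut staged = self.data.clone();
        for (key, value) in batch.views {
            staged.insert(key, value);
        }
        staged.insert(batch.checkpoint_key, batch.sequence.to_be_bytes().to_vec());
        if fail {
            return Err("simulated storage failure".to_string());
        }
        self.data = staged;
        Ok(())
    }
}
```

Cloning the whole map is only acceptable in a sketch; a real MDBX write transaction provides the same all-or-nothing behavior without the copy.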
Milestone 5: JetStream Consumption (Durable Consumer + Idempotency)
Goal: Consume events from NATS JetStream with correct delivery semantics and checkpoint-based idempotency.
Dependencies
- Milestone 4 (storage layer)
Tasks
- 5.1 Implement JetStream client wrapper
- Connect to NATS and bind to configured stream
- Create/bind durable consumer (single filter subject for now)
- 5.2 Decode messages into event envelopes
- Extract JetStream stream sequence from message metadata
- Decode payload into the envelope type
- 5.3 Implement idempotency gate
- Load checkpoint for `(tenant_id, view_type)`
- Skip/ack messages with sequence `<= checkpoint`
- 5.4 Implement ack discipline
- Ack only after the MDBX transaction commits
- Define behavior for transient errors (no ack, allow redelivery)
- 5.5 Implement poison-message policy
- Enforced via the consumer's max-deliver setting, a TERM ack on excessive deliveries, and a KV quarantine record
Tests
- T5.1 Unit test: checkpoint gate skips sequences `<= checkpoint`
- T5.2 Unit test: ack is not called when storage commit fails
- T5.3 Integration test: JetStream redelivery re-processes unacked message and is made idempotent by checkpoint (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
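The idempotency gate, ack discipline, and poison-message policy (5.3 to 5.5) reduce to one decision per delivery. The sketch below keeps that decision free of any NATS types so it can be unit-tested as in T5.1 and T5.2. `AckAction`, `decide`, and the `term_after` threshold are hypothetical names, and the exact point at which a message is terminated is an assumption.

```rust
/// What the consumer loop should tell JetStream about one delivery.
#[derive(Debug, PartialEq, Eq)]
pub enum AckAction {
    /// Already applied (sequence <= checkpoint) or committed just now.
    Ack,
    /// Leave unacked so JetStream redelivers after the ack timeout.
    NoAck,
    /// Stop redelivery; the caller would also write a quarantine record.
    Term,
}

/// `commit` stands in for "project the event and commit view + checkpoint".
/// It runs only when the message passes the gate and the delivery budget.
pub fn decide<F>(
    sequence: u64,
    checkpoint: Option<u64>,
    delivery_count: u64,
    term_after: u64,
    commit: F,
) -> AckAction
where
    F: FnOnce() -> Result<(), String>,
{
    // Idempotency gate: anything at or below the checkpoint was already applied.
    if let Some(cp) = checkpoint {
        if sequence <= cp {
            return AckAction::Ack;
        }
    }
    // Poison-message policy: give up after too many deliveries.
    if delivery_count > term_after {
        return AckAction::Term;
    }
    // Ack discipline: ack only after the commit has succeeded.
    match commit() {
        Ok(()) => AckAction::Ack,
        Err(_) => AckAction::NoAck,
    }
}
```

Keeping the decision separate from the transport means the consumer loop only has to map `AckAction` onto the corresponding JetStream acknowledgement calls.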
Milestone 6: Projection Execution (runtime-function project Program)
Goal: Apply deterministic projection logic to turn (current_view, event) into new_view.
Dependencies
- Milestone 5 (stream consumption)
Tasks
- 6.1 Implement `project` execution wrapper
- Loads program referenced by manifest
- Executes deterministically with gas limits + timeouts via `runtime-function`
- 6.2 Define projection invocation contract
- Input: `{ current_view, event }`
- Output: `{ new_view, view_id }` (or equivalent)
- 6.3 Implement per-message processing pipeline
- Load current view
- Execute project program
- Atomically write new view + checkpoint
- Ack message
- 6.4 Enforce per-entity ordering where required
- Serialize updates per `(tenant_id, view_type, view_id)` if correctness depends on ordering
Tests
- T6.1 Unit test: project program transforms input deterministically (same input → same output)
- T6.2 Unit test: pipeline writes checkpoint only when view write succeeds
- T6.3 Integration test: concurrent processing does not violate per-key ordering when enabled
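The per-message pipeline in 6.3 can be sketched with a closure standing in for the `project` program and an in-memory map standing in for the store. The `Event` and `State` shapes, and the use of an `i64` as the view value, are simplifications assumed for the example; the checkpoint gate from Milestone 5 is deliberately left out to keep the four steps visible.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified event; the real envelope carries more fields.
pub struct Event {
    pub sequence: u64,
    pub view_id: String,
    pub amount: i64,
}

/// In-memory stand-in for the view store plus its checkpoint.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct State {
    pub views: HashMap<String, i64>,
    pub checkpoint: Option<u64>,
}

/// Runs one message through the pipeline. `Ok(())` means "safe to ack".
pub fn process<P>(state: &mut State, event: &Event, project: P) -> Result<(), String>
where
    P: Fn(Option<i64>, &Event) -> Result<i64, String>,
{
    // 1. Load the current view (None for the first event of a key).
    let current = state.views.get(&event.view_id).copied();
    // 2. Execute the project program; an error returns before any write.
    let new_view = project(current, event)?;
    // 3. Write view and checkpoint together (stand-in for one transaction).
    state.views.insert(event.view_id.clone(), new_view);
    state.checkpoint = Some(event.sequence);
    // 4. The caller acks only after this function returns Ok.
    Ok(())
}
```

Because `project` is a pure function of `(current_view, event)`, running the same events through two fresh `State` values should yield equal results, which is the property T6.1 checks.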
Milestone 7: Query Support (query-engine UQF)
Goal: Provide query access to stored views using query-engine UQF over KvStore::query() prefix scans.
Dependencies
- Milestone 4 (storage layer)
Tasks
- 7.1 Implement query wrapper around `KvStore` prefix scans
- Tenant-scoped prefix scanning
- UQF filtering/sorting support
- 7.2 Define query interface (library API and/or service endpoint)
- Ensure tenant isolation for all query paths
- 7.3 Add query-time safeguards
- Limits on result size and scan cost
- Stable pagination strategy (if required)
Tests
- T7.1 Unit test: tenant-scoped queries never return other-tenant keys
- T7.2 Unit test: UQF filter works on a small in-memory fixture dataset
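Tenant isolation for prefix scans (7.1, T7.1) depends on the prefix ending with the `:` separator, so that tenant `acme` never matches keys of tenant `acme2`. The sketch below uses a sorted in-memory map as a stand-in for the store and is not the `KvStore::query()` API; `scan_views` and its `limit` parameter are hypothetical, and it assumes tenant ids and view types never contain `:`.

```rust
use std::collections::BTreeMap;

/// Returns up to `limit` entries under `view:{tenant_id}:{view_type}:`.
pub fn scan_views<'a>(
    store: &'a BTreeMap<String, String>,
    tenant_id: &str,
    view_type: &str,
    limit: usize,
) -> Vec<(&'a str, &'a str)> {
    // The trailing ':' is what keeps "acme" from matching "acme2".
    let prefix = format!("view:{}:{}:", tenant_id, view_type);
    store
        // Seek to the first key >= prefix, then stop at the first non-match.
        .range(prefix.clone()..)
        .take_while(|(key, _)| key.starts_with(prefix.as_str()))
        // Query-time safeguard: bound the result size.
        .take(limit)
        .map(|(key, value)| (key.as_str(), value.as_str()))
        .collect()
}
```

UQF filtering and sorting would then run over the rows this scan returns, so the filter never sees another tenant's data.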
Milestone 8: Replay, Rebuild, and Hot Provisioning
Goal: Implement operational workflows: rebuilds, backfills, rolling upgrades, and safety checks.
Dependencies
- Milestone 5 (JetStream consumption)
- Milestone 6 (projection execution)
Tasks
- 8.1 Implement catch-up mode
- When no checkpoint exists, start from sequence 1 and process until tail
- 8.2 Implement rebuild workflow
- Delete view prefix + checkpoint for `(tenant_id, view_type)`
- Re-consume from sequence 1 (or chosen seed sequence)
- 8.3 Implement hot upgrade workflow (versioned views)
- New `view_type` (or suffix) + independent checkpoint
- Backfill then cutover routing, then retire old
- 8.4 Add health/readiness signals
- Storage reachable
- NATS reachable
- Consumer lag below threshold (optional)
Tests
- T8.1 Integration test: rebuild from scratch produces the same view as uninterrupted consumption (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
- T8.2 Integration test: rolling restart resumes from checkpoint without duplicating results (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
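Catch-up (8.1) and rebuild (8.2) can be modelled against an in-memory event log to show why a rebuild should converge to the same view as uninterrupted consumption. The sketch is a toy model under stated assumptions: the log is a slice of `(sequence, view_id, delta)` tuples, the projection is a running sum, and `Projection` is a hypothetical name.

```rust
use std::collections::BTreeMap;

/// Toy projection state: one summed value per view id, plus the checkpoint.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Projection {
    pub views: BTreeMap<String, i64>,
    pub checkpoint: Option<u64>,
}

impl Projection {
    /// Applies every log entry after the checkpoint, starting from
    /// sequence 1 when no checkpoint exists.
    pub fn catch_up(&mut self, log: &[(u64, String, i64)]) {
        let start = self.checkpoint.map_or(1, |cp| cp + 1);
        for (seq, view_id, delta) in log.iter().filter(|(seq, _, _)| *seq >= start) {
            *self.views.entry(view_id.clone()).or_insert(0) += delta;
            self.checkpoint = Some(*seq);
        }
    }

    /// Rebuild: drop all views and the checkpoint, then replay from sequence 1.
    pub fn rebuild(&mut self, log: &[(u64, String, i64)]) {
        self.views.clear();
        self.checkpoint = None;
        self.catch_up(log);
    }
}
```

The equality between a rebuilt projection and a live one holds here only because the projection function is deterministic, which is the same requirement Milestone 6 places on the `project` program.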
Milestone 9: Container & Deployment
Goal: Package as a container and enable predictable local and production deployment.
Dependencies
- Milestone 8 (replay/rebuild + health/readiness)
Tasks
- 9.1 Create `docker/Dockerfile.rust`
- Multi-stage build
- Minimal runtime image
- Health check integration
- 9.2 Create `docker-compose.yml` for local dev
- Projection container
- NATS server with JetStream enabled
- Optional: Grafana, Victoria Metrics, Loki
- 9.3 Create container entrypoint behavior
- Config loading
- Graceful shutdown on SIGTERM
- Stop pulling new JetStream messages
- Complete or safely abandon in-flight processing without acking early
- 9.4 Define environment variables and defaults
- NATS URL, stream name, subject filters
- Storage path
- Consumer naming strategy (durable)
- Multi-tenancy enabled flag
- 9.5 Create release build optimization
- LTO, strip, single codegen unit
Tests
- T9.1 Container builds successfully
- `docker build -f docker/Dockerfile.rust --build-arg PACKAGE=projection --build-arg BIN=projection -t cloudlysis/projection:local .`
- `docker run cloudlysis/projection:local --help`
- T9.2 Container starts with valid config
- `docker run -e PROJECTION_NATS_URL=nats://nats:4222 cloudlysis/projection:local`
Milestone 10: Provisioning, Scalability, and Docker Swarm Deployment
Goal: Support horizontal scaling and safe rollouts in Docker Swarm with clear provisioning semantics for JetStream consumers.
Dependencies
- Milestone 9 (container & deployment)
Tasks
- 10.1 Define the scaling model for JetStream consumption
- Use a durable consumer per `(view_type, shard)` so multiple replicas can share the same consumer workload
- Define the subject filter(s) used by each consumer (tenant wildcard vs tenant-range sharded)
- Document consumer configuration requirements (ack policy, max in-flight, replay policy)
- 10.2 Implement replica-safe consumption
- Multiple replicas pulling from the same durable consumer distribute work
- Enforce per-key serialization if required for correctness
- 10.3 Add tenant-aware provisioning option (sharding)
- Optional tenant-range sharding by subject filters (e.g., `tenant.<range>.*`)
- Placement constraints for Swarm nodes (e.g., `node.labels.tenant_range==<range>`)
- Strategy for adding/removing shards
- 10.4 Create Swarm stack definition (`swarm/stacks/platform.yml`)
- Service definition
- Replicas configuration
- Resource limits (CPU, memory)
- Health check integration
- Storage volume mapping for `edge-storage` data directory
- 10.5 Define rollout strategy
- Rolling update parameters
- Backfill/cutover strategy for versioned `view_type` upgrades
- Safe rollback story (old view still present + routing switch back)
Tests
- T10.1 Stack file valid
- `docker stack config -c swarm/stacks/platform.yml`
- T10.2 Scale-out does not duplicate work (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
- Start 2+ replicas pulling from the same durable consumer
- Verify the checkpoint is monotonic and view updates are not applied twice for the same sequence
- T10.3 Rolling restart preserves correctness (ignored by default; run with `PROJECTION_TEST_NATS_URL=... cargo test -- --ignored`)
- Restart replicas during active consumption
- Verify idempotency holds (no view corruption, checkpoint monotonic)
Milestone 11: Operational Endpoints & Observability
Goal: Provide the minimum operational surface required to provision, scale, and monitor Projection nodes in production.
Dependencies
- Milestone 9 (container & deployment)
- Milestone 10 (provisioning & scaling semantics)
Tasks
- 11.1 Implement `/health` endpoint
- Process is up
- Storage opened successfully
- 11.2 Implement `/ready` endpoint
- NATS connection established
- JetStream consumer bound
- Storage writable
- 11.3 Implement `/metrics` endpoint (Prometheus)
- Consumer lag (stream sequence - checkpoint)
- Processing throughput and latency
- Redelivery count / ack failures
- Storage commit failures
- 11.4 Add build/runtime identity
- Version, commit hash (if available), configured `view_type` set
- 11.5 Add graceful drain behavior for rollouts
- Report “not ready” before shutdown
- Stop pulling, wait for in-flight work up to a timeout
- 11.6 Correlation-aware logging for investigations
- When processing messages, include `tenant_id`, `correlation_id`, and `trace_id` in structured logs/spans when present in the envelope/headers
Tests
- T11.1 Readiness fails when NATS is unavailable
- T11.2 Metrics include lag gauge and counters
- T11.3 Drain transitions ready → not ready before exit
- T11.4 Unit test: envelope decoding accepts optional correlation/trace fields and exposes them for logging
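The lag gauge from 11.3 and the ready-to-not-ready drain transition from 11.5 are small enough to sketch without an HTTP or metrics library. `consumer_lag` and `Readiness` are hypothetical names; treating a missing checkpoint as sequence 0 and using three readiness flags (taken from the `/ready` bullet list) are assumptions made for the example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Lag = last stream sequence - checkpoint, clamped at zero.
/// A missing checkpoint is treated as sequence 0 (assumption).
pub fn consumer_lag(stream_last_sequence: u64, checkpoint: Option<u64>) -> u64 {
    stream_last_sequence.saturating_sub(checkpoint.unwrap_or(0))
}

/// Shared readiness state that a `/ready` handler could consult.
#[derive(Default)]
pub struct Readiness {
    pub nats_connected: AtomicBool,
    pub consumer_bound: AtomicBool,
    pub storage_writable: AtomicBool,
    draining: AtomicBool,
}

impl Readiness {
    /// Ready only when every dependency is up and no drain has started.
    pub fn is_ready(&self) -> bool {
        !self.draining.load(Ordering::SeqCst)
            && self.nats_connected.load(Ordering::SeqCst)
            && self.consumer_bound.load(Ordering::SeqCst)
            && self.storage_writable.load(Ordering::SeqCst)
    }

    /// Called first on SIGTERM so the orchestrator stops routing to this
    /// replica before it stops pulling messages.
    pub fn begin_drain(&self) {
        self.draining.store(true, Ordering::SeqCst);
    }
}
```

Making `draining` a one-way flag keeps the transition in T11.3 simple to assert: once a drain begins, the replica never reports ready again.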