cloudlysis/projection/DEVELOPMENT_PLAN.md
Vlad Durnea 1298d9a3df
Monorepo consolidation: workspace, shared types, transport plans, docker/swam assets
2026-03-30 11:40:42 +03:00

Development Plan: Projection Node

Overview

This plan breaks down the Projection node implementation into milestones ordered by dependency. Each milestone includes:

  • Tasks with clear deliverables
  • Test Requirements (unit tests + tautological tests + integration tests where applicable)
  • Dependencies on previous milestones

Development Approach:

  1. Complete one milestone at a time
  2. Write tests before implementation (TDD where applicable)
  3. All tests must pass before moving to the next milestone
  4. Mark tasks complete with [x] as you progress

Milestone 1: Project Foundation

Goal: Set up the Rust project with proper structure, dependencies, and basic tooling.

Tasks

  • 1.1 Initialize Cargo project
    • Create src/lib.rs and src/main.rs
    • Define the madapes registry in .cargo/config.toml and reference it from Cargo.toml dependencies
  • 1.2 Configure dependencies
    • edge-storage (KvStore)
    • runtime-function (DAG program execution for project)
    • query-engine (UQF query support)
    • edge-logger-client (structured logs client)
    • async-nats (JetStream consumption)
    • tokio, serde, serde_json, thiserror, anyhow, tracing
  • 1.3 Establish initial module layout
    src/
    ├── lib.rs
    ├── main.rs
    ├── types/
    │   ├── mod.rs
    │   ├── id.rs
    │   ├── event.rs
    │   ├── view.rs
    │   ├── checkpoint.rs
    │   └── error.rs
    ├── config/
    │   ├── mod.rs
    │   └── settings.rs
    ├── storage/
    │   ├── mod.rs
    │   └── kv.rs
    ├── stream/
    │   ├── mod.rs
    │   └── jetstream.rs
    ├── project/
    │   ├── mod.rs
    │   ├── runtime.rs
    │   └── manifest.rs
    ├── query/
    │   ├── mod.rs
    │   └── uqf.rs
    └── observability/
        └── mod.rs
    
  • 1.4 Configure clippy and rustfmt

Tests

  • T1.1 Project compiles successfully
  • T1.2 Dependencies resolve from madapes registry
  • T1.3 Clippy passes with no warnings

Milestone 2: Core Types (Envelopes, View Keys, Checkpoints)

Goal: Define all core types required for event consumption, view persistence, and idempotency.

Dependencies

  • Milestone 1 (project foundation)

Tasks

  • 2.1 Implement TenantId type
    • Optional; defaults to the empty string in single-tenant setups
    • Display, FromStr, Serialize, Deserialize
  • 2.2 Implement ViewType and ViewId types
    • String wrappers (consistent with ../aggregate style: no validation in the type wrapper)
    • Display, FromStr, Serialize, Deserialize
  • 2.3 Implement ViewKey composition
    • view:{tenant_id}:{view_type}:{view_id}
    • Centralize formatting/parsing in one place
  • 2.4 Implement CheckpointKey composition
    • checkpoint:{tenant_id}:{view_type}
  • 2.5 Define event envelope type consumed from JetStream
    • tenant_id, aggregate_id, aggregate_type, event_type, payload, timestamp
    • Support forward-compatible decoding (unknown fields ignored)
  • 2.6 Define checkpoint representation
    • The persisted value holds the JetStream stream sequence (u64) and optional metadata
  • 2.7 Implement Projection error model
    • Storage errors, stream errors, decode errors, project errors, tenant access errors
  • 2.8 Implement ProjectionManifest type
    • Defines projections (view_type) and the project program reference for each
    • Validates that referenced programs exist
  • 2.9 Add correlation/trace context fields to the event envelope (forward compatible)
    • Support optional correlation_id and traceparent (or trace_id) so logs/traces can be stitched back to Gateway flows
    • Preserve unknown fields for forward compatibility where practical
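Here is a minimal sketch of the key types from tasks 2.1 to 2.4. It uses only the standard library, so the serde derives from 1.2 are omitted and the example compiles on its own. Because the wrappers do no validation, the sketch assumes tenant_id and view_type never contain `:`. Only view_id may contain colons, since parsing splits at most three times. KeyParseError and view_prefix are hypothetical helper names.

```rust
use std::fmt;
use std::str::FromStr;

/// Tenant identifier; the empty string stands for the default (single-tenant) tenant.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct TenantId(pub String);

/// Projection name, e.g. "orders". Plain wrapper: no validation, as in ../aggregate.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ViewType(pub String);

/// Identifier of one view document within a view_type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ViewId(pub String);

/// Fully qualified key: view:{tenant_id}:{view_type}:{view_id}.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ViewKey {
    pub tenant_id: TenantId,
    pub view_type: ViewType,
    pub view_id: ViewId,
}

impl fmt::Display for ViewKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "view:{}:{}:{}", self.tenant_id.0, self.view_type.0, self.view_id.0)
    }
}

/// Hypothetical error type carrying the rejected input.
#[derive(Debug, PartialEq, Eq)]
pub struct KeyParseError(pub String);

impl FromStr for ViewKey {
    type Err = KeyParseError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // splitn(4, ..) leaves any ':' inside view_id untouched;
        // tenant_id and view_type are assumed to contain no ':'.
        let mut parts = s.splitn(4, ':');
        match (parts.next(), parts.next(), parts.next(), parts.next()) {
            (Some("view"), Some(tenant), Some(view_type), Some(view_id)) => Ok(ViewKey {
                tenant_id: TenantId(tenant.to_string()),
                view_type: ViewType(view_type.to_string()),
                view_id: ViewId(view_id.to_string()),
            }),
            _ => Err(KeyParseError(s.to_string())),
        }
    }
}

/// Checkpoint key: checkpoint:{tenant_id}:{view_type}.
pub fn checkpoint_key(tenant_id: &TenantId, view_type: &ViewType) -> String {
    format!("checkpoint:{}:{}", tenant_id.0, view_type.0)
}

/// Prefix shared by every view of one (tenant_id, view_type). The trailing ':'
/// keeps view_type "orders" from also matching "orders_v2".
pub fn view_prefix(tenant_id: &TenantId, view_type: &ViewType) -> String {
    format!("view:{}:{}:", tenant_id.0, view_type.0)
}
```

The trailing `:` in view_prefix matters for the prefix delete in 4.2. Without it, deleting the views of `orders` would also remove the views of a versioned `orders_v2` type.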

Tests

  • T2.1 TenantId round-trips serialization and defaults to empty
  • T2.2 ViewType, ViewId and key composition produce stable strings
  • T2.3 Checkpoint encoding/decoding round-trips
  • T2.4 Envelope decoding handles unknown fields
  • T2.5 Tautological test: core types are Send + Sync
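For T2.3, one possible checkpoint encoding stores the stream sequence as 8 big-endian bytes, followed by optional UTF-8 metadata. The byte layout and the Checkpoint and CheckpointDecodeError names are assumptions, not a format defined elsewhere in this plan. Big-endian fixed-width bytes keep byte-wise order equal to numeric order. Empty metadata decodes as None.

```rust
/// Checkpoint value stored under checkpoint:{tenant_id}:{view_type}.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Checkpoint {
    /// JetStream stream sequence of the last applied message.
    pub sequence: u64,
    /// Optional free-form metadata (e.g. a rebuild tag).
    pub metadata: Option<String>,
}

#[derive(Debug, PartialEq, Eq)]
pub enum CheckpointDecodeError {
    TooShort,
    InvalidUtf8,
}

impl Checkpoint {
    /// Layout: 8-byte big-endian sequence, then metadata bytes (if any).
    pub fn encode(&self) -> Vec<u8> {
        let mut out = self.sequence.to_be_bytes().to_vec();
        if let Some(meta) = &self.metadata {
            out.extend_from_slice(meta.as_bytes());
        }
        out
    }

    pub fn decode(bytes: &[u8]) -> Result<Self, CheckpointDecodeError> {
        if bytes.len() < 8 {
            return Err(CheckpointDecodeError::TooShort);
        }
        let mut seq = [0u8; 8];
        seq.copy_from_slice(&bytes[..8]);
        let rest = &bytes[8..];
        // Empty trailing bytes mean "no metadata", so Some("") round-trips as None.
        let metadata = if rest.is_empty() {
            None
        } else {
            Some(String::from_utf8(rest.to_vec()).map_err(|_| CheckpointDecodeError::InvalidUtf8)?)
        };
        Ok(Checkpoint { sequence: u64::from_be_bytes(seq), metadata })
    }
}
```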

Milestone 3: Configuration

Goal: Implement configuration loading and validation for the Projection node.

Dependencies

  • Milestone 2 (core types)

Tasks

  • 3.1 Define Settings struct
    • NATS URL, stream name, subject filter(s)
    • Durable consumer name strategy (per tenant/view)
    • Storage path
    • Multi-tenancy enabled flag + default tenant behavior
    • Backpressure configuration (max in-flight, batching, ack timeout)
    • Manifest path (projection definitions and project program refs)
  • 3.2 Implement config loading from environment
  • 3.3 Implement config loading from file (YAML/TOML/JSON)
    • Environment variables override file values
  • 3.4 Implement config validation
    • Required fields present
    • Manifest loads and validates at startup (not inside Settings::validate)
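The precedence rule in 3.3 can be sketched as a lookup that checks the environment before the file. The sketch makes several assumptions:

- Env keys are the file keys upper-cased with a PROJECTION_ prefix, matching PROJECTION_NATS_URL used in T9.2.
- The other variable names are hypothetical.
- The 256 default for max_in_flight is hypothetical.
- The parsed file is presented as a flat key/value map.

The manifest is deliberately left out of validation, as 3.4 specifies.

```rust
use std::collections::HashMap;

/// Subset of the planned Settings fields (illustrative, not exhaustive).
#[derive(Debug, Clone, PartialEq)]
pub struct Settings {
    pub nats_url: String,
    pub stream_name: String,
    pub subject_filter: String,
    pub storage_path: String,
    pub manifest_path: String,
    pub multi_tenancy: bool,
    pub max_in_flight: usize,
}

#[derive(Debug, PartialEq)]
pub enum ConfigError {
    Missing(&'static str),
    Invalid(&'static str, String),
}

/// Environment wins over the file: PROJECTION_<KEY> is checked before the file's <key>.
fn lookup(key: &str, file: &HashMap<String, String>, env: &HashMap<String, String>) -> Option<String> {
    let env_key = format!("PROJECTION_{}", key.to_uppercase());
    env.get(&env_key).or_else(|| file.get(key)).cloned()
}

fn required(key: &'static str, file: &HashMap<String, String>, env: &HashMap<String, String>) -> Result<String, ConfigError> {
    lookup(key, file, env).filter(|v| !v.is_empty()).ok_or(ConfigError::Missing(key))
}

impl Settings {
    /// `file` holds values already parsed from YAML/TOML/JSON; `env` would normally be
    /// `std::env::vars().collect()` but is passed in so tests stay hermetic.
    pub fn load(file: &HashMap<String, String>, env: &HashMap<String, String>) -> Result<Self, ConfigError> {
        let multi_tenancy = match lookup("multi_tenancy", file, env) {
            None => false,
            Some(raw) => raw.parse::<bool>().map_err(|_| ConfigError::Invalid("multi_tenancy", raw.clone()))?,
        };
        let max_in_flight = match lookup("max_in_flight", file, env) {
            None => 256, // hypothetical default
            Some(raw) => raw.parse::<usize>().map_err(|_| ConfigError::Invalid("max_in_flight", raw.clone()))?,
        };
        Ok(Settings {
            nats_url: required("nats_url", file, env)?,
            stream_name: required("stream_name", file, env)?,
            subject_filter: required("subject_filter", file, env)?,
            storage_path: required("storage_path", file, env)?,
            manifest_path: required("manifest_path", file, env)?,
            multi_tenancy,
            max_in_flight,
        })
    }

    /// Value checks only; the manifest is loaded and validated separately at startup.
    pub fn validate(&self) -> Result<(), ConfigError> {
        if self.max_in_flight == 0 {
            return Err(ConfigError::Invalid("max_in_flight", "0".to_string()));
        }
        Ok(())
    }
}
```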

Tests

  • T3.1 Settings loads from environment variables
  • T3.2 Settings validation catches missing/invalid values
  • T3.3 Tautological test: Settings is Clone + Debug

Milestone 4: Storage Layer (KvStore Views + Atomic Checkpoints)

Goal: Integrate edge-storage KvStore with transactionally correct view + checkpoint updates.

Dependencies

  • Milestone 2 (core types)
  • Milestone 3 (configuration)

Tasks

  • 4.1 Create KvClient wrapper
    • Opens the MDBX-backed KvStore at the configured path
    • Tenant-aware key composition helpers
  • 4.2 Implement view CRUD primitives
    • get_view(view_key) -> Option<Value>
    • put_view(view_key, value)
    • delete_view_prefix(tenant_id, view_type) for rebuilds
  • 4.3 Implement checkpoint primitives
    • get_checkpoint(checkpoint_key) -> Option<Sequence>
    • put_checkpoint(checkpoint_key, sequence)
  • 4.4 Implement atomic commit primitive
    • txn { put_view(s); put_checkpoint } as one MDBX transaction
    • Expose an API that makes it hard to update one without the other
  • 4.5 Optional: storage circuit breaker
    • Protects the node from tight retry loops when storage is degraded
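Here is a sketch of the API shape for 4.4. It uses in-memory BTreeMaps in place of the MDBX-backed KvStore; edge-storage's real API is not shown. CommitBatch can only be constructed together with a checkpoint, so view writes cannot be committed on their own. fail_next_commit is a hypothetical test hook standing in for an aborted MDBX transaction. In the real store, the all-or-nothing guarantee comes from the MDBX transaction, not from this code.

```rust
use std::collections::BTreeMap;

/// Writes that must land together: the checkpoint is required up front, so a
/// batch of view writes cannot be committed without advancing the checkpoint.
pub struct CommitBatch {
    checkpoint_key: String,
    sequence: u64,
    views: Vec<(String, String)>, // (view_key, JSON text)
}

impl CommitBatch {
    pub fn new(checkpoint_key: impl Into<String>, sequence: u64) -> Self {
        CommitBatch { checkpoint_key: checkpoint_key.into(), sequence, views: Vec::new() }
    }

    pub fn put_view(mut self, view_key: impl Into<String>, json: impl Into<String>) -> Self {
        self.views.push((view_key.into(), json.into()));
        self
    }
}

#[derive(Debug, PartialEq)]
pub enum StorageError {
    CommitFailed(String),
}

/// In-memory stand-in for the MDBX-backed KvStore; not the edge-storage API.
#[derive(Default)]
pub struct MemKv {
    views: BTreeMap<String, String>,
    checkpoints: BTreeMap<String, u64>,
    /// Test hook: makes the next commit abort, like a failed MDBX transaction.
    pub fail_next_commit: bool,
}

impl MemKv {
    pub fn get_view(&self, view_key: &str) -> Option<String> {
        self.views.get(view_key).cloned()
    }

    pub fn get_checkpoint(&self, checkpoint_key: &str) -> Option<u64> {
        self.checkpoints.get(checkpoint_key).copied()
    }

    /// All-or-nothing: on failure nothing from the batch becomes visible.
    pub fn commit(&mut self, batch: CommitBatch) -> Result<(), StorageError> {
        if self.fail_next_commit {
            self.fail_next_commit = false;
            return Err(StorageError::CommitFailed(batch.checkpoint_key));
        }
        for (key, json) in batch.views {
            self.views.insert(key, json);
        }
        self.checkpoints.insert(batch.checkpoint_key, batch.sequence);
        Ok(())
    }

    /// Rebuild helper: drops every view under `prefix`, returning how many were removed.
    pub fn delete_view_prefix(&mut self, prefix: &str) -> usize {
        let before = self.views.len();
        self.views.retain(|key, _| !key.starts_with(prefix));
        before - self.views.len()
    }
}
```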

Tests

  • T4.1 View round-trip: put/get returns identical JSON
  • T4.2 Checkpoint round-trip: put/get returns identical sequence
  • T4.3 Atomicity: if transaction fails, neither view nor checkpoint is committed
  • T4.4 Prefix delete removes all keys for tenant/view_type

Milestone 5: JetStream Consumption (Durable Consumer + Idempotency)

Goal: Consume events from NATS JetStream with correct delivery semantics and checkpoint-based idempotency.

Dependencies

  • Milestone 4 (storage layer)

Tasks

  • 5.1 Implement JetStream client wrapper
    • Connect to NATS and bind to configured stream
    • Create/bind durable consumer (single filter subject for now)
  • 5.2 Decode messages into event envelopes
    • Extract JetStream stream sequence from message metadata
    • Decode payload into the envelope type
  • 5.3 Implement idempotency gate
    • Load checkpoint for (tenant_id, view_type)
    • Ack and skip messages whose sequence is <= the checkpoint
  • 5.4 Implement ack discipline
    • Ack only after the MDBX transaction commits
    • Define behavior for transient errors (no ack, allow redelivery)
  • 5.5 Implement poison-message policy
    • Enforce via the consumer's max_deliver limit, a TERM ack once deliveries are exhausted, and a quarantine record in KV
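The gate (5.3), the ack rule (5.4) and the poison policy (5.5) reduce to one decision per message. The sketch makes these assumptions:

- The delivery count and stream sequence come from the JetStream message metadata.
- `process` stands for the load/execute/commit step.
- AckAction and handle_message are hypothetical names.

Mapping AckAction onto actual async-nats ack calls is left out.

```rust
/// What the consumer loop should do with a JetStream message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckAction {
    /// Positive ack: only after the view + checkpoint transaction committed, or for a duplicate.
    Ack,
    /// No ack: JetStream redelivers after the consumer's ack_wait.
    LeaveUnacked,
    /// Poison: TERM the message and write a quarantine record to KV.
    TermAndQuarantine,
}

/// Fields read from the JetStream message metadata (not from the payload).
pub struct MessageMeta {
    pub stream_sequence: u64,
    pub delivered: u64, // delivery count, starting at 1
}

/// Idempotency gate: a sequence at or below the checkpoint was already applied.
pub fn already_applied(stream_sequence: u64, checkpoint: Option<u64>) -> bool {
    checkpoint.map_or(false, |cp| stream_sequence <= cp)
}

/// `process` loads the view, runs the program and commits view + checkpoint;
/// it returns Err for any failure, including a failed commit.
pub fn handle_message<F>(meta: &MessageMeta, checkpoint: Option<u64>, max_deliver: u64, process: F) -> AckAction
where
    F: FnOnce() -> Result<(), String>,
{
    if already_applied(meta.stream_sequence, checkpoint) {
        return AckAction::Ack; // duplicate delivery: ack without reprocessing
    }
    match process() {
        Ok(()) => AckAction::Ack,
        Err(_) if meta.delivered < max_deliver => AckAction::LeaveUnacked,
        Err(_) => AckAction::TermAndQuarantine,
    }
}
```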

Tests

  • T5.1 Unit test: checkpoint gate skips sequences <= checkpoint
  • T5.2 Unit test: ack is not called when storage commit fails
  • T5.3 Integration test: JetStream redelivery re-processes unacked message and is made idempotent by checkpoint (ignored by default; run with PROJECTION_TEST_NATS_URL=... cargo test -- --ignored)

Milestone 6: Projection Execution (runtime-function project Program)

Goal: Apply deterministic projection logic to turn (current_view, event) into new_view.

Dependencies

  • Milestone 5 (stream consumption)

Tasks

  • 6.1 Implement project execution wrapper
    • Loads program referenced by manifest
    • Executes deterministically with gas limits + timeouts via runtime-function
  • 6.2 Define projection invocation contract
    • Input: { current_view, event }
    • Output: { new_view, view_id } (or equivalent)
  • 6.3 Implement per-message processing pipeline
    • Load current view
    • Execute project program
    • Atomically write new view + checkpoint
    • Ack message
  • 6.4 Enforce per-entity ordering where required
    • Serialize updates per (tenant_id, view_type, view_id) if correctness depends on ordering
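Here is the 6.3 pipeline, sketched with two traits standing in for runtime-function and the KvStore. Both are hypothetical seams. One ordering detail shows up: the current view must be loaded before the program runs. The sketch therefore assumes the view_id can be derived from the event up front (here from aggregate_id), while the program only produces the new view. CountEvents and MemStore are illustrative.

```rust
use std::collections::HashMap;

/// Envelope trimmed to the fields this sketch needs.
pub struct Event {
    pub tenant_id: String,
    pub aggregate_id: String,
    /// JetStream stream sequence, taken from message metadata.
    pub stream_sequence: u64,
}

/// Hypothetical stand-in for a runtime-function `project` program.
pub trait ProjectProgram {
    /// Assumption: the view_id is derivable from the event before execution,
    /// because the current view must be loaded first.
    fn view_id(&self, event: &Event) -> String;
    /// Must be deterministic: same (current_view, event) gives the same new view.
    fn apply(&self, current_view: Option<&str>, event: &Event) -> Result<String, String>;
}

/// Storage seam; `commit` must write the view and the checkpoint in one transaction.
pub trait ViewStore {
    fn get_view(&self, view_key: &str) -> Option<String>;
    fn commit(&mut self, view_key: &str, view: &str, checkpoint_key: &str, seq: u64) -> Result<(), String>;
}

/// Load, execute, commit atomically. Ok(()) means the caller may now ack.
pub fn process_message<P: ProjectProgram, S: ViewStore>(program: &P, store: &mut S, view_type: &str, event: &Event) -> Result<(), String> {
    let view_id = program.view_id(event);
    let view_key = format!("view:{}:{}:{}", event.tenant_id, view_type, view_id);
    let checkpoint_key = format!("checkpoint:{}:{}", event.tenant_id, view_type);
    let current = store.get_view(&view_key);
    let new_view = program.apply(current.as_deref(), event)?;
    store.commit(&view_key, &new_view, &checkpoint_key, event.stream_sequence)
}

/// Example program: each view counts the events seen for one aggregate.
pub struct CountEvents;

impl ProjectProgram for CountEvents {
    fn view_id(&self, event: &Event) -> String {
        event.aggregate_id.clone()
    }

    fn apply(&self, current_view: Option<&str>, _event: &Event) -> Result<String, String> {
        let count = match current_view {
            None => 0,
            Some(v) => v.parse::<u64>().map_err(|e| format!("corrupt view: {}", e))?,
        };
        Ok((count + 1).to_string())
    }
}

/// In-memory store with a failure switch for tests.
#[derive(Default)]
pub struct MemStore {
    pub views: HashMap<String, String>,
    pub checkpoints: HashMap<String, u64>,
    pub fail_commit: bool,
}

impl ViewStore for MemStore {
    fn get_view(&self, view_key: &str) -> Option<String> {
        self.views.get(view_key).cloned()
    }

    fn commit(&mut self, view_key: &str, view: &str, checkpoint_key: &str, seq: u64) -> Result<(), String> {
        if self.fail_commit {
            return Err("simulated commit failure".to_string()); // nothing written
        }
        self.views.insert(view_key.to_string(), view.to_string());
        self.checkpoints.insert(checkpoint_key.to_string(), seq);
        Ok(())
    }
}
```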

Tests

  • T6.1 Unit test: project program transforms input deterministically (same input → same output)
  • T6.2 Unit test: pipeline writes checkpoint only when view write succeeds
  • T6.3 Integration test: concurrent processing does not violate per-key ordering when enabled

Milestone 7: Query Support (query-engine UQF)

Goal: Provide query access to stored views using query-engine UQF over KvStore::query() prefix scans.

Dependencies

  • Milestone 4 (storage layer)

Tasks

  • 7.1 Implement query wrapper around KvStore prefix scans
    • Tenant-scoped prefix scanning
    • UQF filtering/sorting support
  • 7.2 Define query interface (library API and/or service endpoint)
    • Ensure tenant isolation for all query paths
  • 7.3 Add query-time safeguards
    • Limits on result size and scan cost
    • Stable pagination strategy (if required)
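Here is a sketch of tenant-scoped scanning with the 7.3 safeguards. A BTreeMap stands in for KvStore prefix scans, and a plain closure takes the place of a UQF predicate; query-engine's actual API is not shown. Pagination uses the last scanned key as an exclusive cursor, which is stable because keys are ordered. A cursor outside the requested tenant's prefix is ignored, which keeps T7.1 true even for hand-crafted cursors.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

/// One page of (view_key, view) pairs plus a cursor for the next page.
pub struct Page<'a> {
    pub items: Vec<(&'a str, &'a str)>,
    pub next_cursor: Option<String>,
}

/// Scans only keys under view:{tenant_id}:{view_type}:, applies `filter`
/// (standing in for a UQF predicate), and stops at `limit` matches or
/// `max_scanned` keys, whichever comes first.
pub fn query_views<'a, F>(
    store: &'a BTreeMap<String, String>,
    tenant_id: &str,
    view_type: &str,
    filter: F,
    limit: usize,
    max_scanned: usize,
    after: Option<&str>,
) -> Page<'a>
where
    F: Fn(&str) -> bool,
{
    let prefix = format!("view:{}:{}:", tenant_id, view_type);
    // A cursor outside this tenant's prefix is ignored, so it cannot reach other tenants' keys.
    let start = match after {
        Some(cursor) if cursor.starts_with(prefix.as_str()) => Bound::Excluded(cursor.to_string()),
        _ => Bound::Included(prefix.clone()),
    };
    let mut items = Vec::new();
    let mut last_scanned: Option<&'a str> = None;
    let mut scanned = 0;
    let mut more = false;
    for (key, view) in store.range::<String, _>((start, Bound::Unbounded)) {
        if !key.starts_with(prefix.as_str()) {
            break; // left the tenant/view_type prefix
        }
        if scanned == max_scanned || items.len() == limit {
            more = true; // at least one more in-prefix key remains
            break;
        }
        scanned += 1;
        last_scanned = Some(key.as_str());
        if filter(view.as_str()) {
            items.push((key.as_str(), view.as_str()));
        }
    }
    Page { items, next_cursor: if more { last_scanned.map(str::to_string) } else { None } }
}
```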

Tests

  • T7.1 Unit test: tenant-scoped queries never return other-tenant keys
  • T7.2 Unit test: UQF filter works on a small in-memory fixture dataset

Milestone 8: Replay, Rebuild, and Hot Provisioning

Goal: Implement operational workflows: rebuilds, backfills, rolling upgrades, and safety checks.

Dependencies

  • Milestone 5 (JetStream consumption)
  • Milestone 6 (projection execution)

Tasks

  • 8.1 Implement catch-up mode
    • When no checkpoint exists, start from sequence 1 and process until reaching the stream tail
  • 8.2 Implement rebuild workflow
    • Delete view prefix + checkpoint for (tenant_id, view_type)
    • Re-consume from sequence 1 (or chosen seed sequence)
  • 8.3 Implement hot upgrade workflow (versioned views)
    • New view_type (or suffix) + independent checkpoint
    • Backfill, then cut over routing, then retire the old view
  • 8.4 Add health/readiness signals
    • Storage reachable
    • NATS reachable
    • Consumer lag below threshold (optional)
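The sketch below covers three pieces: the start-sequence rule behind 8.1 and 8.2, the reset step of a rebuild, and the routing cutover in 8.3. All names are hypothetical, and the store is in-memory. In JetStream terms, the start sequence would feed a by-start-sequence deliver policy when a consumer is created for a rebuild or a new versioned view. An existing durable consumer resumes from its own ack state instead, with the checkpoint gate absorbing redeliveries.

```rust
use std::collections::{BTreeMap, HashMap};

/// Where a newly created consumer should start delivering.
/// An existing checkpoint wins (resume); otherwise an explicit seed; otherwise 1.
pub fn start_sequence(checkpoint: Option<u64>, seed: Option<u64>) -> u64 {
    checkpoint.map(|cp| cp.saturating_add(1)).or(seed).unwrap_or(1)
}

/// In-memory stand-in for the view + checkpoint store.
#[derive(Default)]
pub struct Store {
    pub views: BTreeMap<String, String>,
    pub checkpoints: HashMap<String, u64>,
}

/// Rebuild step 1: delete the view prefix and checkpoint for (tenant_id, view_type).
/// Returns the number of deleted views. In MDBX both deletions belong in one transaction.
pub fn reset_for_rebuild(store: &mut Store, tenant_id: &str, view_type: &str) -> usize {
    let prefix = format!("view:{}:{}:", tenant_id, view_type);
    let before = store.views.len();
    store.views.retain(|key, _| !key.starts_with(prefix.as_str()));
    store.checkpoints.remove(&format!("checkpoint:{}:{}", tenant_id, view_type));
    before - store.views.len()
}

/// Hot-upgrade routing: a logical view name points at one physical view_type.
#[derive(Default)]
pub struct ViewRouting {
    active: HashMap<String, String>,
}

impl ViewRouting {
    /// Switch reads to `physical`; returns the previous target so a rollback can switch back.
    pub fn cutover(&mut self, logical: &str, physical: &str) -> Option<String> {
        self.active.insert(logical.to_string(), physical.to_string())
    }

    pub fn resolve(&self, logical: &str) -> Option<&str> {
        self.active.get(logical).map(String::as_str)
    }
}
```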

Tests

  • T8.1 Integration test: rebuild from scratch produces identical view as uninterrupted consumption (ignored by default; run with PROJECTION_TEST_NATS_URL=... cargo test -- --ignored)
  • T8.2 Integration test: rolling restart resumes from checkpoint without duplicating results (ignored by default; run with PROJECTION_TEST_NATS_URL=... cargo test -- --ignored)

Milestone 9: Container & Deployment

Goal: Package as a container and enable predictable local and production deployment.

Dependencies

  • Milestone 8 (replay/rebuild + health/readiness)

Tasks

  • 9.1 Create docker/Dockerfile.rust
    • Multi-stage build
    • Minimal runtime image
    • Health check integration
  • 9.2 Create docker-compose.yml for local dev
    • Projection container
    • NATS server with JetStream enabled
    • Optional: Grafana, VictoriaMetrics, Loki
  • 9.3 Create container entrypoint behavior
    • Config loading
    • Graceful shutdown on SIGTERM
      • Stop pulling new JetStream messages
      • Complete or safely abandon in-flight processing without acking early
  • 9.4 Define environment variables and defaults
    • NATS URL, stream name, subject filters
    • Storage path
    • Consumer naming strategy (durable)
    • Multi-tenancy enabled flag
  • 9.5 Create release build optimization
    • LTO, strip, single codegen unit
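The shutdown rules in 9.3 amount to two things: check a stop flag before each pull, and ack only after a successful commit. This is a minimal synchronous sketch, assuming messages arrive as an iterator of stream sequences. In the real async service, a SIGTERM handler would set the flag (for example via tokio::signal::unix with SignalKind::terminate()). run_pull_loop is a hypothetical name.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Pull loop with graceful drain: `stop` is checked before every pull, so once it
/// is set no new message is fetched, while the message already in hand finishes.
/// `handle` returns true only if the view + checkpoint commit succeeded; only those
/// sequences are acked. Anything else stays unacked and JetStream redelivers it.
pub fn run_pull_loop<I, F>(mut messages: I, stop: &AtomicBool, mut handle: F) -> Vec<u64>
where
    I: Iterator<Item = u64>,
    F: FnMut(u64) -> bool,
{
    let mut acked = Vec::new();
    while !stop.load(Ordering::SeqCst) {
        match messages.next() {
            Some(seq) => {
                if handle(seq) {
                    acked.push(seq); // ack strictly after a successful commit
                }
            }
            None => break,
        }
    }
    acked
}
```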

Tests

  • T9.1 Container builds successfully
    docker build -f docker/Dockerfile.rust --build-arg PACKAGE=projection --build-arg BIN=projection -t cloudlysis/projection:local .
    docker run cloudlysis/projection:local --help
    
  • T9.2 Container starts with valid config
    docker run -e PROJECTION_NATS_URL=nats://nats:4222 cloudlysis/projection:local
    

Milestone 10: Provisioning, Scalability, and Docker Swarm Deployment

Goal: Support horizontal scaling and safe rollouts in Docker Swarm with clear provisioning semantics for JetStream consumers.

Dependencies

  • Milestone 9 (container & deployment)

Tasks

  • 10.1 Define the scaling model for JetStream consumption
    • Use a durable consumer per (view_type, shard) so multiple replicas can share the same consumer workload
    • Define the subject filter(s) used by each consumer (tenant wildcard vs tenant-range sharded)
    • Document consumer configuration requirements (ack policy, max in-flight via max_ack_pending, replay policy)
  • 10.2 Implement replica-safe consumption
    • Multiple replicas pulling from the same durable consumer distribute work
    • Enforce per-key serialization if required for correctness
  • 10.3 Add tenant-aware provisioning option (sharding)
    • Optional tenant-range sharding by subject filters (e.g., tenant.<range>.*)
    • Placement constraints for Swarm nodes (e.g., node.labels.tenant_range==<range>)
    • Strategy for adding/removing shards
  • 10.4 Create Swarm stack definition (swarm/stacks/platform.yml)
    • Service definition
    • Replicas configuration
    • Resource limits (CPU, memory)
    • Health check integration
    • Storage volume mapping for edge-storage data directory
  • 10.5 Define rollout strategy
    • Rolling update parameters
    • Backfill/cutover strategy for versioned view_type upgrades
    • Safe rollback path (the old view stays present, so routing can switch back)
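Here is a sketch of consumer naming and tenant-range sharding for 10.1 and 10.3. Subject tokens cannot express ranges, so the sketch assumes the publisher writes a range token into the subject. The layout follows the example in 10.3: tenant.<range>.<event>. It also assumes ranges are defined by sorted boundary strings. The durable name scheme and the `r<n>` token format are hypothetical.

```rust
/// Hypothetical durable name: one durable consumer per (view_type, shard).
/// Characters JetStream rejects in names ('.', '*', '>', whitespace, ...) become '_'.
pub fn durable_name(view_type: &str, shard: Option<u32>) -> String {
    let safe: String = view_type
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() || c == '-' || c == '_' { c } else { '_' })
        .collect();
    match shard {
        Some(n) => format!("projection_{}_shard{}", safe, n),
        None => format!("projection_{}", safe),
    }
}

/// Tenant-range sharding with sorted boundaries. With ["g", "p"]:
/// ids below "g" go to 0, ids from "g" up to (not including) "p" go to 1, the rest to 2.
pub fn shard_for_tenant(tenant_id: &str, boundaries: &[&str]) -> u32 {
    boundaries.partition_point(|b| *b <= tenant_id) as u32
}

/// Assumed subject layout tenant.<range>.<event>, where the publisher writes the
/// range token. `*` matches exactly one token; deeper subjects would need `>`.
pub fn subject_filter(shard: Option<u32>) -> String {
    match shard {
        Some(n) => format!("tenant.r{}.*", n),
        None => "tenant.*.*".to_string(),
    }
}
```

Changing the boundaries moves tenants between shards. For that reason, adding or removing a shard needs the explicit strategy called for in 10.3 rather than a plain replica count change.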

Tests

  • T10.1 Stack file valid
    docker stack config -c swarm/stacks/platform.yml
    
  • T10.2 Scale-out does not duplicate work (ignored by default; run with PROJECTION_TEST_NATS_URL=... cargo test -- --ignored)
    • Start 2+ replicas pulling from the same durable consumer
    • Verify the checkpoint is monotonic and view updates are not applied twice for the same sequence
  • T10.3 Rolling restart preserves correctness (ignored by default; run with PROJECTION_TEST_NATS_URL=... cargo test -- --ignored)
    • Restart replicas during active consumption
    • Verify idempotency holds (no view corruption, checkpoint monotonic)

Milestone 11: Operational Endpoints & Observability

Goal: Provide the minimum operational surface required to provision, scale, and monitor Projection nodes in production.

Dependencies

  • Milestone 9 (container & deployment)
  • Milestone 10 (provisioning & scaling semantics)

Tasks

  • 11.1 Implement /health endpoint
    • Process is up
    • Storage opened successfully
  • 11.2 Implement /ready endpoint
    • NATS connection established
    • JetStream consumer bound
    • Storage writable
  • 11.3 Implement /metrics endpoint (Prometheus)
    • Consumer lag (latest stream sequence - checkpoint)
    • Processing throughput and latency
    • Redelivery count / ack failures
    • Storage commit failures
  • 11.4 Add build/runtime identity
    • Version, commit hash (if available), configured view_type set
  • 11.5 Add graceful drain behavior for rollouts
    • Report “not ready” before shutdown
    • Stop pulling, wait for in-flight work up to a timeout
  • 11.6 Correlation-aware logging for investigations
    • When processing messages, include tenant_id, correlation_id, and trace_id in structured logs/spans when present in the envelope/headers
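Here is a sketch of the /ready logic (11.2, 11.5) and the lag gauge (11.3), rendered in the Prometheus text exposition format. The metric names, the Readiness struct and the HTTP wiring (not shown) are assumptions. Lag uses saturating subtraction, since the latest stream sequence and the checkpoint may be sampled at slightly different moments.

```rust
use std::fmt::Write;
use std::sync::atomic::{AtomicBool, Ordering};

/// Flags flipped by the NATS, consumer and storage tasks; read by /ready.
#[derive(Default)]
pub struct Readiness {
    pub nats_connected: AtomicBool,
    pub consumer_bound: AtomicBool,
    pub storage_writable: AtomicBool,
    pub draining: AtomicBool,
}

impl Readiness {
    /// Ready only when every dependency is up and no drain has started.
    pub fn is_ready(&self) -> bool {
        !self.draining.load(Ordering::SeqCst)
            && self.nats_connected.load(Ordering::SeqCst)
            && self.consumer_bound.load(Ordering::SeqCst)
            && self.storage_writable.load(Ordering::SeqCst)
    }

    /// First step of shutdown: report "not ready" before pulling stops.
    pub fn begin_drain(&self) {
        self.draining.store(true, Ordering::SeqCst);
    }
}

/// Lag = latest stream sequence - checkpoint, saturating at zero.
pub fn consumer_lag(last_stream_sequence: u64, checkpoint: Option<u64>) -> u64 {
    last_stream_sequence.saturating_sub(checkpoint.unwrap_or(0))
}

/// Escape a Prometheus label value (backslash, double quote, newline).
fn escape_label(value: &str) -> String {
    value.replace('\\', "\\\\").replace('"', "\\\"").replace('\n', "\\n")
}

/// Text exposition for a per-view_type lag gauge and a commit-failure counter.
pub fn render_metrics(lags: &[(&str, u64)], commit_failures: u64) -> String {
    let mut out = String::new();
    out.push_str("# TYPE projection_consumer_lag gauge\n");
    for (view_type, lag) in lags {
        writeln!(out, "projection_consumer_lag{{view_type=\"{}\"}} {}", escape_label(view_type), lag).unwrap();
    }
    out.push_str("# TYPE projection_storage_commit_failures_total counter\n");
    writeln!(out, "projection_storage_commit_failures_total {}", commit_failures).unwrap();
    out
}
```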

Tests

  • T11.1 Readiness fails when NATS is unavailable
  • T11.2 Metrics include lag gauge and counters
  • T11.3 Drain transitions ready → not ready before exit
  • T11.4 Unit test: envelope decoding accepts optional correlation/trace fields and exposes them for logging