# aggregate ## Running ### Configuration Configuration is loaded in this order: 1. If `AGGREGATE_CONFIG_PATH` is set and points to a readable config file, load that file and apply env overrides. 2. Otherwise load defaults and apply env overrides. Supported config formats: - YAML (`.yaml`, `.yml`) - TOML (`.toml`) - JSON (`.json`) ### Environment Variables #### Core - `AGGREGATE_NATS_URL` (default: `nats://localhost:4222`): NATS server URL. - `AGGREGATE_STORAGE_PATH` (default: `./data`): Path used by the snapshot storage. - `AGGREGATE_SNAPSHOT_THRESHOLD` (default: `10`): Save snapshot when events since last snapshot reach this threshold. - `AGGREGATE_MAX_RETRIES` (default: `3`): Max retries for version conflicts in command handling. - `AGGREGATE_HTTP_ADDR` (default: `0.0.0.0:8080`): HTTP bind address. - `AGGREGATE_GRPC_ADDR` (default: `0.0.0.0:50051`): gRPC bind address for command submission. #### Multi-tenant - `AGGREGATE_MULTI_TENANT` (default: `true`): Enables multi-tenant behavior when parsing/validating tenant ids. - `AGGREGATE_DEFAULT_TENANT_ID` (default: unset): Default tenant id when the incoming request doesn't specify one. - `AGGREGATE_SHARD_ID` (default: `local`): Shard id used when applying placement maps. #### Logging - `AGGREGATE_LOGGER_SOCKET` (default: unset): Socket path for `edge-logger-client` integration (if enabled). #### Server - `AGGREGATE_CONFIG_PATH` (default: unset): Path to a YAML/TOML/JSON config file. #### Placement - `AGGREGATE_PLACEMENT_BUCKET` (default: `AGGREGATE_PLACEMENT`): NATS KV bucket to watch. - `AGGREGATE_PLACEMENT_KEY` (default: `aggregate_placement`): NATS KV key to watch. Value is a JSON object mapping `tenant_id -> shard_id`. #### Runtime Programs - `AGGREGATE_DECIDE_PROGRAM` / `AGGREGATE_APPLY_PROGRAM`: Inline program source strings. - `AGGREGATE_DECIDE_PROGRAM_PATH` / `AGGREGATE_APPLY_PROGRAM_PATH`: File paths to program source strings. ## HTTP Endpoints - `GET /health` → JSON health report - `GET /ready` → JSON boolean readiness - `GET /metrics` → Prometheus text format - `GET /admin/tenants` → JSON list of hosted tenants - `POST /admin/drain` → marks tenant draining and waits for in-flight commands to finish (`{"tenant_id":"..."}`) - `POST /admin/reload` → updates hosted tenant allowlist (`{"hosted_tenants":[...]}`) or applies a placement map (`{"placement":{...}}`) - `GET /admin/tenant/{tenant_id}/status` → JSON tenant status (`hosted`, `accepting`, `draining`, `in_flight`) - `GET /admin/tenant/{tenant_id}/ready` → JSON boolean (node ready AND accepting tenant) - `POST /admin/tenant/{tenant_id}/drain` → drains tenant with optional timeout (`{"timeout_ms":10000}`) ## gRPC Aggregate exposes a command submission API for the Gateway: - Service: `aggregate.gateway.v1.CommandService` - Method: `SubmitCommand` - Metadata: `x-tenant-id` (tenant routing hint) Proto definition: [aggregate.proto](file:///Users/vlad/Developer/cloudlysis/aggregate/proto/aggregate.proto) ## Container Build and run locally: ```bash docker build -t cloudlysis/aggregate:local -f docker/Dockerfile.rust --build-arg PACKAGE=aggregate --build-arg BIN=aggregate . docker compose up -d --build ``` Container smoke test (requires Docker installed): ```bash sh docker/scripts/verify_aggregate_container.sh ```