# Milestone 2: Storage Pillar

**Goal:** Storage becomes a first-class pillar supporting self-hosted MinIO or cloud S3 (Hetzner Object Storage, AWS S3, Backblaze B2). Complete the supabase-js `storage` API surface.

**Depends on:** M1 (Foundation)

---

## 2.1 — Storage Pillar Compose & Configuration

### 2.1.1 Create docker-compose.pillar-storage.yml

This compose file is used only for **self-hosted mode**. In cloud mode, workers connect directly to the external S3 endpoint and this compose file is not needed.

```yaml
# MadBase - Pillar: Storage (Self-Hosted)
# S3-compatible object storage via MinIO
services:
  minio:
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    container_name: madbase_minio
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: ${S3_ACCESS_KEY}
      MINIO_ROOT_PASSWORD: ${S3_SECRET_KEY}
      MINIO_BROWSER_REDIRECT_URL: http://localhost:9001
    volumes:
      - minio_data:/data
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped

volumes:
  minio_data:

networks:
  default:
    name: madbase
    external: true
```

### 2.1.2 Add STORAGE_MODE env var

**File:** `common/src/config.rs`

Add to `Config`:

```rust
pub storage_mode: StorageMode,
pub s3_endpoint: String,
pub s3_access_key: String,
pub s3_secret_key: String,
pub s3_bucket: String,
pub s3_region: String,
```

```rust
#[derive(Clone, Debug)]
pub enum StorageMode {
    Cloud,      // External S3 (Hetzner, AWS, B2)
    SelfHosted, // MinIO
}
```

Load from env:

```rust
let storage_mode = match env::var("STORAGE_MODE")
    .unwrap_or_else(|_| "self-hosted".into())
    .as_str()
{
    "cloud" | "s3" => StorageMode::Cloud,
    _ => StorageMode::SelfHosted,
};
```

### 2.1.3 Create storage-node.yaml template

**File:** `templates/storage-node.yaml`

```yaml
id: storage-node
name: Dedicated Storage Node
description: MinIO object storage for self-hosted deployments
version: 1.0
min_hetzner_plan: CX21
estimated_monthly_cost: 6.94
services:
  - id: minio
    name: MinIO
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    ports: ["9000:9000", "9001:9001"]
    command: ["server", "/data", "--console-address", ":9001"]
    volumes:
      - minio_data:/data
    resource_profile: storage_intensive
requirements:
  min_nodes: 1
  max_nodes: 4
  supports_ha: true
recommended_deployment: "Dedicated node with attached block storage"
notes: |
  For HA, use distributed MinIO with 4+ nodes and erasure coding.
  For cloud deployments, skip this node — use Hetzner Object Storage.
  Estimated storage: 1TB on CX21 block storage = ~€6/mo additional.
```

### 2.1.4 Add shared Docker network

Add to each `docker-compose.pillar-*.yml`:

```yaml
networks:
  default:
    name: madbase
    external: true
```

Create the network before first use: `docker network create madbase`
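Putting 2.1.1 and 2.1.2 together, the following is a minimal sketch of how the storage settings could be read from the environment. The standalone `StorageSettings` struct, the `from_env` helper, and the `S3_BUCKET`/`S3_REGION` defaults are illustrative assumptions; in the codebase these fields belong on `Config` in `common/src/config.rs`.

```rust
use std::env;

// As defined in 2.1.2.
#[derive(Clone, Debug)]
pub enum StorageMode {
    Cloud,      // External S3 (Hetzner, AWS, B2)
    SelfHosted, // MinIO
}

// Sketch only: in the real codebase these fields live on `Config`.
#[derive(Clone, Debug)]
pub struct StorageSettings {
    pub storage_mode: StorageMode,
    pub s3_endpoint: String,
    pub s3_access_key: String,
    pub s3_secret_key: String,
    pub s3_bucket: String,
    pub s3_region: String,
}

impl StorageSettings {
    /// Hypothetical helper: read the storage-related settings from the environment,
    /// falling back to the local MinIO endpoint in self-hosted mode.
    pub fn from_env() -> Self {
        let storage_mode = match env::var("STORAGE_MODE")
            .unwrap_or_else(|_| "self-hosted".into())
            .as_str()
        {
            "cloud" | "s3" => StorageMode::Cloud,
            _ => StorageMode::SelfHosted,
        };

        // Self-hosted: default to the MinIO container from docker-compose.pillar-storage.yml.
        // Cloud: S3_ENDPOINT must be provided explicitly.
        let default_endpoint = match storage_mode {
            StorageMode::SelfHosted => "http://minio:9000".to_string(),
            StorageMode::Cloud => String::new(),
        };

        Self {
            storage_mode,
            s3_endpoint: env::var("S3_ENDPOINT").unwrap_or(default_endpoint),
            s3_access_key: env::var("S3_ACCESS_KEY").unwrap_or_default(),
            s3_secret_key: env::var("S3_SECRET_KEY").unwrap_or_default(),
            // Bucket and region defaults are illustrative assumptions.
            s3_bucket: env::var("S3_BUCKET").unwrap_or_else(|_| "madbase".into()),
            s3_region: env::var("S3_REGION").unwrap_or_else(|_| "us-east-1".into()),
        }
    }
}
```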
---

## 2.2 — Storage Backend Improvements

### 2.2.1 Route handlers through StorageBackend trait

**Current problem:** `StorageState` holds a raw `aws_sdk_s3::Client` and handlers call `state.s3_client.put_object()` directly, bypassing the `StorageBackend` trait entirely. The trait exists but is unused.

**Fix:**

1. Expand the `StorageBackend` trait:

```rust
#[async_trait]
pub trait StorageBackend: Send + Sync {
    async fn put_object(&self, bucket: &str, key: &str, data: Bytes, content_type: Option<&str>) -> Result<()>;
    async fn get_object(&self, bucket: &str, key: &str) -> Result<GetObjectResponse>;
    async fn delete_object(&self, bucket: &str, key: &str) -> Result<()>;
    async fn copy_object(&self, bucket: &str, src_key: &str, dst_key: &str) -> Result<()>;
    async fn create_bucket(&self, bucket: &str) -> Result<()>;
    async fn delete_bucket(&self, bucket: &str) -> Result<()>;
    async fn head_bucket(&self, bucket: &str) -> Result<()>;                          // used by /health (2.3.1)
    async fn head_object(&self, bucket: &str, key: &str) -> Result<ObjectMetadata>;   // size, content_type, etc.
    async fn list_objects(&self, bucket: &str, prefix: &str) -> Result<Vec<ObjectInfo>>;
}

pub struct GetObjectResponse {
    pub body: Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>,
    pub content_type: Option<String>,
    pub content_length: Option<i64>,
}
```

2. Change `StorageState`:

```rust
#[derive(Clone)]
pub struct StorageState {
    pub db: PgPool,
    pub backend: Arc<dyn StorageBackend>,
    pub config: Config,
    pub bucket_name: String,
}
```

3. Update `storage/src/lib.rs` init:

```rust
pub async fn init(db: PgPool, config: Config) -> Router {
    let backend: Arc<dyn StorageBackend> = Arc::new(
        AwsS3Backend::new(&config).await.expect("Failed to init storage backend")
    );
    let bucket_name = config.s3_bucket.clone();
    backend.create_bucket(&bucket_name).await.ok();

    let state = StorageState { db, backend, config, bucket_name };
    // ...routes...
}
```

### 2.2.2 Add streaming to StorageBackend

Replace `get_object() -> Bytes` with a streaming response. The AWS SDK already supports this:

```rust
async fn get_object(&self, _bucket: &str, key: &str) -> Result<GetObjectResponse> {
    let resp = self.client.get_object()
        .bucket(&self.bucket_name)
        .key(key)
        .send()
        .await?;

    let stream = resp.body.into_async_read();
    let byte_stream = tokio_util::io::ReaderStream::new(stream);
    let mapped = byte_stream.map(|r| r.map_err(|e| anyhow::anyhow!(e)));

    Ok(GetObjectResponse {
        body: Box::pin(mapped),
        content_type: resp.content_type.map(|s| s.to_string()),
        content_length: resp.content_length,
    })
}
```

In the handler, convert to an axum `Body`:

```rust
let resp = state.backend.get_object(&state.bucket_name, &key).await?;
let body = Body::from_stream(resp.body);
Ok((headers, body))
```

### 2.2.3 Add missing HTTP endpoints

**Delete object:** `DELETE /storage/v1/object/:bucket_id/*filename`

```rust
pub async fn delete_object(
    State(state): State<StorageState>,
    Extension(auth_ctx): Extension<AuthContext>,
    Extension(project_ctx): Extension<ProjectContext>,
    Path((bucket_id, filename)): Path<(String, String)>,
    db: Option<Extension<PgPool>>,
) -> Result<StatusCode, ApiError> {
    let pool = db.map(|Extension(p)| p).unwrap_or(state.db.clone());
    let mut rls = RlsTransaction::begin(&pool, &auth_ctx).await?;

    // Verify object exists under RLS
    let exists = sqlx::query_scalar::<_, Uuid>(
        "SELECT id FROM storage.objects WHERE bucket_id = $1 AND name = $2"
    )
    .bind(&bucket_id).bind(&filename)
    .fetch_optional(&mut *rls.tx).await?;

    if exists.is_none() {
        return Err(ApiError::NotFound("Object not found".into()));
    }

    // Delete from S3
    let key = format!("{}/{}/{}", project_ctx.project_ref, bucket_id, filename);
    state.backend.delete_object(&state.bucket_name, &key).await
        .map_err(|e| ApiError::Internal(e.to_string()))?;

    // Delete from DB
    sqlx::query("DELETE FROM storage.objects WHERE bucket_id = $1 AND name = $2")
        .bind(&bucket_id).bind(&filename)
        .execute(&mut *rls.tx).await?;

    rls.commit().await?;
    Ok(StatusCode::NO_CONTENT)
}
```

**Delete bucket:** `DELETE /storage/v1/bucket/:bucket_id`

**Copy object:** `POST /storage/v1/object/copy` with `{ "sourceKey": "bucket/path", "destinationKey": "bucket/path" }`

**Move object:** `POST /storage/v1/object/move` (copy + delete source; see the sketch below)
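The copy and move endpoints can be composed from the trait methods above. A minimal sketch of the move handler, reusing the `StorageState`, `ProjectContext`, and `ApiError` types from the delete handler; the `MoveObjectRequest` name and the key layout are assumptions, and the `storage.objects` rename is left out:

```rust
// Illustrative request body for POST /storage/v1/object/move
// (field names follow the copy endpoint's payload above).
#[derive(Deserialize)]
pub struct MoveObjectRequest {
    #[serde(rename = "sourceKey")]
    pub source_key: String,        // "bucket/path"
    #[serde(rename = "destinationKey")]
    pub destination_key: String,   // "bucket/path"
}

pub async fn move_object(
    State(state): State<StorageState>,
    Extension(project_ctx): Extension<ProjectContext>,
    Json(req): Json<MoveObjectRequest>,
) -> Result<StatusCode, ApiError> {
    // Prefix both keys with the project ref, mirroring the delete handler.
    let src = format!("{}/{}", project_ctx.project_ref, req.source_key);
    let dst = format!("{}/{}", project_ctx.project_ref, req.destination_key);

    // S3 has no rename: copy to the destination, then delete the source.
    state.backend.copy_object(&state.bucket_name, &src, &dst).await
        .map_err(|e| ApiError::Internal(e.to_string()))?;
    state.backend.delete_object(&state.bucket_name, &src).await
        .map_err(|e| ApiError::Internal(e.to_string()))?;

    // The matching storage.objects row must also be renamed under RLS
    // in the same request; omitted here for brevity.

    Ok(StatusCode::OK)
}
```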
**Public URL:** `GET /storage/v1/object/public/:bucket_id/*filename` — check `storage.buckets.public = true`, return a redirect to an S3 presigned URL or stream directly.

### 2.2.4 Add bucket constraints

**Migration:** Add columns to `storage.buckets`:

```sql
ALTER TABLE storage.buckets
  ADD COLUMN IF NOT EXISTS file_size_limit BIGINT,
  ADD COLUMN IF NOT EXISTS allowed_mime_types TEXT[];
```

**Validation in upload handler:**

```rust
// After fetching bucket info
if let Some(limit) = bucket.file_size_limit {
    if data.len() as i64 > limit {
        return Err(ApiError::BadRequest(format!(
            "File size {} exceeds bucket limit {}", data.len(), limit
        )));
    }
}

if let Some(allowed) = &bucket.allowed_mime_types {
    if !allowed.is_empty() && !allowed.contains(&content_type.to_string()) {
        return Err(ApiError::BadRequest(format!(
            "MIME type {} not allowed in this bucket", content_type
        )));
    }
}
```

### 2.2.5 Fix TUS completion — use S3 multipart upload

**File:** `storage/src/tus.rs` — `tus_patch_upload` completion block (line ~252)

**Current problem:** `fs::read(&upload_path)` loads the entire completed file into memory.

**Fix:** Use S3 multipart upload. On TUS create, start a multipart upload. On each PATCH, upload that chunk as a part. On completion, finalize the multipart upload.

Store the multipart upload ID in the `.info` file:

```json
{
  "upload_length": 104857600,
  "bucket_id": "avatars",
  "filename": "photo.jpg",
  "s3_upload_id": "abc123...",
  "parts": [
    { "part_number": 1, "etag": "\"abc\"", "size": 5242880 },
    { "part_number": 2, "etag": "\"def\"", "size": 5242880 }
  ]
}
```

On PATCH:

```rust
let part_number = (current_offset / PART_SIZE) as i32 + 1;
let upload_part = state.backend.client()
    .upload_part()
    .bucket(&state.bucket_name)
    .key(&key)
    .upload_id(&s3_upload_id)
    .part_number(part_number)
    .body(ByteStream::from(data))
    .send()
    .await?;
// Store etag in info file
```

On completion:

```rust
state.backend.client()
    .complete_multipart_upload()
    .bucket(&state.bucket_name)
    .key(&key)
    .upload_id(&s3_upload_id)
    .multipart_upload(completed_parts)
    .send()
    .await?;
// Clean up local temp files
```

> **Note:** S3 multipart parts must be at least 5MB (except the last part). Buffer PATCH data until 5MB before uploading a part.
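One way to satisfy the 5MB minimum is to accumulate PATCH bytes and only flush a part once the threshold is reached, tracking the part number in the buffer rather than deriving it from the offset as in the PATCH snippet above. A self-contained sketch; `PartBuffer` and its methods are hypothetical names, not existing code:

```rust
// Minimum part size required by S3 multipart uploads (all parts except the last).
const PART_SIZE: usize = 5 * 1024 * 1024;

/// Hypothetical helper: accumulates PATCH chunks and yields parts of at least PART_SIZE.
pub struct PartBuffer {
    buf: Vec<u8>,
    next_part_number: i32,
}

impl PartBuffer {
    pub fn new() -> Self {
        Self { buf: Vec::with_capacity(PART_SIZE), next_part_number: 1 }
    }

    /// Append a PATCH chunk; returns any full parts that are ready for upload_part().
    pub fn push(&mut self, chunk: &[u8]) -> Vec<(i32, Vec<u8>)> {
        self.buf.extend_from_slice(chunk);
        let mut ready = Vec::new();
        while self.buf.len() >= PART_SIZE {
            let part: Vec<u8> = self.buf.drain(..PART_SIZE).collect();
            ready.push((self.next_part_number, part));
            self.next_part_number += 1;
        }
        ready
    }

    /// Called at TUS completion: whatever remains becomes the final (possibly short) part.
    pub fn finish(self) -> Option<(i32, Vec<u8>)> {
        if self.buf.is_empty() { None } else { Some((self.next_part_number, self.buf)) }
    }
}
```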
---

## 2.3 — Storage Health & Observability

### 2.3.1 Health check endpoint

Add to the `storage/src/lib.rs` router:

```rust
.route("/health", get(health_check))
```

```rust
async fn health_check(State(state): State<StorageState>) -> Result<&'static str, StatusCode> {
    state.backend.head_bucket(&state.bucket_name)
        .await
        .map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?;
    Ok("OK")
}
```

### 2.3.2 Structured logging

Replace all `tracing::info!("File size: {} bytes", size)` calls with structured fields:

```rust
tracing::info!(
    bucket = %bucket_id,
    filename = %filename,
    size_bytes = size,
    "Upload completed"
);
```

### 2.3.3 Image transforms — run async

**File:** `storage/src/handlers.rs` — `transform_image` function (line 328)

It currently runs synchronously, blocking the async runtime. Use `tokio::task::spawn_blocking`:

```rust
if width.is_some() || height.is_some() || format.is_some() {
    let body_clone = body_bytes.clone();
    match tokio::task::spawn_blocking(move || {
        transform_image(body_clone, width, height, quality, format)
    }).await {
        Ok(Ok((new_bytes, new_ct))) => { ... },
        Ok(Err(e)) => {
            tracing::warn!(error = %e, "Image transform failed");
        },
        Err(e) => {
            tracing::warn!(error = %e, "Image transform panicked");
        },
    }
}
```

---

## 2.4 — MinIO HA (Optional)

### 2.4.1 Distributed MinIO documentation

For self-hosted production with HA, document the distributed-mode setup:

```yaml
# docker-compose.pillar-storage-ha.yml
services:
  minio1:
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    command: server http://minio{1...4}/data --console-address ":9001"
  # ... same for minio2, minio3, minio4
```

Requires 4 nodes minimum for erasure coding. Each node needs its own block storage volume.

### 2.4.2 Lifecycle rules

Configure via the MinIO client:

```bash
mc ilm rule add madbase/madbase \
  --expire-delete-marker \
  --noncurrent-expire-days 30 \
  --prefix "tus-temp/"
```

This auto-cleans incomplete TUS uploads after 30 days.

---

## Route Summary (after M2)

| Method | Path | Handler | supabase-js method |
|--------|------|---------|--------------------|
| GET | `/storage/v1/bucket` | `list_buckets` | `listBuckets()` |
| POST | `/storage/v1/bucket` | `create_bucket` | `createBucket()` |
| DELETE | `/storage/v1/bucket/:id` | `delete_bucket` | `deleteBucket()` |
| POST | `/storage/v1/object/list/:bucket_id` | `list_objects` | `list()` |
| POST | `/storage/v1/object/:bucket_id/*filename` | `upload_object` | `upload()` |
| GET | `/storage/v1/object/:bucket_id/*filename` | `download_object` | `download()` |
| DELETE | `/storage/v1/object/:bucket_id/*filename` | `delete_object` | `remove()` |
| POST | `/storage/v1/object/copy` | `copy_object` | `copy()` |
| POST | `/storage/v1/object/move` | `move_object` | `move()` |
| POST | `/storage/v1/object/sign/:bucket_id/*filename` | `sign_object` | `createSignedUrl()` |
| GET | `/storage/v1/object/sign/:bucket_id/*filename` | `get_signed_object` | (signed URL access) |
| GET | `/storage/v1/object/public/:bucket_id/*filename` | `get_public_url` | `getPublicUrl()` |
| POST | `/storage/v1/upload/resumable` | `tus_create_upload` | (TUS) |
| PATCH | `/storage/v1/upload/resumable/:id` | `tus_patch_upload` | (TUS) |
| HEAD | `/storage/v1/upload/resumable/:id` | `tus_head_upload` | (TUS) |
| GET | `/storage/v1/health` | `health_check` | — |
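For reference, a sketch of how this route surface could be wired up in `storage/src/lib.rs`, assuming axum-style routing and the handler names from the table. Paths are shown relative to the `/storage/v1` prefix, which is assumed to be applied wherever the storage router is mounted; the function name and grouping are illustrative, not the final layout:

```rust
use axum::routing::{delete, get, head, patch, post};
use axum::Router;

// Sketch only: handler functions are the ones named in the Route Summary table.
fn storage_routes(state: StorageState) -> Router {
    Router::new()
        .route("/bucket", get(list_buckets).post(create_bucket))
        .route("/bucket/:bucket_id", delete(delete_bucket))
        .route("/object/list/:bucket_id", post(list_objects))
        .route("/object/copy", post(copy_object))
        .route("/object/move", post(move_object))
        .route("/object/sign/:bucket_id/*filename", post(sign_object).get(get_signed_object))
        .route("/object/public/:bucket_id/*filename", get(get_public_url))
        .route(
            "/object/:bucket_id/*filename",
            post(upload_object).get(download_object).delete(delete_object),
        )
        .route("/upload/resumable", post(tus_create_upload))
        .route("/upload/resumable/:id", patch(tus_patch_upload).head(tus_head_upload))
        .route("/health", get(health_check))
        .with_state(state)
}
```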
---

## Completion Requirements

This milestone is **not complete** until every item below is satisfied.

### 1. Full Test Suite — All Green

- [ ] `cargo test --workspace` passes with **zero failures**
- [ ] All **pre-existing tests** still pass (no regressions)
- [ ] **New unit tests** are written for every feature in this milestone:

| Test | Location | What it validates |
|------|----------|-------------------|
| `test_s3_put_object` | `storage/src/backend.rs` | `put_object` stores bytes and returns Ok |
| `test_s3_get_object_streaming` | `storage/src/backend.rs` | `get_object` returns a streaming body, not buffered |
| `test_s3_delete_object` | `storage/src/backend.rs` | `delete_object` removes the key; subsequent `head_object` returns NotFound |
| `test_s3_copy_object` | `storage/src/backend.rs` | `copy_object` duplicates object; both keys exist |
| `test_s3_move_object` | `storage/src/backend.rs` | After `move_object`, old key is gone, new key exists |
| `test_s3_list_objects` | `storage/src/backend.rs` | `list_objects` returns correct prefix-filtered results |
| `test_s3_head_object_metadata` | `storage/src/backend.rs` | `head_object` returns correct size and content_type |
| `test_s3_create_and_delete_bucket` | `storage/src/backend.rs` | `create_bucket` + `delete_bucket` round-trip succeeds |
| `test_bucket_file_size_limit` | `storage/src/handlers.rs` | Upload exceeding `file_size_limit` returns 413 |
| `test_bucket_allowed_mime_types` | `storage/src/handlers.rs` | Upload with disallowed MIME type returns 415 |
| `test_tus_multipart_completion` | `storage/src/tus.rs` | TUS completion assembles parts via S3 multipart, not in-memory buffer |
| `test_health_check_minio_up` | `storage/src/handlers.rs` | `/health` returns 200 when S3 is reachable |
| `test_health_check_minio_down` | `storage/src/handlers.rs` | `/health` returns 503 when S3 is unreachable |
| `test_storage_mode_self_hosted` | `storage/src/backend.rs` | `STORAGE_MODE=self-hosted` initializes with MinIO endpoint |
| `test_storage_mode_cloud` | `storage/src/backend.rs` | `STORAGE_MODE=cloud` initializes with custom S3 endpoint |

### 2. Integration Verification

- [ ] `STORAGE_MODE=self-hosted docker compose -f docker-compose.pillar-storage.yml up` starts MinIO and passes health checks
- [ ] Upload a 10MB file via `POST /storage/v1/object/test-bucket/big-file.bin` — verify it doesn't OOM
- [ ] Download a 10MB file — verify streaming (no OOM)
- [ ] Delete an object via `DELETE /storage/v1/object/test-bucket/file.txt` — verify it is removed from S3 and the DB
- [ ] Copy an object — verify the new key exists in S3
- [ ] Move an object — verify the old key is removed and the new key exists
- [ ] Upload to a bucket with `file_size_limit = 1000` — verify rejection for files over 1KB
- [ ] TUS upload of a 50MB file completes without loading the file into memory
- [ ] `GET /storage/v1/health` returns 200 when MinIO is up, 503 when down
- [ ] `STORAGE_MODE=cloud S3_ENDPOINT=https://fsn1.your-objectstorage.com ...` works with Hetzner Object Storage
- [ ] Every route in the Route Summary table above returns the correct response for both success and error cases
### 3. supabase-js Client Compatibility

- [ ] `supabase.storage.listBuckets()` works
- [ ] `supabase.storage.from('bucket').upload('file.txt', blob)` works
- [ ] `supabase.storage.from('bucket').download('file.txt')` works
- [ ] `supabase.storage.from('bucket').remove(['file.txt'])` works
- [ ] `supabase.storage.from('bucket').copy('a.txt', 'b.txt')` works
- [ ] `supabase.storage.from('bucket').move('a.txt', 'c.txt')` works
- [ ] `supabase.storage.from('bucket').createSignedUrl('file.txt', 3600)` works
- [ ] `supabase.storage.from('public-bucket').getPublicUrl('file.txt')` works

### 4. CI Gate

- [ ] All unit tests run in `cargo test --workspace`
- [ ] Integration tests that require MinIO are gated behind `#[cfg(feature = "integration")]` or `#[ignore]` with clear documentation
- [ ] CI runs unit tests on every PR; integration tests run on merge to main (or nightly)
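To illustrate the gating item above, a minimal sketch of a MinIO-backed integration test. The `integration` feature name comes from the checklist; the `common::Config::from_env()` call, module paths, and test body are assumptions based on the sketches earlier in this milestone:

```rust
// storage/tests/s3_integration.rs (sketch)
// Run with: cargo test --features integration -- --ignored
#![cfg(feature = "integration")]

use bytes::Bytes;

#[tokio::test]
#[ignore = "requires a running MinIO (docker-compose.pillar-storage.yml)"]
async fn test_s3_put_and_delete_object() {
    // Assumption: Config::from_env() reads STORAGE_MODE / S3_* from the environment.
    let config = common::Config::from_env().expect("storage env vars must be set");
    let backend = storage::AwsS3Backend::new(&config)
        .await
        .expect("backend init");

    let bucket = &config.s3_bucket;
    // Ignore "already exists" errors, mirroring the init sketch in 2.2.1.
    backend.create_bucket(bucket).await.ok();

    backend
        .put_object(bucket, "it/test.txt", Bytes::from_static(b"hello"), Some("text/plain"))
        .await
        .expect("put_object");

    backend.delete_object(bucket, "it/test.txt").await.expect("delete_object");
}
```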