# Milestone 2: Storage Pillar
**Goal:** Storage becomes a first-class pillar supporting self-hosted MinIO or cloud S3 (Hetzner Object Storage, AWS S3, Backblaze B2). Complete the supabase-js `storage` API surface.
**Depends on:** M1 (Foundation)
---
## 2.1 — Storage Pillar Compose & Configuration
### 2.1.1 Create docker-compose.pillar-storage.yml
This compose file is used only for **self-hosted mode**. In cloud mode, workers connect directly to the external S3 endpoint and this compose file is not needed.
```yaml
# MadBase - Pillar: Storage (Self-Hosted)
# S3-compatible object storage via MinIO
services:
  minio:
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    container_name: madbase_minio
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: ${S3_ACCESS_KEY}
      MINIO_ROOT_PASSWORD: ${S3_SECRET_KEY}
      MINIO_BROWSER_REDIRECT_URL: http://localhost:9001
    volumes:
      - minio_data:/data
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped

volumes:
  minio_data:

networks:
  default:
    name: madbase
    external: true
```
### 2.1.2 Add STORAGE_MODE env var
**File:** `common/src/config.rs`
Add to `Config`:
```rust
pub storage_mode: StorageMode,
pub s3_endpoint: String,
pub s3_access_key: String,
pub s3_secret_key: String,
pub s3_bucket: String,
pub s3_region: String,
```
```rust
#[derive(Clone, Debug)]
pub enum StorageMode {
    Cloud,      // External S3 (Hetzner, AWS, B2)
    SelfHosted, // MinIO
}
```
Load from env:
```rust
let storage_mode = match env::var("STORAGE_MODE")
    .unwrap_or_else(|_| "self-hosted".into())
    .as_str()
{
    "cloud" | "s3" => StorageMode::Cloud,
    _ => StorageMode::SelfHosted,
};
```
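For reference, a minimal sketch of how `AwsS3Backend::new` (the concrete backend used later in 2.2.1) might build the SDK client from these fields. The struct layout is an assumption; the builder calls are standard `aws-sdk-s3` 1.x API:
```rust
use aws_sdk_s3::{config::{BehaviorVersion, Credentials, Region}, Client};

// Sketch only: concrete backend referenced in 2.2.1; field layout is illustrative.
pub struct AwsS3Backend {
    client: Client,
}

impl AwsS3Backend {
    pub async fn new(config: &Config) -> anyhow::Result<Self> {
        let creds = Credentials::new(
            config.s3_access_key.clone(),
            config.s3_secret_key.clone(),
            None, // session token
            None, // expiry
            "madbase",
        );
        let s3_config = aws_sdk_s3::config::Builder::new()
            .behavior_version(BehaviorVersion::latest())
            // MinIO endpoint in self-hosted mode, provider endpoint (Hetzner/AWS/B2) in cloud mode
            .endpoint_url(config.s3_endpoint.clone())
            .region(Region::new(config.s3_region.clone()))
            .credentials_provider(creds)
            .force_path_style(true) // MinIO typically requires path-style addressing
            .build();
        Ok(Self { client: Client::from_conf(s3_config) })
    }
}
```
The same construction covers both modes; only `S3_ENDPOINT` (and credentials) differ between MinIO and a cloud provider.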
### 2.1.3 Create storage-node.yaml template
**File:** `templates/storage-node.yaml`
```yaml
id: storage-node
name: Dedicated Storage Node
description: MinIO object storage for self-hosted deployments
version: 1.0
min_hetzner_plan: CX21
estimated_monthly_cost: 6.94
services:
  - id: minio
    name: MinIO
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    ports: ["9000:9000", "9001:9001"]
    command: ["server", "/data", "--console-address", ":9001"]
    volumes:
      - minio_data:/data
    resource_profile: storage_intensive
requirements:
  min_nodes: 1
  max_nodes: 4
  supports_ha: true
recommended_deployment: "Dedicated node with attached block storage"
notes: |
  For HA, use distributed MinIO with 4+ nodes and erasure coding.
  For cloud deployments, skip this node — use Hetzner Object Storage.
  Estimated storage: 1TB on CX21 block storage = ~€6/mo additional.
```
### 2.1.4 Add shared Docker network
Add to each `docker-compose.pillar-*.yml`:
```yaml
networks:
  default:
    name: madbase
    external: true
```
Create the network before first use: `docker network create madbase`
---
## 2.2 — Storage Backend Improvements
### 2.2.1 Route handlers through StorageBackend trait
**Current problem:** `StorageState` holds a raw `aws_sdk_s3::Client` and handlers call `state.s3_client.put_object()` directly, bypassing the `StorageBackend` trait entirely. The trait exists but is unused.
**Fix:**
1. Expand the `StorageBackend` trait:
```rust
#[async_trait]
pub trait StorageBackend: Send + Sync {
    async fn put_object(&self, bucket: &str, key: &str, data: Bytes, content_type: Option<&str>) -> Result<()>;
    async fn get_object(&self, bucket: &str, key: &str) -> Result<GetObjectResponse>;
    async fn delete_object(&self, bucket: &str, key: &str) -> Result<()>;
    async fn copy_object(&self, bucket: &str, src_key: &str, dst_key: &str) -> Result<()>;
    async fn create_bucket(&self, bucket: &str) -> Result<()>;
    async fn delete_bucket(&self, bucket: &str) -> Result<()>;
    async fn head_object(&self, bucket: &str, key: &str) -> Result<ObjectMetadata>;
    async fn head_bucket(&self, bucket: &str) -> Result<()>; // used by the /health endpoint in 2.3.1
    async fn list_objects(&self, bucket: &str, prefix: &str) -> Result<Vec<ObjectMetadata>>;
}

pub struct GetObjectResponse {
    pub body: Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>,
    pub content_type: Option<String>,
    pub content_length: Option<i64>,
}
```
2. Change `StorageState`:
```rust
#[derive(Clone)]
pub struct StorageState {
    pub db: PgPool,
    pub backend: Arc<dyn StorageBackend>,
    pub config: Config,
    pub bucket_name: String,
}
```
3. Update `storage/src/lib.rs` init:
```rust
pub async fn init(db: PgPool, config: Config) -> Router {
    let backend: Arc<dyn StorageBackend> = Arc::new(
        AwsS3Backend::new(&config).await.expect("Failed to init storage backend")
    );
    let bucket_name = config.s3_bucket.clone();
    backend.create_bucket(&bucket_name).await.ok();
    let state = StorageState { db, backend, config, bucket_name };
    // ...routes...
}
```
### 2.2.2 Add streaming to StorageBackend
Replace `get_object() -> Bytes` with a streaming response. The AWS SDK already supports this:
```rust
async fn get_object(&self, _bucket: &str, key: &str) -> Result<GetObjectResponse> {
    let resp = self.client.get_object()
        .bucket(&self.bucket_name)
        .key(key)
        .send()
        .await?;
    let stream = resp.body.into_async_read();
    let byte_stream = tokio_util::io::ReaderStream::new(stream);
    let mapped = byte_stream.map(|r| r.map_err(|e| anyhow::anyhow!(e)));
    Ok(GetObjectResponse {
        body: Box::pin(mapped),
        content_type: resp.content_type.map(|s| s.to_string()),
        content_length: resp.content_length,
    })
}
```
In the handler, convert to axum Body:
```rust
let resp = state.backend.get_object(&state.bucket_name, &key).await?;
let body = Body::from_stream(resp.body);
Ok((headers, body))
```
### 2.2.3 Add missing HTTP endpoints
**Delete object:** `DELETE /storage/v1/object/:bucket_id/*filename`
```rust
pub async fn delete_object(
    State(state): State<StorageState>,
    Extension(auth_ctx): Extension<AuthContext>,
    Extension(project_ctx): Extension<ProjectContext>,
    Path((bucket_id, filename)): Path<(String, String)>,
    db: Option<Extension<PgPool>>,
) -> Result<StatusCode, ApiError> {
    let pool = db.map(|Extension(p)| p).unwrap_or(state.db.clone());
    let mut rls = RlsTransaction::begin(&pool, &auth_ctx).await?;

    // Verify object exists under RLS
    let exists = sqlx::query_scalar::<_, Uuid>(
        "SELECT id FROM storage.objects WHERE bucket_id = $1 AND name = $2"
    )
    .bind(&bucket_id).bind(&filename)
    .fetch_optional(&mut *rls.tx).await?;
    if exists.is_none() {
        return Err(ApiError::NotFound("Object not found".into()));
    }

    // Delete from S3
    let key = format!("{}/{}/{}", project_ctx.project_ref, bucket_id, filename);
    state.backend.delete_object(&state.bucket_name, &key).await
        .map_err(|e| ApiError::Internal(e.to_string()))?;

    // Delete from DB
    sqlx::query("DELETE FROM storage.objects WHERE bucket_id = $1 AND name = $2")
        .bind(&bucket_id).bind(&filename)
        .execute(&mut *rls.tx).await?;

    rls.commit().await?;
    Ok(StatusCode::NO_CONTENT)
}
```
**Delete bucket:** `DELETE /storage/v1/bucket/:bucket_id`
**Copy object:** `POST /storage/v1/object/copy` with `{ "sourceKey": "bucket/path", "destinationKey": "bucket/path" }`
**Move object:** `POST /storage/v1/object/move` (copy + delete source; see the sketch below)
**Public URL:** `GET /storage/v1/object/public/:bucket_id/*filename` — check `storage.buckets.public = true`, then either redirect to an S3 presigned URL or stream the object directly.
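A minimal backend-side sketch of copy and move, assuming direct access to the `aws_sdk_s3::Client` that `AwsS3Backend` wraps; in the real code these would sit behind the `StorageBackend` trait from 2.2.1, the free-function form here is for illustration only:
```rust
use aws_sdk_s3::Client;

/// Backend side of POST /storage/v1/object/copy: duplicate src_key as dst_key within `bucket`.
pub async fn copy_object(client: &Client, bucket: &str, src_key: &str, dst_key: &str) -> anyhow::Result<()> {
    client
        .copy_object()
        .bucket(bucket)
        // copy_source is "<source-bucket>/<source-key>"; URL-encode the key if it may contain special characters
        .copy_source(format!("{bucket}/{src_key}"))
        .key(dst_key)
        .send()
        .await?;
    Ok(())
}

/// Backend side of POST /storage/v1/object/move: copy, then delete the source key.
pub async fn move_object(client: &Client, bucket: &str, src_key: &str, dst_key: &str) -> anyhow::Result<()> {
    copy_object(client, bucket, src_key, dst_key).await?;
    client.delete_object().bucket(bucket).key(src_key).send().await?;
    Ok(())
}
```
Note that S3 `CopyObject` is limited to 5GB per request; objects larger than that need a multipart copy.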
### 2.2.4 Add bucket constraints
**Migration:** Add columns to `storage.buckets`:
```sql
ALTER TABLE storage.buckets
ADD COLUMN IF NOT EXISTS file_size_limit BIGINT,
ADD COLUMN IF NOT EXISTS allowed_mime_types TEXT[];
```
**Validation in upload handler:**
```rust
// After fetching bucket info.
// Note: the completion tests below expect 413 (payload too large) and 415 (unsupported
// media type) for these cases; map them to matching ApiError variants if they exist.
if let Some(limit) = bucket.file_size_limit {
    if data.len() as i64 > limit {
        return Err(ApiError::BadRequest(format!(
            "File size {} exceeds bucket limit {}", data.len(), limit
        )));
    }
}
if let Some(allowed) = &bucket.allowed_mime_types {
    if !allowed.is_empty() && !allowed.contains(&content_type.to_string()) {
        return Err(ApiError::BadRequest(format!(
            "MIME type {} not allowed in this bucket", content_type
        )));
    }
}
```
### 2.2.5 Fix TUS completion — use S3 multipart upload
**File:** `storage/src/tus.rs`, `tus_patch_upload` completion block (line ~252)
**Current problem:** `fs::read(&upload_path)` loads the entire completed file into memory.
**Fix:** Use S3 multipart upload. On TUS create, start a multipart upload. On each PATCH, upload that chunk as a part. On completion, finalize the multipart upload.
Store the multipart upload ID in the `.info` file:
```json
{
"upload_length": 104857600,
"bucket_id": "avatars",
"filename": "photo.jpg",
"s3_upload_id": "abc123...",
"parts": [
{ "part_number": 1, "etag": "\"abc\"", "size": 5242880 },
{ "part_number": 2, "etag": "\"def\"", "size": 5242880 }
]
}
```
On PATCH:
```rust
let part_number = (current_offset / PART_SIZE) as i32 + 1;
let upload_part = state.backend.client()
    .upload_part()
    .bucket(&state.bucket_name)
    .key(&key)
    .upload_id(&s3_upload_id)
    .part_number(part_number)
    .body(ByteStream::from(data))
    .send()
    .await?;
// Store etag in info file
```
On completion:
```rust
state.backend.client()
    .complete_multipart_upload()
    .bucket(&state.bucket_name)
    .key(&key)
    .upload_id(&s3_upload_id)
    .multipart_upload(completed_parts)
    .send()
    .await?;
// Clean up local temp files
```
> **Note:** S3 multipart parts must be at least 5MB (except the last part). Buffer PATCH data until 5MB before uploading a part.
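A rough sketch of that buffering step; the `UploadInfo` struct, `buffer_path` field, and `upload_part_s3` helper are hypothetical names for illustration, not existing items in `storage/src/tus.rs`:
```rust
use tokio::io::AsyncWriteExt;

const PART_SIZE: u64 = 5 * 1024 * 1024; // S3 minimum part size (all parts except the last)

// Hypothetical per-upload state tracked alongside the .info file.
struct UploadInfo {
    buffer_path: std::path::PathBuf,
    parts: Vec<(i32, String)>, // (part_number, etag)
}

async fn append_chunk(info: &mut UploadInfo, chunk: &[u8]) -> anyhow::Result<()> {
    // Append the PATCH body to a local buffer file.
    let mut buf = tokio::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(&info.buffer_path)
        .await?;
    buf.write_all(chunk).await?;

    // Only upload a part once the buffer reaches the 5 MiB minimum.
    if tokio::fs::metadata(&info.buffer_path).await?.len() >= PART_SIZE {
        let data = tokio::fs::read(&info.buffer_path).await?;
        let part_number = info.parts.len() as i32 + 1;
        let etag = upload_part_s3(part_number, data).await?; // wraps the upload_part() call shown above
        info.parts.push((part_number, etag));
        tokio::fs::write(&info.buffer_path, b"").await?; // reset buffer
    }
    Ok(())
}

// Placeholder: see the upload_part() snippet under "On PATCH" above.
async fn upload_part_s3(_part_number: i32, _data: Vec<u8>) -> anyhow::Result<String> {
    unimplemented!()
}
```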
---
## 2.3 — Storage Health & Observability
### 2.3.1 Health check endpoint
Add to `storage/src/lib.rs` router:
```rust
.route("/health", get(health_check))
```
```rust
async fn health_check(State(state): State<StorageState>) -> Result<&'static str, StatusCode> {
    state.backend.head_bucket(&state.bucket_name)
        .await
        .map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?;
    Ok("OK")
}
```
### 2.3.2 Structured logging
Replace all `tracing::info!("File size: {} bytes", size)` with structured fields:
```rust
tracing::info!(
    bucket = %bucket_id,
    filename = %filename,
    size_bytes = size,
    "Upload completed"
);
```
### 2.3.3 Image transforms — run async
**File:** `storage/src/handlers.rs`, `transform_image` function (line 328)
This currently runs synchronously inside the async handler, blocking a runtime worker thread. Use `tokio::task::spawn_blocking`:
```rust
if width.is_some() || height.is_some() || format.is_some() {
    let body_clone = body_bytes.clone();
    match tokio::task::spawn_blocking(move || {
        transform_image(body_clone, width, height, quality, format)
    }).await {
        Ok(Ok((new_bytes, new_ct))) => { ... },
        Ok(Err(e)) => { tracing::warn!(error = %e, "Image transform failed"); },
        Err(e) => { tracing::warn!(error = %e, "Image transform panicked"); },
    }
}
```
---
## 2.4 — MinIO HA (Optional)
### 2.4.1 Distributed MinIO documentation
For self-hosted production with HA, document the distributed mode setup:
```yaml
# docker-compose.pillar-storage-ha.yml
services:
  minio1:
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    command: server http://minio{1...4}/data --console-address ":9001"
  # ... same for minio2, minio3, minio4
```
Requires 4 nodes minimum for erasure coding. Each node needs its own block storage volume.
### 2.4.2 Lifecycle rules
Configure via MinIO client:
```bash
mc ilm rule add madbase/madbase \
--expire-delete-marker \
--noncurrent-expire-days 30 \
--prefix "tus-temp/"
```
This auto-cleans incomplete TUS uploads after 30 days.
---
## Route Summary (after M2)
| Method | Path | Handler | supabase-js method |
|--------|------|---------|-------------------|
| GET | `/storage/v1/bucket` | `list_buckets` | `listBuckets()` |
| POST | `/storage/v1/bucket` | `create_bucket` | `createBucket()` |
| DELETE | `/storage/v1/bucket/:id` | `delete_bucket` | `deleteBucket()` |
| POST | `/storage/v1/object/list/:bucket_id` | `list_objects` | `list()` |
| POST | `/storage/v1/object/:bucket_id/*filename` | `upload_object` | `upload()` |
| GET | `/storage/v1/object/:bucket_id/*filename` | `download_object` | `download()` |
| DELETE | `/storage/v1/object/:bucket_id/*filename` | `delete_object` | `remove()` |
| POST | `/storage/v1/object/copy` | `copy_object` | `copy()` |
| POST | `/storage/v1/object/move` | `move_object` | `move()` |
| POST | `/storage/v1/object/sign/:bucket_id/*filename` | `sign_object` | `createSignedUrl()` |
| GET | `/storage/v1/object/sign/:bucket_id/*filename` | `get_signed_object` | (signed URL access) |
| GET | `/storage/v1/object/public/:bucket_id/*filename` | `get_public_url` | `getPublicUrl()` |
| POST | `/storage/v1/upload/resumable` | `tus_create_upload` | (TUS) |
| PATCH | `/storage/v1/upload/resumable/:id` | `tus_patch_upload` | (TUS) |
| HEAD | `/storage/v1/upload/resumable/:id` | `tus_head_upload` | (TUS) |
| GET | `/storage/v1/health` | `health_check` | — |
---
## Completion Requirements
This milestone is **not complete** until every item below is satisfied.
### 1. Full Test Suite — All Green
- [ ] `cargo test --workspace` passes with **zero failures**
- [ ] All **pre-existing tests** still pass (no regressions)
- [ ] **New unit tests** are written for every feature in this milestone:
| Test | Location | What it validates |
|------|----------|-------------------|
| `test_s3_put_object` | `storage/src/backend.rs` | `put_object` stores bytes and returns Ok |
| `test_s3_get_object_streaming` | `storage/src/backend.rs` | `get_object` returns a streaming body, not buffered |
| `test_s3_delete_object` | `storage/src/backend.rs` | `delete_object` removes the key; subsequent `head_object` returns NotFound |
| `test_s3_copy_object` | `storage/src/backend.rs` | `copy_object` duplicates object; both keys exist |
| `test_s3_move_object` | `storage/src/backend.rs` | After `move_object`, old key is gone, new key exists |
| `test_s3_list_objects` | `storage/src/backend.rs` | `list_objects` returns correct prefix-filtered results |
| `test_s3_head_object_metadata` | `storage/src/backend.rs` | `head_object` returns correct size and content_type |
| `test_s3_create_and_delete_bucket` | `storage/src/backend.rs` | `create_bucket` + `delete_bucket` round-trip succeeds |
| `test_bucket_file_size_limit` | `storage/src/handlers.rs` | Upload exceeding `file_size_limit` returns 413 |
| `test_bucket_allowed_mime_types` | `storage/src/handlers.rs` | Upload with disallowed MIME type returns 415 |
| `test_tus_multipart_completion` | `storage/src/tus.rs` | TUS completion assembles parts via S3 multipart, not in-memory buffer |
| `test_health_check_minio_up` | `storage/src/handlers.rs` | `/health` returns 200 when S3 is reachable |
| `test_health_check_minio_down` | `storage/src/handlers.rs` | `/health` returns 503 when S3 is unreachable |
| `test_storage_mode_self_hosted` | `storage/src/backend.rs` | `STORAGE_MODE=self-hosted` initializes with MinIO endpoint |
| `test_storage_mode_cloud` | `storage/src/backend.rs` | `STORAGE_MODE=cloud` initializes with custom S3 endpoint |
### 2. Integration Verification
- [ ] `STORAGE_MODE=self-hosted docker compose -f docker-compose.pillar-storage.yml up` starts MinIO and passes health checks
- [ ] Upload a 10MB file via `POST /storage/v1/object/test-bucket/big-file.bin` — verify it doesn't OOM
- [ ] Download a 10MB file — verify streaming (no OOM)
- [ ] Delete an object via `DELETE /storage/v1/object/test-bucket/file.txt` — verify removed from S3 and DB
- [ ] Copy an object — verify new key exists in S3
- [ ] Move an object — verify old key removed, new key exists
- [ ] Upload to a bucket with `file_size_limit = 1000` — verify rejection for files over 1KB
- [ ] TUS upload of a 50MB file completes without loading into memory
- [ ] `GET /storage/v1/health` returns 200 when MinIO is up, 503 when down
- [ ] `STORAGE_MODE=cloud S3_ENDPOINT=https://fsn1.your-objectstorage.com ...` works with Hetzner Object Storage
- [ ] Every route in the Route Summary table above returns the correct response for both success and error cases
### 3. supabase-js Client Compatibility
- [ ] `supabase.storage.listBuckets()` works
- [ ] `supabase.storage.from('bucket').upload('file.txt', blob)` works
- [ ] `supabase.storage.from('bucket').download('file.txt')` works
- [ ] `supabase.storage.from('bucket').remove(['file.txt'])` works
- [ ] `supabase.storage.from('bucket').copy('a.txt', 'b.txt')` works
- [ ] `supabase.storage.from('bucket').move('a.txt', 'c.txt')` works
- [ ] `supabase.storage.from('bucket').createSignedUrl('file.txt', 3600)` works
- [ ] `supabase.storage.from('public-bucket').getPublicUrl('file.txt')` works
### 4. CI Gate
- [ ] All unit tests run in `cargo test --workspace`
- [ ] Integration tests that require MinIO are gated behind `#[cfg(feature = "integration")]` or `#[ignore]` with clear documentation (see the sketch after this list)
- [ ] CI runs unit tests on every PR; integration tests run on merge to main (or nightly)
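A sketch of the two gating options mentioned above; the `integration` feature name follows this checklist, and the test bodies are illustrative only:
```rust
// Option 1: compile MinIO-dependent tests only when the `integration` feature is enabled.
#[cfg(all(test, feature = "integration"))]
mod integration_tests {
    #[tokio::test]
    async fn test_s3_put_object() {
        // Requires MinIO from docker-compose.pillar-storage.yml to be running.
    }
}

// Option 2: keep the test compiled but skipped unless explicitly requested.
#[cfg(test)]
mod ignored_tests {
    #[tokio::test]
    #[ignore = "requires MinIO; run with `cargo test -- --ignored`"]
    async fn test_health_check_minio_up() {
        // Hits GET /storage/v1/health against a locally running stack.
    }
}
```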