# Milestone 2: Storage Pillar

**Goal:** Storage becomes a first-class pillar supporting self-hosted MinIO or cloud S3 (Hetzner Object Storage, AWS S3, Backblaze B2). Complete the supabase-js `storage` API surface.

**Depends on:** M1 (Foundation)

---

## 2.1 — Storage Pillar Compose & Configuration

### 2.1.1 Create docker-compose.pillar-storage.yml

This compose file is used only for **self-hosted mode**. In cloud mode, workers connect directly to the external S3 endpoint and this file is not needed.

```yaml
# MadBase - Pillar: Storage (Self-Hosted)
# S3-compatible object storage via MinIO

services:
  minio:
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    container_name: madbase_minio
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: ${S3_ACCESS_KEY}
      MINIO_ROOT_PASSWORD: ${S3_SECRET_KEY}
      MINIO_BROWSER_REDIRECT_URL: http://localhost:9001
    volumes:
      - minio_data:/data
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped

volumes:
  minio_data:

networks:
  default:
    name: madbase
    external: true
```

### 2.1.2 Add STORAGE_MODE env var

**File:** `common/src/config.rs`

Add to `Config`:

```rust
pub storage_mode: StorageMode,
pub s3_endpoint: String,
pub s3_access_key: String,
pub s3_secret_key: String,
pub s3_bucket: String,
pub s3_region: String,
```

```rust
#[derive(Clone, Debug)]
pub enum StorageMode {
    Cloud,      // External S3 (Hetzner, AWS, B2)
    SelfHosted, // MinIO
}
```

Load from env:

```rust
let storage_mode = match env::var("STORAGE_MODE").unwrap_or_else(|_| "self-hosted".into()).as_str() {
    "cloud" | "s3" => StorageMode::Cloud,
    _ => StorageMode::SelfHosted,
};
```
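
The remaining S3 fields can be loaded the same way. A minimal sketch, assuming the env var names follow the compose file (`S3_ACCESS_KEY`, `S3_SECRET_KEY`) and the integration checklist (`S3_ENDPOINT`); the bucket and region defaults here are illustrative:

```rust
// Sketch: env var names mirror docker-compose.pillar-storage.yml; the endpoint
// default targets local MinIO, and the bucket/region defaults are examples only.
let s3_endpoint = env::var("S3_ENDPOINT").unwrap_or_else(|_| "http://localhost:9000".into());
let s3_access_key = env::var("S3_ACCESS_KEY").unwrap_or_default();
let s3_secret_key = env::var("S3_SECRET_KEY").unwrap_or_default();
let s3_bucket = env::var("S3_BUCKET").unwrap_or_else(|_| "madbase".into());
let s3_region = env::var("S3_REGION").unwrap_or_else(|_| "us-east-1".into());
```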

### 2.1.3 Create storage-node.yaml template

**File:** `templates/storage-node.yaml`

```yaml
id: storage-node
name: Dedicated Storage Node
description: MinIO object storage for self-hosted deployments
version: 1.0

min_hetzner_plan: CX21
estimated_monthly_cost: 6.94

services:
  - id: minio
    name: MinIO
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    ports: ["9000:9000", "9001:9001"]
    command: ["server", "/data", "--console-address", ":9001"]
    volumes:
      - minio_data:/data
    resource_profile: storage_intensive

requirements:
  min_nodes: 1
  max_nodes: 4
  supports_ha: true
  recommended_deployment: "Dedicated node with attached block storage"

notes: |
  For HA, use distributed MinIO with 4+ nodes and erasure coding.
  For cloud deployments, skip this node — use Hetzner Object Storage.
  Estimated storage: 1TB on CX21 block storage = ~€6/mo additional.
```

### 2.1.4 Add shared Docker network

Add to each `docker-compose.pillar-*.yml`:

```yaml
networks:
  default:
    name: madbase
    external: true
```

Create the network before first use: `docker network create madbase`

---

## 2.2 — Storage Backend Improvements

### 2.2.1 Route handlers through StorageBackend trait

**Current problem:** `StorageState` holds a raw `aws_sdk_s3::Client` and handlers call `state.s3_client.put_object()` directly, bypassing the `StorageBackend` trait entirely. The trait exists but is unused.

**Fix:**

1. Expand the `StorageBackend` trait:

```rust
#[async_trait]
pub trait StorageBackend: Send + Sync {
    async fn put_object(&self, bucket: &str, key: &str, data: Bytes, content_type: Option<&str>) -> Result<()>;
    async fn get_object(&self, bucket: &str, key: &str) -> Result<GetObjectResponse>;
    async fn delete_object(&self, bucket: &str, key: &str) -> Result<()>;
    async fn copy_object(&self, bucket: &str, src_key: &str, dst_key: &str) -> Result<()>;
    async fn create_bucket(&self, bucket: &str) -> Result<()>;
    async fn delete_bucket(&self, bucket: &str) -> Result<()>;
    async fn head_object(&self, bucket: &str, key: &str) -> Result<ObjectMetadata>;
    async fn head_bucket(&self, bucket: &str) -> Result<()>; // used by the /health endpoint (§2.3.1)
    async fn list_objects(&self, bucket: &str, prefix: &str) -> Result<Vec<ObjectMetadata>>;
}

pub struct GetObjectResponse {
    pub body: Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>,
    pub content_type: Option<String>,
    pub content_length: Option<i64>,
}
```

2. Change `StorageState`:

```rust
#[derive(Clone)]
pub struct StorageState {
    pub db: PgPool,
    pub backend: Arc<dyn StorageBackend>,
    pub config: Config,
    pub bucket_name: String,
}
```

3. Update `storage/src/lib.rs` init:

```rust
pub async fn init(db: PgPool, config: Config) -> Router {
    let backend: Arc<dyn StorageBackend> = Arc::new(
        AwsS3Backend::new(&config).await.expect("Failed to init storage backend")
    );
    let bucket_name = config.s3_bucket.clone();
    backend.create_bucket(&bucket_name).await.ok();

    let state = StorageState { db, backend, config, bucket_name };
    // ...routes...
}
```
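
For reference, a minimal sketch of what `AwsS3Backend::new` might look like, assuming a recent `aws-sdk-s3` builder API (exact method names vary across SDK versions). `force_path_style(true)` is what lets the same code talk to MinIO, which serves buckets at path-style URLs:

```rust
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};

pub struct AwsS3Backend {
    client: aws_sdk_s3::Client,
}

impl AwsS3Backend {
    // Sketch: one constructor covers both MinIO (self-hosted) and external
    // S3-compatible endpoints (cloud mode), driven by the Config from §2.1.2.
    pub async fn new(config: &Config) -> anyhow::Result<Self> {
        let credentials = Credentials::new(
            config.s3_access_key.clone(),
            config.s3_secret_key.clone(),
            None,
            None,
            "madbase",
        );
        let s3_config = aws_sdk_s3::Config::builder()
            .behavior_version(BehaviorVersion::latest())
            .region(Region::new(config.s3_region.clone()))
            .endpoint_url(&config.s3_endpoint)
            .credentials_provider(credentials)
            // MinIO serves buckets as http://host:9000/<bucket>, not as subdomains.
            .force_path_style(true)
            .build();
        Ok(Self { client: aws_sdk_s3::Client::from_conf(s3_config) })
    }
}
```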

### 2.2.2 Add streaming to StorageBackend

Replace `get_object() -> Bytes` with a streaming response. The AWS SDK already supports this:

```rust
async fn get_object(&self, _bucket: &str, key: &str) -> Result<GetObjectResponse> {
    let resp = self.client.get_object()
        .bucket(&self.bucket_name)
        .key(key)
        .send()
        .await?;

    let stream = resp.body.into_async_read();
    let byte_stream = tokio_util::io::ReaderStream::new(stream);
    // `.map` here is futures::StreamExt::map
    let mapped = byte_stream.map(|r| r.map_err(|e| anyhow::anyhow!(e)));

    Ok(GetObjectResponse {
        body: Box::pin(mapped),
        content_type: resp.content_type.map(|s| s.to_string()),
        content_length: resp.content_length,
    })
}
```

In the handler, convert the stream to an axum `Body`:

```rust
let resp = state.backend.get_object(&state.bucket_name, &key).await?;
let body = Body::from_stream(resp.body);
Ok((headers, body))
```
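
The `headers` value is built from the metadata carried on `GetObjectResponse`; a small sketch:

```rust
use axum::http::{header, HeaderMap, HeaderValue};

// Propagate the object's metadata onto the HTTP response.
let mut headers = HeaderMap::new();
if let Some(ct) = &resp.content_type {
    if let Ok(value) = HeaderValue::from_str(ct) {
        headers.insert(header::CONTENT_TYPE, value);
    }
}
if let Some(len) = resp.content_length {
    headers.insert(header::CONTENT_LENGTH, HeaderValue::from(len));
}
```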

### 2.2.3 Add missing HTTP endpoints

**Delete object:** `DELETE /storage/v1/object/:bucket_id/*filename`

```rust
pub async fn delete_object(
    State(state): State<StorageState>,
    Extension(auth_ctx): Extension<AuthContext>,
    Extension(project_ctx): Extension<ProjectContext>,
    Path((bucket_id, filename)): Path<(String, String)>,
    db: Option<Extension<PgPool>>,
) -> Result<StatusCode, ApiError> {
    let pool = db.map(|Extension(p)| p).unwrap_or(state.db.clone());
    let mut rls = RlsTransaction::begin(&pool, &auth_ctx).await?;

    // Verify object exists under RLS
    let exists = sqlx::query_scalar::<_, Uuid>(
        "SELECT id FROM storage.objects WHERE bucket_id = $1 AND name = $2"
    )
    .bind(&bucket_id).bind(&filename)
    .fetch_optional(&mut *rls.tx).await?;

    if exists.is_none() {
        return Err(ApiError::NotFound("Object not found".into()));
    }

    // Delete from S3
    let key = format!("{}/{}/{}", project_ctx.project_ref, bucket_id, filename);
    state.backend.delete_object(&state.bucket_name, &key).await
        .map_err(|e| ApiError::Internal(e.to_string()))?;

    // Delete from DB
    sqlx::query("DELETE FROM storage.objects WHERE bucket_id = $1 AND name = $2")
        .bind(&bucket_id).bind(&filename)
        .execute(&mut *rls.tx).await?;

    rls.commit().await?;
    Ok(StatusCode::NO_CONTENT)
}
```

**Delete bucket:** `DELETE /storage/v1/bucket/:bucket_id`

**Copy object:** `POST /storage/v1/object/copy` with `{ "sourceKey": "bucket/path", "destinationKey": "bucket/path" }`

**Move object:** `POST /storage/v1/object/move` (copy + delete source; see the sketch below)

**Public URL:** `GET /storage/v1/object/public/:bucket_id/*filename` — check `storage.buckets.public = true`, then return a redirect to an S3 presigned URL or stream the object directly.

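
A minimal sketch of the move handler's core, assuming the request body mirrors the copy endpoint above (keys in `bucket/path` form); `MoveObjectRequest` and its field names are illustrative, and error handling plus the DB rename are only indicated:

```rust
// Sketch (assumption): body shape follows the copy endpoint above.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MoveObjectRequest {
    pub source_key: String,      // "bucket/path"
    pub destination_key: String, // "bucket/path"
}

pub async fn move_object(
    State(state): State<StorageState>,
    Extension(project_ctx): Extension<ProjectContext>,
    Json(req): Json<MoveObjectRequest>,
) -> Result<StatusCode, ApiError> {
    // S3 keys are prefixed with the project ref, as in delete_object above.
    let src = format!("{}/{}", project_ctx.project_ref, req.source_key);
    let dst = format!("{}/{}", project_ctx.project_ref, req.destination_key);

    // Move = copy to the destination key, then delete the source key.
    state.backend.copy_object(&state.bucket_name, &src, &dst).await
        .map_err(|e| ApiError::Internal(e.to_string()))?;
    state.backend.delete_object(&state.bucket_name, &src).await
        .map_err(|e| ApiError::Internal(e.to_string()))?;

    // The storage.objects row should also be renamed inside the same RLS
    // transaction pattern used by delete_object (omitted here).
    Ok(StatusCode::OK)
}
```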

### 2.2.4 Add bucket constraints

**Migration:** Add columns to `storage.buckets`:

```sql
ALTER TABLE storage.buckets
  ADD COLUMN IF NOT EXISTS file_size_limit BIGINT,
  ADD COLUMN IF NOT EXISTS allowed_mime_types TEXT[];
```

**Validation in upload handler:**

```rust
// After fetching bucket info.
// The tests in the completion checklist expect 413 (Payload Too Large) and
// 415 (Unsupported Media Type); map to dedicated ApiError variants if they
// exist, otherwise BadRequest as shown here.
if let Some(limit) = bucket.file_size_limit {
    if data.len() as i64 > limit {
        return Err(ApiError::BadRequest(format!(
            "File size {} exceeds bucket limit {}", data.len(), limit
        )));
    }
}
if let Some(allowed) = &bucket.allowed_mime_types {
    if !allowed.is_empty() && !allowed.contains(&content_type.to_string()) {
        return Err(ApiError::BadRequest(format!(
            "MIME type {} not allowed in this bucket", content_type
        )));
    }
}
```
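
The `bucket` value used above has to carry the two new columns. A sketch of how the upload handler could fetch them inside its RLS transaction; the `BucketConstraints` struct is illustrative:

```rust
// Sketch (assumption): small row type for the two new columns; the query runs
// inside the same RLS transaction the upload handler already opens.
#[derive(sqlx::FromRow)]
struct BucketConstraints {
    file_size_limit: Option<i64>,
    allowed_mime_types: Option<Vec<String>>,
}

let bucket = sqlx::query_as::<_, BucketConstraints>(
    "SELECT file_size_limit, allowed_mime_types FROM storage.buckets WHERE id = $1"
)
.bind(&bucket_id)
.fetch_one(&mut *rls.tx)
.await?;
```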

### 2.2.5 Fix TUS completion — use S3 multipart upload

**File:** `storage/src/tus.rs` — `tus_patch_upload` completion block (line ~252)

**Current problem:** `fs::read(&upload_path)` loads the entire completed file into memory.

**Fix:** Use S3 multipart upload. On TUS create, start a multipart upload. On each PATCH, upload that chunk as a part. On completion, finalize the multipart upload. The snippets below reach the underlying SDK client via a `client()` accessor on the backend; alternatively, expose multipart operations on `StorageBackend` directly.

Store the multipart upload ID in the `.info` file:

```json
{
  "upload_length": 104857600,
  "bucket_id": "avatars",
  "filename": "photo.jpg",
  "s3_upload_id": "abc123...",
  "parts": [
    { "part_number": 1, "etag": "\"abc\"", "size": 5242880 },
    { "part_number": 2, "etag": "\"def\"", "size": 5242880 }
  ]
}
```

On PATCH:

```rust
let part_number = (current_offset / PART_SIZE) as i32 + 1;
let upload_part = state.backend.client()
    .upload_part()
    .bucket(&state.bucket_name)
    .key(&key)
    .upload_id(&s3_upload_id)
    .part_number(part_number)
    .body(ByteStream::from(data))
    .send()
    .await?;
// Store etag in info file
```

On completion:

```rust
state.backend.client()
    .complete_multipart_upload()
    .bucket(&state.bucket_name)
    .key(&key)
    .upload_id(&s3_upload_id)
    .multipart_upload(completed_parts)
    .send()
    .await?;
// Clean up local temp files
```
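
`completed_parts` is assembled from the `parts` array recorded in the `.info` file. A sketch, assuming that JSON has been deserialized into an `info` struct whose parts carry `part_number` and `etag`:

```rust
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};

// Build the CompletedMultipartUpload from the parts recorded during PATCH.
let completed_parts = CompletedMultipartUpload::builder()
    .set_parts(Some(
        info.parts
            .iter()
            .map(|p| {
                CompletedPart::builder()
                    .part_number(p.part_number)
                    .e_tag(p.etag.clone())
                    .build()
            })
            .collect(),
    ))
    .build();
```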

> **Note:** S3 multipart parts must be at least 5MB (except the last part). Buffer PATCH data until at least 5MB has accumulated before uploading a part.

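
A sketch of that buffering, assuming a `buffer: Vec<u8>` that is persisted between PATCH requests (for example in the temp upload file) and the variables from the snippets above:

```rust
// Sketch (assumption): accumulate PATCH bodies until a full part is available.
const PART_SIZE: u64 = 5 * 1024 * 1024; // S3 minimum part size (except the last part)

buffer.extend_from_slice(&data);
let is_final_chunk = current_offset + data.len() as u64 >= upload_length;

while buffer.len() as u64 >= PART_SIZE || (is_final_chunk && !buffer.is_empty()) {
    let take = std::cmp::min(buffer.len() as u64, PART_SIZE) as usize;
    let part: Vec<u8> = buffer.drain(..take).collect();
    // upload_part() as in the PATCH snippet above, then append the returned
    // etag to the `parts` array in the .info file.
}
```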

---

## 2.3 — Storage Health & Observability

### 2.3.1 Health check endpoint

Add to the `storage/src/lib.rs` router:

```rust
.route("/health", get(health_check))
```
```rust
async fn health_check(State(state): State<StorageState>) -> Result<&'static str, StatusCode> {
    // head_bucket is part of the StorageBackend trait (§2.2.1)
    state.backend.head_bucket(&state.bucket_name)
        .await
        .map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?;
    Ok("OK")
}
```
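
On the backend side, `head_bucket` is a thin wrapper over the SDK's `HeadBucket` call. A sketch for `AwsS3Backend`, assuming the crate's `Result` alias can absorb SDK errors (e.g. anyhow):

```rust
// Inside the `impl StorageBackend for AwsS3Backend` block.
async fn head_bucket(&self, bucket: &str) -> Result<()> {
    self.client.head_bucket().bucket(bucket).send().await?;
    Ok(())
}
```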

### 2.3.2 Structured logging

Replace interpolated log messages like `tracing::info!("File size: {} bytes", size)` with structured fields:

```rust
tracing::info!(
    bucket = %bucket_id,
    filename = %filename,
    size_bytes = size,
    "Upload completed"
);
```

### 2.3.3 Image transforms — run async

**File:** `storage/src/handlers.rs` — `transform_image` function (line 328)

The transform currently runs synchronously on the async runtime, blocking the worker thread. Wrap it in `tokio::task::spawn_blocking`:

```rust
if width.is_some() || height.is_some() || format.is_some() {
    let body_clone = body_bytes.clone();
    match tokio::task::spawn_blocking(move || {
        transform_image(body_clone, width, height, quality, format)
    }).await {
        Ok(Ok((new_bytes, new_ct))) => { /* use transformed bytes */ },
        Ok(Err(e)) => { tracing::warn!(error = %e, "Image transform failed"); },
        Err(e) => { tracing::warn!(error = %e, "Image transform panicked"); },
    }
}
```

---

## 2.4 — MinIO HA (Optional)

### 2.4.1 Distributed MinIO documentation

For self-hosted production with HA, document the distributed-mode setup:

```yaml
# docker-compose.pillar-storage-ha.yml
services:
  minio1:
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    command: server http://minio{1...4}/data --console-address ":9001"
  # ... same for minio2, minio3, minio4
```

Distributed mode requires a minimum of 4 nodes for erasure coding. Each node needs its own block storage volume.

### 2.4.2 Lifecycle rules

Configure via the MinIO client:

```bash
mc ilm rule add madbase/madbase \
  --expire-delete-marker \
  --noncurrent-expire-days 30 \
  --prefix "tus-temp/"
```

This auto-cleans incomplete TUS uploads after 30 days.

---

## Route Summary (after M2)

| Method | Path | Handler | supabase-js method |
|--------|------|---------|-------------------|
| GET | `/storage/v1/bucket` | `list_buckets` | `listBuckets()` |
| POST | `/storage/v1/bucket` | `create_bucket` | `createBucket()` |
| DELETE | `/storage/v1/bucket/:id` | `delete_bucket` | `deleteBucket()` |
| POST | `/storage/v1/object/list/:bucket_id` | `list_objects` | `list()` |
| POST | `/storage/v1/object/:bucket_id/*filename` | `upload_object` | `upload()` |
| GET | `/storage/v1/object/:bucket_id/*filename` | `download_object` | `download()` |
| DELETE | `/storage/v1/object/:bucket_id/*filename` | `delete_object` | `remove()` |
| POST | `/storage/v1/object/copy` | `copy_object` | `copy()` |
| POST | `/storage/v1/object/move` | `move_object` | `move()` |
| POST | `/storage/v1/object/sign/:bucket_id/*filename` | `sign_object` | `createSignedUrl()` |
| GET | `/storage/v1/object/sign/:bucket_id/*filename` | `get_signed_object` | (signed URL access) |
| GET | `/storage/v1/object/public/:bucket_id/*filename` | `get_public_url` | `getPublicUrl()` |
| POST | `/storage/v1/upload/resumable` | `tus_create_upload` | (TUS) |
| PATCH | `/storage/v1/upload/resumable/:id` | `tus_patch_upload` | (TUS) |
| HEAD | `/storage/v1/upload/resumable/:id` | `tus_head_upload` | (TUS) |
| GET | `/storage/v1/health` | `health_check` | — |

---

## Completion Requirements

This milestone is **not complete** until every item below is satisfied.

### 1. Full Test Suite — All Green

- [ ] `cargo test --workspace` passes with **zero failures**
- [ ] All **pre-existing tests** still pass (no regressions)
- [ ] **New unit tests** are written for every feature in this milestone:

| Test | Location | What it validates |
|------|----------|-------------------|
| `test_s3_put_object` | `storage/src/backend.rs` | `put_object` stores bytes and returns Ok |
| `test_s3_get_object_streaming` | `storage/src/backend.rs` | `get_object` returns a streaming body, not buffered |
| `test_s3_delete_object` | `storage/src/backend.rs` | `delete_object` removes the key; subsequent `head_object` returns NotFound |
| `test_s3_copy_object` | `storage/src/backend.rs` | `copy_object` duplicates object; both keys exist |
| `test_s3_move_object` | `storage/src/backend.rs` | After `move_object`, old key is gone, new key exists |
| `test_s3_list_objects` | `storage/src/backend.rs` | `list_objects` returns correct prefix-filtered results |
| `test_s3_head_object_metadata` | `storage/src/backend.rs` | `head_object` returns correct size and content_type |
| `test_s3_create_and_delete_bucket` | `storage/src/backend.rs` | `create_bucket` + `delete_bucket` round-trip succeeds |
| `test_bucket_file_size_limit` | `storage/src/handlers.rs` | Upload exceeding `file_size_limit` returns 413 |
| `test_bucket_allowed_mime_types` | `storage/src/handlers.rs` | Upload with disallowed MIME type returns 415 |
| `test_tus_multipart_completion` | `storage/src/tus.rs` | TUS completion assembles parts via S3 multipart, not in-memory buffer |
| `test_health_check_minio_up` | `storage/src/handlers.rs` | `/health` returns 200 when S3 is reachable |
| `test_health_check_minio_down` | `storage/src/handlers.rs` | `/health` returns 503 when S3 is unreachable |
| `test_storage_mode_self_hosted` | `storage/src/backend.rs` | `STORAGE_MODE=self-hosted` initializes with MinIO endpoint |
| `test_storage_mode_cloud` | `storage/src/backend.rs` | `STORAGE_MODE=cloud` initializes with custom S3 endpoint |

### 2. Integration Verification

- [ ] `STORAGE_MODE=self-hosted docker compose -f docker-compose.pillar-storage.yml up` starts MinIO and passes health checks
- [ ] Upload a 10MB file via `POST /storage/v1/object/test-bucket/big-file.bin` — verify it doesn't OOM
- [ ] Download a 10MB file — verify streaming (no OOM)
- [ ] Delete an object via `DELETE /storage/v1/object/test-bucket/file.txt` — verify it is removed from both S3 and the DB
- [ ] Copy an object — verify the new key exists in S3
- [ ] Move an object — verify the old key is removed and the new key exists
- [ ] Upload to a bucket with `file_size_limit = 1000` — verify rejection for files over 1KB
- [ ] TUS upload of a 50MB file completes without loading the whole file into memory
- [ ] `GET /storage/v1/health` returns 200 when MinIO is up, 503 when down
- [ ] `STORAGE_MODE=cloud S3_ENDPOINT=https://fsn1.your-objectstorage.com ...` works with Hetzner Object Storage
- [ ] Every route in the Route Summary table above returns the correct response for both success and error cases

### 3. supabase-js Client Compatibility

- [ ] `supabase.storage.listBuckets()` works
- [ ] `supabase.storage.from('bucket').upload('file.txt', blob)` works
- [ ] `supabase.storage.from('bucket').download('file.txt')` works
- [ ] `supabase.storage.from('bucket').remove(['file.txt'])` works
- [ ] `supabase.storage.from('bucket').copy('a.txt', 'b.txt')` works
- [ ] `supabase.storage.from('bucket').move('a.txt', 'c.txt')` works
- [ ] `supabase.storage.from('bucket').createSignedUrl('file.txt', 3600)` works
- [ ] `supabase.storage.from('public-bucket').getPublicUrl('file.txt')` works

### 4. CI Gate

- [ ] All unit tests run in `cargo test --workspace`
- [ ] Integration tests that require MinIO are gated behind `#[cfg(feature = "integration")]` or `#[ignore]` with clear documentation
- [ ] CI runs unit tests on every PR; integration tests run on merge to main (or nightly)