Milestone 2: Storage Pillar
Goal: Storage becomes a first-class pillar supporting self-hosted MinIO or cloud S3 (Hetzner Object Storage, AWS S3, Backblaze B2). Complete the supabase-js storage API surface.
Depends on: M1 (Foundation)
2.1 — Storage Pillar Compose & Configuration
2.1.1 Create docker-compose.pillar-storage.yml
This compose file is used only for self-hosted mode. In cloud mode, workers connect directly to the external S3 endpoint and this compose file is not needed.
# MadBase - Pillar: Storage (Self-Hosted)
# S3-compatible object storage via MinIO
services:
  minio:
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    container_name: madbase_minio
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: ${S3_ACCESS_KEY}
      MINIO_ROOT_PASSWORD: ${S3_SECRET_KEY}
      MINIO_BROWSER_REDIRECT_URL: http://localhost:9001
    volumes:
      - minio_data:/data
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped

volumes:
  minio_data:

networks:
  default:
    name: madbase
    external: true
2.1.2 Add STORAGE_MODE env var
File: common/src/config.rs
Add to Config:
pub storage_mode: StorageMode,
pub s3_endpoint: String,
pub s3_access_key: String,
pub s3_secret_key: String,
pub s3_bucket: String,
pub s3_region: String,

#[derive(Clone, Debug)]
pub enum StorageMode {
    Cloud,      // External S3 (Hetzner, AWS, B2)
    SelfHosted, // MinIO
}

Load from env:

let storage_mode = match env::var("STORAGE_MODE").unwrap_or_else(|_| "self-hosted".into()).as_str() {
    "cloud" | "s3" => StorageMode::Cloud,
    _ => StorageMode::SelfHosted,
};
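The mode mainly affects how the S3 client is constructed: MinIO needs path-style addressing and an explicit endpoint, while cloud endpoints generally accept virtual-hosted style. A minimal sketch, assuming the aws_sdk_s3 crate and the AwsS3Backend constructor referenced in 2.2.1 (the struct fields and provider name here are assumptions, not the final implementation):

```rust
use aws_sdk_s3::{
    config::{BehaviorVersion, Credentials, Region},
    Client,
};

impl AwsS3Backend {
    pub async fn new(config: &Config) -> anyhow::Result<Self> {
        let creds = Credentials::new(
            config.s3_access_key.clone(),
            config.s3_secret_key.clone(),
            None,
            None,
            "madbase-env", // provider label, assumption
        );
        let s3_config = aws_sdk_s3::Config::builder()
            .behavior_version(BehaviorVersion::latest())
            .region(Region::new(config.s3_region.clone()))
            .endpoint_url(config.s3_endpoint.clone())
            .credentials_provider(creds)
            // MinIO requires path-style URLs (http://host:9000/bucket/key);
            // most cloud S3 endpoints also work with virtual-hosted style.
            .force_path_style(matches!(config.storage_mode, StorageMode::SelfHosted))
            .build();
        Ok(Self {
            client: Client::from_conf(s3_config),
            bucket_name: config.s3_bucket.clone(),
        })
    }
}
```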
2.1.3 Create storage-node.yaml template
File: templates/storage-node.yaml
id: storage-node
name: Dedicated Storage Node
description: MinIO object storage for self-hosted deployments
version: 1.0
min_hetzner_plan: CX21
estimated_monthly_cost: 6.94
services:
  - id: minio
    name: MinIO
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    ports: ["9000:9000", "9001:9001"]
    command: ["server", "/data", "--console-address", ":9001"]
    volumes:
      - minio_data:/data
    resource_profile: storage_intensive
requirements:
  min_nodes: 1
  max_nodes: 4
  supports_ha: true
recommended_deployment: "Dedicated node with attached block storage"
notes: |
  For HA, use distributed MinIO with 4+ nodes and erasure coding.
  For cloud deployments, skip this node — use Hetzner Object Storage.
  Estimated storage: 1TB on CX21 block storage = ~€6/mo additional.
2.1.4 Add shared Docker network
Add to each docker-compose.pillar-*.yml:
networks:
  default:
    name: madbase
    external: true
Create the network before first use: docker network create madbase
2.2 — Storage Backend Improvements
2.2.1 Route handlers through StorageBackend trait
Current problem: StorageState holds a raw aws_sdk_s3::Client and handlers call state.s3_client.put_object() directly, bypassing the StorageBackend trait entirely. The trait exists but is unused.
Fix:
- Expand the `StorageBackend` trait:
#[async_trait]
pub trait StorageBackend: Send + Sync {
    async fn put_object(&self, bucket: &str, key: &str, data: Bytes, content_type: Option<&str>) -> Result<()>;
    async fn get_object(&self, bucket: &str, key: &str) -> Result<GetObjectResponse>;
    async fn delete_object(&self, bucket: &str, key: &str) -> Result<()>;
    async fn copy_object(&self, bucket: &str, src_key: &str, dst_key: &str) -> Result<()>;
    async fn create_bucket(&self, bucket: &str) -> Result<()>;
    async fn delete_bucket(&self, bucket: &str) -> Result<()>;
    async fn head_bucket(&self, bucket: &str) -> Result<()>; // needed by the /health check in 2.3.1
    async fn head_object(&self, bucket: &str, key: &str) -> Result<ObjectMetadata>;
    async fn list_objects(&self, bucket: &str, prefix: &str) -> Result<Vec<ObjectMetadata>>;
}

pub struct GetObjectResponse {
    pub body: Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>,
    pub content_type: Option<String>,
    pub content_length: Option<i64>,
}
- Change `StorageState`:
#[derive(Clone)]
pub struct StorageState {
    pub db: PgPool,
    pub backend: Arc<dyn StorageBackend>,
    pub config: Config,
    pub bucket_name: String,
}
- Update `init` in `storage/src/lib.rs`:
pub async fn init(db: PgPool, config: Config) -> Router {
    let backend: Arc<dyn StorageBackend> = Arc::new(
        AwsS3Backend::new(&config).await.expect("Failed to init storage backend")
    );
    let bucket_name = config.s3_bucket.clone();
    backend.create_bucket(&bucket_name).await.ok();
    let state = StorageState { db, backend, config, bucket_name };
    // ...routes...
}
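For orientation, a sketch of how one trait method could map onto the AWS SDK inside `AwsS3Backend`, ignoring the bucket argument in favor of the configured bucket just like the `get_object` example in 2.2.2 (illustrative only):

```rust
use async_trait::async_trait;
use aws_sdk_s3::primitives::ByteStream;
use bytes::Bytes;

#[async_trait]
impl StorageBackend for AwsS3Backend {
    async fn put_object(&self, _bucket: &str, key: &str, data: Bytes, content_type: Option<&str>) -> Result<()> {
        // Build the request against the configured bucket (same convention as get_object in 2.2.2)
        let mut req = self.client
            .put_object()
            .bucket(&self.bucket_name)
            .key(key)
            .body(ByteStream::from(data));
        if let Some(ct) = content_type {
            req = req.content_type(ct);
        }
        req.send().await?;
        Ok(())
    }

    // ...remaining trait methods follow the same pattern...
}
```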
2.2.2 Add streaming to StorageBackend
Replace get_object() -> Bytes with streaming response. The AWS SDK already supports this:
async fn get_object(&self, _bucket: &str, key: &str) -> Result<GetObjectResponse> {
    let resp = self.client.get_object()
        .bucket(&self.bucket_name)
        .key(key)
        .send()
        .await?;
    let stream = resp.body.into_async_read();
    let byte_stream = tokio_util::io::ReaderStream::new(stream);
    let mapped = byte_stream.map(|r| r.map_err(|e| anyhow::anyhow!(e)));
    Ok(GetObjectResponse {
        body: Box::pin(mapped),
        content_type: resp.content_type.map(|s| s.to_string()),
        content_length: resp.content_length,
    })
}
In the handler, convert to axum Body:
let resp = state.backend.get_object(&state.bucket_name, &key).await?;
let body = Body::from_stream(resp.body);
Ok((headers, body))
2.2.3 Add missing HTTP endpoints
Delete object: DELETE /storage/v1/object/:bucket_id/*filename
pub async fn delete_object(
    State(state): State<StorageState>,
    Extension(auth_ctx): Extension<AuthContext>,
    Extension(project_ctx): Extension<ProjectContext>,
    Path((bucket_id, filename)): Path<(String, String)>,
    db: Option<Extension<PgPool>>,
) -> Result<StatusCode, ApiError> {
    let pool = db.map(|Extension(p)| p).unwrap_or(state.db.clone());
    let mut rls = RlsTransaction::begin(&pool, &auth_ctx).await?;

    // Verify object exists under RLS
    let exists = sqlx::query_scalar::<_, Uuid>(
        "SELECT id FROM storage.objects WHERE bucket_id = $1 AND name = $2"
    )
    .bind(&bucket_id).bind(&filename)
    .fetch_optional(&mut *rls.tx).await?;
    if exists.is_none() {
        return Err(ApiError::NotFound("Object not found".into()));
    }

    // Delete from S3
    let key = format!("{}/{}/{}", project_ctx.project_ref, bucket_id, filename);
    state.backend.delete_object(&state.bucket_name, &key).await
        .map_err(|e| ApiError::Internal(e.to_string()))?;

    // Delete from DB
    sqlx::query("DELETE FROM storage.objects WHERE bucket_id = $1 AND name = $2")
        .bind(&bucket_id).bind(&filename)
        .execute(&mut *rls.tx).await?;

    rls.commit().await?;
    Ok(StatusCode::NO_CONTENT)
}
- Delete bucket: `DELETE /storage/v1/bucket/:bucket_id`
- Copy object: `POST /storage/v1/object/copy` with `{ "sourceKey": "bucket/path", "destinationKey": "bucket/path" }`
- Move object: `POST /storage/v1/object/move` (copy + delete source)
- Public URL: `GET /storage/v1/object/public/:bucket_id/*filename` — check `storage.buckets.public = true`, then return a redirect to an S3 presigned URL or stream directly (a sketch of the streaming variant follows below)
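A minimal sketch of the public-object handler under the assumptions above — the error conversions and key layout follow the `delete_object` example, and the streaming path reuses `get_object` from 2.2.2; nothing here is the final implementation:

```rust
use axum::{
    body::Body,
    extract::{Extension, Path, State},
    http::{header, HeaderMap, HeaderValue},
    response::{IntoResponse, Response},
};

pub async fn get_public_url(
    State(state): State<StorageState>,
    Extension(project_ctx): Extension<ProjectContext>,
    Path((bucket_id, filename)): Path<(String, String)>,
) -> Result<Response, ApiError> {
    // Only buckets marked public are served without auth
    let is_public = sqlx::query_scalar::<_, bool>(
        "SELECT public FROM storage.buckets WHERE id = $1"
    )
    .bind(&bucket_id)
    .fetch_optional(&state.db).await?
    .unwrap_or(false);
    if !is_public {
        return Err(ApiError::NotFound("Bucket not found or not public".into()));
    }

    // Stream the object straight from the backend
    let key = format!("{}/{}/{}", project_ctx.project_ref, bucket_id, filename);
    let obj = state.backend.get_object(&state.bucket_name, &key).await
        .map_err(|e| ApiError::Internal(e.to_string()))?;

    let mut headers = HeaderMap::new();
    if let Some(ct) = &obj.content_type {
        headers.insert(
            header::CONTENT_TYPE,
            ct.parse().unwrap_or(HeaderValue::from_static("application/octet-stream")),
        );
    }
    Ok((headers, Body::from_stream(obj.body)).into_response())
}
```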
2.2.4 Add bucket constraints
Migration: Add columns to storage.buckets:
ALTER TABLE storage.buckets
  ADD COLUMN IF NOT EXISTS file_size_limit BIGINT,
  ADD COLUMN IF NOT EXISTS allowed_mime_types TEXT[];
Validation in upload handler:
// After fetching bucket info
if let Some(limit) = bucket.file_size_limit {
    if data.len() as i64 > limit {
        return Err(ApiError::BadRequest(format!(
            "File size {} exceeds bucket limit {}", data.len(), limit
        )));
    }
}
if let Some(allowed) = &bucket.allowed_mime_types {
    if !allowed.is_empty() && !allowed.contains(&content_type.to_string()) {
        return Err(ApiError::BadRequest(format!(
            "MIME type {} not allowed in this bucket", content_type
        )));
    }
}
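The snippet above assumes the bucket row has already been loaded with the new columns. A minimal sketch of that query inside the upload handler's RLS transaction, using a hypothetical `BucketLimits` row type (name and placement are assumptions):

```rust
#[derive(sqlx::FromRow)]
struct BucketLimits {
    file_size_limit: Option<i64>,
    allowed_mime_types: Option<Vec<String>>,
}

// Fetch the constraint columns for the target bucket before validating the upload
let bucket = sqlx::query_as::<_, BucketLimits>(
    "SELECT file_size_limit, allowed_mime_types FROM storage.buckets WHERE id = $1"
)
.bind(&bucket_id)
.fetch_one(&mut *rls.tx)
.await?;
```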
2.2.5 Fix TUS completion — use S3 multipart upload
File: storage/src/tus.rs — tus_patch_upload completion block (line ~252)
Current problem: fs::read(&upload_path) loads the entire completed file into memory.
Fix: Use S3 multipart upload. On TUS create, start a multipart upload. On each PATCH, upload that chunk as a part. On completion, finalize the multipart upload.
Store the multipart upload ID in the .info file:
{
  "upload_length": 104857600,
  "bucket_id": "avatars",
  "filename": "photo.jpg",
  "s3_upload_id": "abc123...",
  "parts": [
    { "part_number": 1, "etag": "\"abc\"", "size": 5242880 },
    { "part_number": 2, "etag": "\"def\"", "size": 5242880 }
  ]
}
On PATCH:
let part_number = (current_offset / PART_SIZE) as i32 + 1;
let upload_part = state.backend.client()
    .upload_part()
    .bucket(&state.bucket_name)
    .key(&key)
    .upload_id(&s3_upload_id)
    .part_number(part_number)
    .body(ByteStream::from(data))
    .send()
    .await?;
// Store etag in info file
On completion:
state.backend.client()
    .complete_multipart_upload()
    .bucket(&state.bucket_name)
    .key(&key)
    .upload_id(&s3_upload_id)
    .multipart_upload(completed_parts)
    .send()
    .await?;
// Clean up local temp files
Note: S3 multipart parts must be at least 5MB (except the last part). Buffer PATCH data until 5MB before uploading a part.
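A minimal sketch of that buffering logic, assuming the `.info` metadata shown above is deserialized into hypothetical `UploadInfo`/`PartInfo` structs and each PATCH body is staged in a local buffer file until a full part is available:

```rust
const PART_SIZE: usize = 5 * 1024 * 1024; // S3 minimum part size (except the last part)

/// Append the PATCH chunk to a local buffer file and flush a part to S3
/// whenever at least PART_SIZE bytes have accumulated.
async fn buffer_and_flush_part(
    state: &StorageState,
    info: &mut UploadInfo,            // hypothetical mirror of the .info JSON
    buffer_path: &std::path::Path,
    chunk: &[u8],
    key: &str,
) -> anyhow::Result<()> {
    use tokio::io::AsyncWriteExt;

    // Append the incoming chunk to the on-disk buffer
    let mut file = tokio::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(buffer_path)
        .await?;
    file.write_all(chunk).await?;
    file.flush().await?;

    // Only upload once a full 5MB part is buffered
    let buffered = tokio::fs::metadata(buffer_path).await?.len() as usize;
    if buffered < PART_SIZE {
        return Ok(());
    }

    let part_number = info.parts.len() as i32 + 1;
    let body = aws_sdk_s3::primitives::ByteStream::from_path(buffer_path).await?;
    let resp = state.backend.client()
        .upload_part()
        .bucket(&state.bucket_name)
        .key(key)
        .upload_id(&info.s3_upload_id)
        .part_number(part_number)
        .body(body)
        .send()
        .await?;

    // Record the part and reset the buffer for the next one
    info.parts.push(PartInfo {
        part_number,
        etag: resp.e_tag().unwrap_or_default().to_string(),
        size: buffered as i64,
    });
    tokio::fs::write(buffer_path, b"").await?;
    Ok(())
}
```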
2.3 — Storage Health & Observability
2.3.1 Health check endpoint
Add to storage/src/lib.rs router:
.route("/health", get(health_check))
async fn health_check(State(state): State<StorageState>) -> Result<&'static str, StatusCode> {
    state.backend.head_bucket(&state.bucket_name)
        .await
        .map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?;
    Ok("OK")
}
2.3.2 Structured logging
Replace all tracing::info!("File size: {} bytes", size) with structured fields:
tracing::info!(
    bucket = %bucket_id,
    filename = %filename,
    size_bytes = size,
    "Upload completed"
);
2.3.3 Image transforms — run async
File: storage/src/handlers.rs — transform_image function (line 328)
Currently runs synchronously, blocking the async runtime. Use tokio::task::spawn_blocking:
if width.is_some() || height.is_some() || format.is_some() {
    let body_clone = body_bytes.clone();
    match tokio::task::spawn_blocking(move || {
        transform_image(body_clone, width, height, quality, format)
    }).await {
        Ok(Ok((new_bytes, new_ct))) => { ... },
        Ok(Err(e)) => { tracing::warn!(error = %e, "Image transform failed"); },
        Err(e) => { tracing::warn!(error = %e, "Image transform panicked"); },
    }
}
2.4 — MinIO HA (Optional)
2.4.1 Distributed MinIO documentation
For self-hosted production with HA, document the distributed mode setup:
# docker-compose.pillar-storage-ha.yml
services:
  minio1:
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    command: server http://minio{1...4}/data --console-address ":9001"
  # ... same for minio2, minio3, minio4
Requires 4 nodes minimum for erasure coding. Each node needs its own block storage volume.
2.4.2 Lifecycle rules
Configure via MinIO client:
mc ilm rule add madbase/madbase \
  --expire-delete-marker \
  --noncurrent-expire-days 30 \
  --prefix "tus-temp/"
This auto-cleans incomplete TUS uploads after 30 days.
Route Summary (after M2)
| Method | Path | Handler | supabase-js method |
|---|---|---|---|
| GET | `/storage/v1/bucket` | `list_buckets` | `listBuckets()` |
| POST | `/storage/v1/bucket` | `create_bucket` | `createBucket()` |
| DELETE | `/storage/v1/bucket/:id` | `delete_bucket` | `deleteBucket()` |
| POST | `/storage/v1/object/list/:bucket_id` | `list_objects` | `list()` |
| POST | `/storage/v1/object/:bucket_id/*filename` | `upload_object` | `upload()` |
| GET | `/storage/v1/object/:bucket_id/*filename` | `download_object` | `download()` |
| DELETE | `/storage/v1/object/:bucket_id/*filename` | `delete_object` | `remove()` |
| POST | `/storage/v1/object/copy` | `copy_object` | `copy()` |
| POST | `/storage/v1/object/move` | `move_object` | `move()` |
| POST | `/storage/v1/object/sign/:bucket_id/*filename` | `sign_object` | `createSignedUrl()` |
| GET | `/storage/v1/object/sign/:bucket_id/*filename` | `get_signed_object` | (signed URL access) |
| GET | `/storage/v1/object/public/:bucket_id/*filename` | `get_public_url` | `getPublicUrl()` |
| POST | `/storage/v1/upload/resumable` | `tus_create_upload` | (TUS) |
| PATCH | `/storage/v1/upload/resumable/:id` | `tus_patch_upload` | (TUS) |
| HEAD | `/storage/v1/upload/resumable/:id` | `tus_head_upload` | (TUS) |
| GET | `/storage/v1/health` | `health_check` | — |
Completion Requirements
This milestone is not complete until every item below is satisfied.
1. Full Test Suite — All Green
- `cargo test --workspace` passes with zero failures
- All pre-existing tests still pass (no regressions)
- New unit tests are written for every feature in this milestone:
| Test | Location | What it validates |
|---|---|---|
| `test_s3_put_object` | `storage/src/backend.rs` | `put_object` stores bytes and returns Ok |
| `test_s3_get_object_streaming` | `storage/src/backend.rs` | `get_object` returns a streaming body, not a buffered one |
| `test_s3_delete_object` | `storage/src/backend.rs` | `delete_object` removes the key; subsequent `head_object` returns NotFound |
| `test_s3_copy_object` | `storage/src/backend.rs` | `copy_object` duplicates the object; both keys exist |
| `test_s3_move_object` | `storage/src/backend.rs` | After `move_object`, the old key is gone and the new key exists |
| `test_s3_list_objects` | `storage/src/backend.rs` | `list_objects` returns correct prefix-filtered results |
| `test_s3_head_object_metadata` | `storage/src/backend.rs` | `head_object` returns correct size and content_type |
| `test_s3_create_and_delete_bucket` | `storage/src/backend.rs` | `create_bucket` + `delete_bucket` round-trip succeeds |
| `test_bucket_file_size_limit` | `storage/src/handlers.rs` | Upload exceeding `file_size_limit` returns 413 |
| `test_bucket_allowed_mime_types` | `storage/src/handlers.rs` | Upload with a disallowed MIME type returns 415 |
| `test_tus_multipart_completion` | `storage/src/tus.rs` | TUS completion assembles parts via S3 multipart, not an in-memory buffer |
| `test_health_check_minio_up` | `storage/src/handlers.rs` | `/health` returns 200 when S3 is reachable |
| `test_health_check_minio_down` | `storage/src/handlers.rs` | `/health` returns 503 when S3 is unreachable |
| `test_storage_mode_self_hosted` | `storage/src/backend.rs` | `STORAGE_MODE=self-hosted` initializes with the MinIO endpoint |
| `test_storage_mode_cloud` | `storage/src/backend.rs` | `STORAGE_MODE=cloud` initializes with a custom S3 endpoint |
2. Integration Verification
- `STORAGE_MODE=self-hosted docker compose -f docker-compose.pillar-storage.yml up` starts MinIO and passes health checks
- Upload a 10MB file via `POST /storage/v1/object/test-bucket/big-file.bin` — verify it doesn't OOM
- Download a 10MB file — verify streaming (no OOM)
- Delete an object via `DELETE /storage/v1/object/test-bucket/file.txt` — verify it is removed from S3 and the DB
- Copy an object — verify the new key exists in S3
- Move an object — verify the old key is removed and the new key exists
- Upload to a bucket with `file_size_limit = 1000` — verify rejection for files over 1KB
- TUS upload of a 50MB file completes without loading the file into memory
- `GET /storage/v1/health` returns 200 when MinIO is up, 503 when down
- `STORAGE_MODE=cloud S3_ENDPOINT=https://fsn1.your-objectstorage.com ...` works with Hetzner Object Storage
- Every route in the Route Summary table above returns the correct response for both success and error cases
3. supabase-js Client Compatibility
- `supabase.storage.listBuckets()` works
- `supabase.storage.from('bucket').upload('file.txt', blob)` works
- `supabase.storage.from('bucket').download('file.txt')` works
- `supabase.storage.from('bucket').remove(['file.txt'])` works
- `supabase.storage.from('bucket').copy('a.txt', 'b.txt')` works
- `supabase.storage.from('bucket').move('a.txt', 'c.txt')` works
- `supabase.storage.from('bucket').createSignedUrl('file.txt', 3600)` works
- `supabase.storage.from('public-bucket').getPublicUrl('file.txt')` works
4. CI Gate
- All unit tests run in `cargo test --workspace`
- Integration tests that require MinIO are gated behind `#[cfg(feature = "integration")]` or `#[ignore]` with clear documentation (see the sketch below)
- CI runs unit tests on every PR; integration tests run on merge to main (or nightly)