Milestone 2: Storage Pillar

Goal: Storage becomes a first-class pillar supporting self-hosted MinIO or cloud S3 (Hetzner Object Storage, AWS S3, Backblaze B2). Complete the supabase-js storage API surface.

Depends on: M1 (Foundation)


2.1 — Storage Pillar Compose & Configuration

2.1.1 Create docker-compose.pillar-storage.yml

This compose file is used only for self-hosted mode. In cloud mode, workers connect directly to the external S3 endpoint and this compose file is not needed.

# MadBase - Pillar: Storage (Self-Hosted)
# S3-compatible object storage via MinIO

services:
  minio:
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    container_name: madbase_minio
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: ${S3_ACCESS_KEY}
      MINIO_ROOT_PASSWORD: ${S3_SECRET_KEY}
      MINIO_BROWSER_REDIRECT_URL: http://localhost:9001
    volumes:
      - minio_data:/data
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped

volumes:
  minio_data:

networks:
  default:
    name: madbase
    external: true

2.1.2 Add STORAGE_MODE env var

File: common/src/config.rs

Add to Config:

pub storage_mode: StorageMode,
pub s3_endpoint: String,
pub s3_access_key: String,
pub s3_secret_key: String,
pub s3_bucket: String,
pub s3_region: String,

#[derive(Clone, Debug)]
pub enum StorageMode {
    Cloud,       // External S3 (Hetzner, AWS, B2)
    SelfHosted,  // MinIO
}

Load from env:

let storage_mode = match env::var("STORAGE_MODE").unwrap_or_else(|_| "self-hosted".into()).as_str() {
    "cloud" | "s3" => StorageMode::Cloud,
    _ => StorageMode::SelfHosted,
};
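
The remaining S3 fields can be read the same way. A minimal sketch, assuming defaults for self-hosted mode; variable names other than STORAGE_MODE, S3_ENDPOINT, S3_ACCESS_KEY and S3_SECRET_KEY are assumptions consistent with the compose file above:

use std::env;

// Default to the local MinIO endpoint when running self-hosted.
let s3_endpoint = env::var("S3_ENDPOINT")
    .unwrap_or_else(|_| "http://localhost:9000".into());
let s3_access_key = env::var("S3_ACCESS_KEY").unwrap_or_default();
let s3_secret_key = env::var("S3_SECRET_KEY").unwrap_or_default();
let s3_bucket = env::var("S3_BUCKET").unwrap_or_else(|_| "madbase".into());
let s3_region = env::var("S3_REGION").unwrap_or_else(|_| "us-east-1".into());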

2.1.3 Create storage-node.yaml template

File: templates/storage-node.yaml

id: storage-node
name: Dedicated Storage Node
description: MinIO object storage for self-hosted deployments
version: 1.0

min_hetzner_plan: CX21
estimated_monthly_cost: 6.94

services:
  - id: minio
    name: MinIO
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    ports: ["9000:9000", "9001:9001"]
    command: ["server", "/data", "--console-address", ":9001"]
    volumes:
      - minio_data:/data
    resource_profile: storage_intensive

requirements:
  min_nodes: 1
  max_nodes: 4
  supports_ha: true
  recommended_deployment: "Dedicated node with attached block storage"

notes: |
  For HA, use distributed MinIO with 4+ nodes and erasure coding.
  For cloud deployments, skip this node — use Hetzner Object Storage.
  Estimated storage: 1TB on CX21 block storage = ~€6/mo additional.

2.1.4 Add shared Docker network

Add to each docker-compose.pillar-*.yml:

networks:
  default:
    name: madbase
    external: true

Create the network before first use: docker network create madbase


2.2 — Storage Backend Improvements

2.2.1 Route handlers through StorageBackend trait

Current problem: StorageState holds a raw aws_sdk_s3::Client and handlers call state.s3_client.put_object() directly, bypassing the StorageBackend trait entirely. The trait exists but is unused.

Fix:

  1. Expand the StorageBackend trait (the ObjectMetadata type it returns is sketched after this list):
#[async_trait]
pub trait StorageBackend: Send + Sync {
    async fn put_object(&self, bucket: &str, key: &str, data: Bytes, content_type: Option<&str>) -> Result<()>;
    async fn get_object(&self, bucket: &str, key: &str) -> Result<GetObjectResponse>;
    async fn delete_object(&self, bucket: &str, key: &str) -> Result<()>;
    async fn copy_object(&self, bucket: &str, src_key: &str, dst_key: &str) -> Result<()>;
    async fn create_bucket(&self, bucket: &str) -> Result<()>;
    async fn delete_bucket(&self, bucket: &str) -> Result<()>;
    async fn head_object(&self, bucket: &str, key: &str) -> Result<ObjectMetadata>;
    async fn list_objects(&self, bucket: &str, prefix: &str) -> Result<Vec<ObjectMetadata>>;
}

pub struct GetObjectResponse {
    pub body: Pin<Box<dyn Stream<Item = Result<Bytes>> + Send>>,
    pub content_type: Option<String>,
    pub content_length: Option<i64>,
}
  2. Change StorageState:
#[derive(Clone)]
pub struct StorageState {
    pub db: PgPool,
    pub backend: Arc<dyn StorageBackend>,
    pub config: Config,
    pub bucket_name: String,
}
  3. Update storage/src/lib.rs init:
pub async fn init(db: PgPool, config: Config) -> Router {
    let backend: Arc<dyn StorageBackend> = Arc::new(
        AwsS3Backend::new(&config).await.expect("Failed to init storage backend")
    );
    let bucket_name = config.s3_bucket.clone();
    backend.create_bucket(&bucket_name).await.ok();

    let state = StorageState { db, backend, config, bucket_name };
    // ...routes...
}
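
The trait above returns ObjectMetadata, which this milestone does not define. A minimal sketch of what the backends would need to return; field names and types are assumptions, not an existing definition:

#[derive(Clone, Debug)]
pub struct ObjectMetadata {
    pub key: String,
    pub size: i64,
    pub content_type: Option<String>,
    pub etag: Option<String>,
    pub last_modified: Option<String>, // e.g. an RFC 3339 timestamp, if the SDK provides one
}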

2.2.2 Add streaming to StorageBackend

Replace get_object() -> Bytes with streaming response. The AWS SDK already supports this:

async fn get_object(&self, bucket: &str, key: &str) -> Result<GetObjectResponse> {
    let resp = self.client.get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;

    let stream = resp.body.into_async_read();
    let byte_stream = tokio_util::io::ReaderStream::new(stream);
    let mapped = byte_stream.map(|r| r.map_err(|e| anyhow::anyhow!(e)));

    Ok(GetObjectResponse {
        body: Box::pin(mapped),
        content_type: resp.content_type.map(|s| s.to_string()),
        content_length: resp.content_length,
    })
}

In the handler, convert to axum Body:

let resp = state.backend.get_object(&state.bucket_name, &key).await?;
let body = Body::from_stream(resp.body);
Ok((headers, body))
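
The headers value in the snippet above is not constructed anywhere; a minimal sketch that propagates content type and length from GetObjectResponse (the http types are re-exported by axum):

use axum::http::{header, HeaderMap, HeaderValue};

// Build response headers from the backend metadata before returning the stream.
let mut headers = HeaderMap::new();
if let Some(ct) = &resp.content_type {
    if let Ok(v) = HeaderValue::from_str(ct) {
        headers.insert(header::CONTENT_TYPE, v);
    }
}
if let Some(len) = resp.content_length {
    headers.insert(header::CONTENT_LENGTH, HeaderValue::from(len));
}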

2.2.3 Add missing HTTP endpoints

Delete object: DELETE /storage/v1/object/:bucket_id/*filename

pub async fn delete_object(
    State(state): State<StorageState>,
    Extension(auth_ctx): Extension<AuthContext>,
    Extension(project_ctx): Extension<ProjectContext>,
    Path((bucket_id, filename)): Path<(String, String)>,
    db: Option<Extension<PgPool>>,
) -> Result<StatusCode, ApiError> {
    let pool = db.map(|Extension(p)| p).unwrap_or(state.db.clone());
    let mut rls = RlsTransaction::begin(&pool, &auth_ctx).await?;

    // Verify object exists under RLS
    let exists = sqlx::query_scalar::<_, Uuid>(
        "SELECT id FROM storage.objects WHERE bucket_id = $1 AND name = $2"
    )
    .bind(&bucket_id).bind(&filename)
    .fetch_optional(&mut *rls.tx).await?;

    if exists.is_none() {
        return Err(ApiError::NotFound("Object not found".into()));
    }

    // Delete from S3
    let key = format!("{}/{}/{}", project_ctx.project_ref, bucket_id, filename);
    state.backend.delete_object(&state.bucket_name, &key).await
        .map_err(|e| ApiError::Internal(e.to_string()))?;

    // Delete from DB
    sqlx::query("DELETE FROM storage.objects WHERE bucket_id = $1 AND name = $2")
        .bind(&bucket_id).bind(&filename)
        .execute(&mut *rls.tx).await?;

    rls.commit().await?;
    Ok(StatusCode::NO_CONTENT)
}

Delete bucket: DELETE /storage/v1/bucket/:bucket_id

Copy object: POST /storage/v1/object/copy with { "sourceKey": "bucket/path", "destinationKey": "bucket/path" }

Move object: POST /storage/v1/object/move (copy + delete source)

Public URL: GET /storage/v1/object/public/:bucket_id/*filename — check storage.buckets.public = true, return redirect to S3 presigned URL or stream directly.
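A minimal sketch of the copy handler described above, assuming the request body shape given and the StorageState / ProjectContext / ApiError types used in the delete handler; the storage.objects bookkeeping is left as a comment:

use axum::{extract::State, http::StatusCode, Extension, Json};

#[derive(serde::Deserialize)]
pub struct CopyObjectRequest {
    #[serde(rename = "sourceKey")]
    pub source_key: String,        // "bucket/path"
    #[serde(rename = "destinationKey")]
    pub destination_key: String,   // "bucket/path"
}

pub async fn copy_object(
    State(state): State<StorageState>,
    Extension(project_ctx): Extension<ProjectContext>,
    Json(req): Json<CopyObjectRequest>,
) -> Result<StatusCode, ApiError> {
    // Prefix both keys with the project ref, matching delete_object above.
    let src = format!("{}/{}", project_ctx.project_ref, req.source_key);
    let dst = format!("{}/{}", project_ctx.project_ref, req.destination_key);

    state.backend.copy_object(&state.bucket_name, &src, &dst).await
        .map_err(|e| ApiError::Internal(e.to_string()))?;

    // TODO: insert a storage.objects row for the destination under RLS,
    // mirroring the source row's metadata. Move is this same flow followed
    // by delete_object on the source key.
    Ok(StatusCode::OK)
}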

2.2.4 Add bucket constraints

Migration: Add columns to storage.buckets:

ALTER TABLE storage.buckets
    ADD COLUMN IF NOT EXISTS file_size_limit BIGINT,
    ADD COLUMN IF NOT EXISTS allowed_mime_types TEXT[];

Validation in upload handler:

// After fetching bucket info
if let Some(limit) = bucket.file_size_limit {
    if data.len() as i64 > limit {
        // The completion tests below expect 413 Payload Too Large here;
        // map this to the appropriate ApiError variant if one exists.
        return Err(ApiError::BadRequest(format!(
            "File size {} exceeds bucket limit {}", data.len(), limit
        )));
    }
}
if let Some(allowed) = &bucket.allowed_mime_types {
    if !allowed.is_empty() && !allowed.contains(&content_type.to_string()) {
        // The completion tests below expect 415 Unsupported Media Type here.
        return Err(ApiError::BadRequest(format!(
            "MIME type {} not allowed in this bucket", content_type
        )));
    }
}
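
For reference, a sketch of how the bucket value used above could be fetched inside the existing RLS transaction; the struct and column selection are assumptions based on the migration:

#[derive(sqlx::FromRow)]
struct BucketConstraints {
    file_size_limit: Option<i64>,
    allowed_mime_types: Option<Vec<String>>,
}

// Fetch the constraint columns for the target bucket under RLS.
let bucket = sqlx::query_as::<_, BucketConstraints>(
    "SELECT file_size_limit, allowed_mime_types FROM storage.buckets WHERE id = $1"
)
.bind(&bucket_id)
.fetch_one(&mut *rls.tx)
.await?;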

2.2.5 Fix TUS completion — use S3 multipart upload

File: storage/src/tus.rs, tus_patch_upload completion block (line ~252)

Current problem: fs::read(&upload_path) loads the entire completed file into memory.

Fix: Use S3 multipart upload. On TUS create, start a multipart upload. On each PATCH, upload that chunk as a part. On completion, finalize the multipart upload.

Store the multipart upload ID in the .info file:

{
    "upload_length": 104857600,
    "bucket_id": "avatars",
    "filename": "photo.jpg",
    "s3_upload_id": "abc123...",
    "parts": [
        { "part_number": 1, "etag": "\"abc\"", "size": 5242880 },
        { "part_number": 2, "etag": "\"def\"", "size": 5242880 }
    ]
}

On PATCH:

let part_number = (current_offset / PART_SIZE) as i32 + 1;
let upload_part = state.backend.client()
    .upload_part()
    .bucket(&state.bucket_name)
    .key(&key)
    .upload_id(&s3_upload_id)
    .part_number(part_number)
    .body(ByteStream::from(data))
    .send()
    .await?;
// Store etag in info file

On completion:

state.backend.client()
    .complete_multipart_upload()
    .bucket(&state.bucket_name)
    .key(&key)
    .upload_id(&s3_upload_id)
    .multipart_upload(completed_parts)
    .send()
    .await?;
// Clean up local temp files

Note: S3 multipart parts must be at least 5MB (except the last part). Buffer PATCH data until 5MB before uploading a part.
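
For reference, a sketch of how completed_parts could be assembled from the parts recorded in the .info file. UploadPart is a hypothetical struct mirroring the JSON above (part_number: i32, etag: String, size: i64); the builder calls are from aws-sdk-s3:

use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};

// `parts` is the Vec<UploadPart> deserialized from the .info file's "parts" array.
let completed: Vec<CompletedPart> = parts.iter()
    .map(|p| CompletedPart::builder()
        .part_number(p.part_number)
        .e_tag(p.etag.clone()) // etags returned by upload_part can be passed back as-is
        .build())
    .collect();

let completed_parts = CompletedMultipartUpload::builder()
    .set_parts(Some(completed))
    .build();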


2.3 — Storage Health & Observability

2.3.1 Health check endpoint

Add to the storage/src/lib.rs router (this assumes a head_bucket method is added to the StorageBackend trait alongside the methods from 2.2.1):

.route("/health", get(health_check))

async fn health_check(State(state): State<StorageState>) -> Result<&'static str, StatusCode> {
    state.backend.head_bucket(&state.bucket_name)
        .await
        .map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?;
    Ok("OK")
}

2.3.2 Structured logging

Replace unstructured log calls like tracing::info!("File size: {} bytes", size) with structured fields:

tracing::info!(
    bucket = %bucket_id,
    filename = %filename,
    size_bytes = size,
    "Upload completed"
);

2.3.3 Image transforms — run async

File: storage/src/handlers.rs, transform_image function (line ~328)

Currently the transform runs synchronously on the async runtime, blocking the executor thread. Move it onto tokio::task::spawn_blocking:

if width.is_some() || height.is_some() || format.is_some() {
    let body_clone = body_bytes.clone();
    match tokio::task::spawn_blocking(move || {
        transform_image(body_clone, width, height, quality, format)
    }).await {
        Ok(Ok((new_bytes, new_ct))) => { ... },
        Ok(Err(e)) => { tracing::warn!(error = %e, "Image transform failed"); },
        Err(e) => { tracing::warn!(error = %e, "Image transform panicked"); },
    }
}

2.4 — MinIO HA (Optional)

2.4.1 Distributed MinIO documentation

For self-hosted production with HA, document the distributed mode setup:

# docker-compose.pillar-storage-ha.yml
services:
  minio1:
    image: quay.io/minio/minio:RELEASE.2024-06-13T22-53-53Z
    command: server http://minio{1...4}/data --console-address ":9001"
    # ... same for minio2, minio3, minio4

Requires 4 nodes minimum for erasure coding. Each node needs its own block storage volume.

2.4.2 Lifecycle rules

Configure via MinIO client:

mc ilm rule add madbase/madbase \
    --expire-delete-marker \
    --noncurrent-expire-days 30 \
    --prefix "tus-temp/"

This auto-cleans incomplete TUS uploads after 30 days.


Route Summary (after M2)

| Method | Path | Handler | supabase-js method |
|--------|------|---------|--------------------|
| GET | /storage/v1/bucket | list_buckets | listBuckets() |
| POST | /storage/v1/bucket | create_bucket | createBucket() |
| DELETE | /storage/v1/bucket/:id | delete_bucket | deleteBucket() |
| POST | /storage/v1/object/list/:bucket_id | list_objects | list() |
| POST | /storage/v1/object/:bucket_id/*filename | upload_object | upload() |
| GET | /storage/v1/object/:bucket_id/*filename | download_object | download() |
| DELETE | /storage/v1/object/:bucket_id/*filename | delete_object | remove() |
| POST | /storage/v1/object/copy | copy_object | copy() |
| POST | /storage/v1/object/move | move_object | move() |
| POST | /storage/v1/object/sign/:bucket_id/*filename | sign_object | createSignedUrl() |
| GET | /storage/v1/object/sign/:bucket_id/*filename | get_signed_object | (signed URL access) |
| GET | /storage/v1/object/public/:bucket_id/*filename | get_public_url | getPublicUrl() |
| POST | /storage/v1/upload/resumable | tus_create_upload | (TUS) |
| PATCH | /storage/v1/upload/resumable/:id | tus_patch_upload | (TUS) |
| HEAD | /storage/v1/upload/resumable/:id | tus_head_upload | (TUS) |
| GET | /storage/v1/health | health_check | |

Completion Requirements

This milestone is not complete until every item below is satisfied.

1. Full Test Suite — All Green

  • cargo test --workspace passes with zero failures
  • All pre-existing tests still pass (no regressions)
  • New unit tests are written for every feature in this milestone:
| Test | Location | What it validates |
|------|----------|-------------------|
| test_s3_put_object | storage/src/backend.rs | put_object stores bytes and returns Ok |
| test_s3_get_object_streaming | storage/src/backend.rs | get_object returns a streaming body, not a buffered one |
| test_s3_delete_object | storage/src/backend.rs | delete_object removes the key; subsequent head_object returns NotFound |
| test_s3_copy_object | storage/src/backend.rs | copy_object duplicates the object; both keys exist |
| test_s3_move_object | storage/src/backend.rs | After move_object, the old key is gone and the new key exists |
| test_s3_list_objects | storage/src/backend.rs | list_objects returns correct prefix-filtered results |
| test_s3_head_object_metadata | storage/src/backend.rs | head_object returns correct size and content_type |
| test_s3_create_and_delete_bucket | storage/src/backend.rs | create_bucket + delete_bucket round-trip succeeds |
| test_bucket_file_size_limit | storage/src/handlers.rs | Upload exceeding file_size_limit returns 413 |
| test_bucket_allowed_mime_types | storage/src/handlers.rs | Upload with a disallowed MIME type returns 415 |
| test_tus_multipart_completion | storage/src/tus.rs | TUS completion assembles parts via S3 multipart, not an in-memory buffer |
| test_health_check_minio_up | storage/src/handlers.rs | /health returns 200 when S3 is reachable |
| test_health_check_minio_down | storage/src/handlers.rs | /health returns 503 when S3 is unreachable |
| test_storage_mode_self_hosted | storage/src/backend.rs | STORAGE_MODE=self-hosted initializes with the MinIO endpoint |
| test_storage_mode_cloud | storage/src/backend.rs | STORAGE_MODE=cloud initializes with a custom S3 endpoint |

2. Integration Verification

  • STORAGE_MODE=self-hosted docker compose -f docker-compose.pillar-storage.yml up starts MinIO and passes health checks
  • Upload a 10MB file via POST /storage/v1/object/test-bucket/big-file.bin — verify it doesn't OOM
  • Download a 10MB file — verify streaming (no OOM)
  • Delete an object via DELETE /storage/v1/object/test-bucket/file.txt — verify removed from S3 and DB
  • Copy an object — verify new key exists in S3
  • Move an object — verify old key removed, new key exists
  • Upload to a bucket with file_size_limit = 1000 — verify rejection for files over 1KB
  • TUS upload of a 50MB file completes without loading into memory
  • GET /storage/v1/health returns 200 when MinIO is up, 503 when down
  • STORAGE_MODE=cloud S3_ENDPOINT=https://fsn1.your-objectstorage.com ... works with Hetzner Object Storage
  • Every route in the Route Summary table above returns the correct response for both success and error cases

3. supabase-js Client Compatibility

  • supabase.storage.listBuckets() works
  • supabase.storage.from('bucket').upload('file.txt', blob) works
  • supabase.storage.from('bucket').download('file.txt') works
  • supabase.storage.from('bucket').remove(['file.txt']) works
  • supabase.storage.from('bucket').copy('a.txt', 'b.txt') works
  • supabase.storage.from('bucket').move('a.txt', 'c.txt') works
  • supabase.storage.from('bucket').createSignedUrl('file.txt', 3600) works
  • supabase.storage.from('public-bucket').getPublicUrl('file.txt') works

4. CI Gate

  • All unit tests run in cargo test --workspace
  • Integration tests that require MinIO are gated behind #[cfg(feature = "integration")] or #[ignore] with clear documentation
  • CI runs unit tests on every PR; integration tests run on merge to main (or nightly)