madbase/_milestones/M5_realtime.md
Vlad Durnea cffdf8af86
2026-03-15 12:35:42 +02:00

Milestone 5: Realtime

Goal: supabase.channel('room').on('postgres_changes', ...).subscribe() delivers filtered change events to authorized clients.

Depends on: M0 (Security), M1 (Foundation)


5.1 — Fix Core Functionality

5.1.1 Make the realtime crate compile

File: realtime/src/lib.rs

Current issue: pub mod lib; is self-referential and will fail. The crate also references PostgresPayload and PresenceMessage types that don't exist.

Fix:

  1. Remove pub mod lib; — it creates a circular module reference
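  2. Define the missing types in a types.rs module (PostgresPayload and its ColumnInfo helper are shown here; PresenceMessage still needs a definition of its own):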
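// realtime/src/types.rs
use serde::{Deserialize, Serialize};

/// One row-level change, as delivered to subscribed clients.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PostgresPayload {
    pub schema: String,
    pub table: String,
    /// "INSERT", "UPDATE", or "DELETE"; serialized under the JSON key "type".
    #[serde(rename = "type")]
    pub change_type: String,
    /// New row image, if any.
    pub record: Option<serde_json::Value>,
    /// Previous row image, if any.
    pub old_record: Option<serde_json::Value>,
    pub columns: Option<Vec<ColumnInfo>>,
    /// Defaults to false when the key is missing from the input.
    #[serde(default)]
    pub truncated: bool,
}

/// Name and type of one column in the changed table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnInfo {
    pub name: String,
    /// Serialized as "type"; without the rename the JSON key would be "type_".
    #[serde(rename = "type")]
    pub type_: String,
}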

Update lib.rs:

pub mod types;
pub mod replication;
pub mod ws;
pub mod presence;

pub use types::*;
pub use presence::{PresenceManager, PresenceInfo, PresenceStatus};

5.1.2 Per-table broadcast channels

Current problem: A single tokio::sync::broadcast::Sender is used for all table changes. Every connected client receives every change from every table, then filters client-side.

Fix: Use a DashMap<String, broadcast::Sender<PostgresPayload>> keyed by "schema.table":

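use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::broadcast;

// Assumes this code lives in the realtime crate next to types.rs.
use crate::types::PostgresPayload;

pub struct RealtimeState {
    /// One broadcast sender per "schema.table" key.
    pub channels: Arc<DashMap<String, broadcast::Sender<PostgresPayload>>>,
}

impl RealtimeState {
    /// Return the sender for `key`, creating the channel on first use.
    pub fn get_or_create_channel(&self, key: &str) -> broadcast::Sender<PostgresPayload> {
        self.channels
            .entry(key.to_string())
            // 1024 is the per-channel buffer; receivers that fall further behind see a Lagged error.
            .or_insert_with(|| broadcast::channel(1024).0)
            .clone()
    }
}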

The replication listener dispatches to the correct channel:

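let key = format!("{}.{}", payload.schema, payload.table);
// A channel exists only for tables that have been subscribed to; other changes are dropped here.
if let Some(sender) = state.channels.get(&key) {
    // send() returns Err only when no receiver is currently subscribed, which is safe to ignore.
    let _ = sender.send(payload);
}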

Clients subscribe to specific channels on join.
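With the types above, a client task would presumably obtain its receiver via state.get_or_create_channel(&key).subscribe(). The routing idea itself can be checked without tokio or DashMap. The following is a std-only model, not the proposed implementation: Router, subscribe, and dispatch are hypothetical names, and std::sync::mpsc stands in for tokio's broadcast channel.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Std-only stand-in for the per-table channel map:
/// one subscriber list per "schema.table" key.
#[derive(Default)]
pub struct Router {
    subscribers: HashMap<String, Vec<Sender<String>>>,
}

impl Router {
    /// Register a subscriber for one key and hand back its receiving end.
    pub fn subscribe(&mut self, key: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.entry(key.to_string()).or_default().push(tx);
        rx
    }

    /// Deliver `msg` only to subscribers of `key`; returns how many received it.
    pub fn dispatch(&mut self, key: &str, msg: &str) -> usize {
        match self.subscribers.get_mut(key) {
            Some(list) => {
                // Drop subscribers whose receiver has gone away.
                list.retain(|tx| tx.send(msg.to_string()).is_ok());
                list.len()
            }
            // Nobody has subscribed to this table.
            None => 0,
        }
    }
}
```

A change dispatched under "public.posts" reaches only receivers obtained with subscribe("public.posts"), which is the property test_per_table_channel_isolation is meant to check.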

5.1.3 Authorization

On WebSocket join for a postgres_changes subscription:

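async fn authorize_subscription(
    pool: &PgPool,
    auth_ctx: &AuthContext,
    schema: &str,
    table: &str,
) -> Result<bool, ApiError> {
    // Everything below is transaction-local and is rolled back when `tx` is dropped.
    let mut tx = pool.begin().await?;

    // Switch to the caller's role. set_config(..., true) is the bind-parameter
    // equivalent of SET LOCAL, so the role name is never spliced into SQL text.
    sqlx::query("SELECT set_config('role', $1, true)")
        .bind(auth_ctx.role.to_string()).execute(&mut *tx).await?;

    if let Some(claims) = &auth_ctx.claims {
        sqlx::query("SELECT set_config('request.jwt.claim.sub', $1, true)")
            .bind(&claims.sub).execute(&mut *tx).await?;
    }

    // Identifiers cannot be bound, so quote them and double any embedded quotes.
    let check = format!(
        "SELECT 1 FROM \"{}\".\"{}\" LIMIT 0",
        schema.replace('"', "\"\""),
        table.replace('"', "\"\""),
    );
    // This fails with SQLSTATE 42501 (insufficient_privilege) when the role lacks
    // SELECT on the table. RLS policies filter rows instead of raising an error,
    // so row-level visibility still has to be enforced when events are delivered.
    match sqlx::query(&check).execute(&mut *tx).await {
        Ok(_) => Ok(true),
        Err(sqlx::Error::Database(e)) if e.code().as_deref() == Some("42501") => Ok(false),
        // Connection failures, missing tables, etc. are errors, not denials.
        Err(e) => Err(e.into()),
    }
}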

5.1.4 Event type filtering

Client sends a join message specifying which events to receive:

["1", "1", "realtime:public:posts", "phx_join", {
    "config": {
        "postgres_changes": [{
            "event": "INSERT",
            "schema": "public",
            "table": "posts",
            "filter": "user_id=eq.123"
        }]
    }
}]

Server-side, filter before sending:

if let Some(event_filter) = &subscription.event_filter {
    if !event_filter.contains(&payload.change_type) {
        continue; // Skip this event
    }
}
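supabase-js also accepts '*' as the event value, meaning all change types. A minimal sketch of a per-entry comparison that covers the wildcard follows; event_matches is a hypothetical helper, and the case-insensitive comparison is an assumption rather than something the protocol requires.

```rust
/// Returns true when a change of `change_type` should be delivered to a
/// subscription entry whose "event" field is `requested`.
pub fn event_matches(requested: &str, change_type: &str) -> bool {
    // "*" subscribes to every change type; otherwise the names must agree.
    requested == "*" || requested.eq_ignore_ascii_case(change_type)
}
```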

5.1.5 Row-level filtering

Apply the filter expression from the subscription config:

if let Some(filter) = &subscription.filter {
    // Parse "user_id=eq.123" into a condition
    // Check if payload.record matches the condition
    if !matches_filter(&payload.record, filter) {
        continue;
    }
}
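The parsing and matching steps could look roughly like the sketch below. It is std-only and deliberately narrow: RowFilter, Op, parse_filter, and row_matches are hypothetical names, only eq and neq are handled, and the row is modelled as a map from column name to text value instead of the serde_json::Value held in payload.record.

```rust
use std::collections::HashMap;

/// Operators handled by this sketch; anything else is rejected at parse time.
#[derive(Debug, PartialEq)]
pub enum Op {
    Eq,
    Neq,
}

#[derive(Debug, PartialEq)]
pub struct RowFilter {
    pub column: String,
    pub op: Op,
    pub value: String,
}

/// Parse "column=op.value", e.g. "user_id=eq.123".
pub fn parse_filter(raw: &str) -> Option<RowFilter> {
    let (column, rest) = raw.split_once('=')?;
    let (op, value) = rest.split_once('.')?;
    if column.is_empty() {
        return None;
    }
    let op = match op {
        "eq" => Op::Eq,
        "neq" => Op::Neq,
        _ => return None,
    };
    Some(RowFilter {
        column: column.to_string(),
        op,
        value: value.to_string(),
    })
}

/// Compare the filter against a row given as column name -> text value.
/// A row that lacks the column never matches.
pub fn row_matches(filter: &RowFilter, row: &HashMap<String, String>) -> bool {
    match (row.get(&filter.column), &filter.op) {
        (Some(v), Op::Eq) => v == &filter.value,
        (Some(v), Op::Neq) => v != &filter.value,
        (None, _) => false,
    }
}
```

Parsing once at join time and storing the RowFilter on the subscription avoids re-parsing the string for every change. For DELETE events the row data may only be present in old_record, so the caller may need to choose which image to test.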

5.1.6 Replication listener retry

File: gateway/src/worker.rs — replication spawn (line ~66)

tokio::spawn(async move {
    loop {
        match realtime::replication::start_replication_listener(repl_config.clone(), repl_tx.clone()).await {
            Ok(_) => {
                tracing::warn!("Replication listener exited normally, restarting in 5s");
            }
            Err(e) => {
                tracing::error!("Replication listener failed: {}, retrying in 5s", e);
            }
        }
        // Sleep on both paths so a listener that returns Ok immediately cannot spin in a tight loop.
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    }
});
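If a fixed 5 second wait turns out to be too slow for brief disconnects, one optional variation is a capped exponential backoff that still never waits longer than 5 seconds. This is a sketch only: retry_delay and its 500 ms base are assumptions, not part of the existing worker.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based):
/// 500 ms doubled on each attempt, capped at 5 s.
pub fn retry_delay(attempt: u32) -> Duration {
    const BASE_MS: u64 = 500;
    const CAP_MS: u64 = 5_000;
    // checked_shl and saturating_mul keep large attempt counts from overflowing.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    Duration::from_millis(BASE_MS.saturating_mul(factor).min(CAP_MS))
}
```

The loop would keep an attempt counter, sleep for retry_delay(attempt) after each exit, and reset the counter once the listener has stayed up for a while.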

5.2 — Broadcast & Presence

5.2.1 Broadcast channels

Broadcast channels are server-side fan-out without touching the database. Clients send messages to a topic, and all subscribers on that topic receive them.

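// On receiving a broadcast message from a client:
let key = format!("broadcast:{}", topic);
// Note: get_or_create_channel as defined in 5.1.2 returns Sender<PostgresPayload>,
// so sending a BroadcastPayload needs either a separate channel map or a shared message enum.
let sender = state.get_or_create_channel(&key);
// Ignore the error returned when the topic currently has no subscribers.
sender.send(BroadcastPayload { event, payload }).ok();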

5.2.2 Wire in presence

Connect realtime/src/presence.rs to the WebSocket handler:

  • On phx_join with presence config: call presence_manager.join_channel(user_id, channel, metadata)
  • On phx_leave or disconnect: call presence_manager.leave_channel(user_id, channel)
  • Periodic heartbeat: call presence_manager.heartbeat(user_id, channel)
  • On presence_state request: return presence_manager.get_channel_users(channel)
  • On presence change: broadcast presence_diff to all channel subscribers
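A presence_diff carries two groups, joins and leaves. Assuming the server can take a before and after snapshot of a channel's user ids, the diff can be computed as below. PresenceDiff and presence_diff are hypothetical names, and real Phoenix payloads attach metadata to each entry, which this sketch omits.

```rust
use std::collections::BTreeSet;

/// Joins and leaves between two snapshots of a channel's user ids.
#[derive(Debug, PartialEq)]
pub struct PresenceDiff {
    pub joins: Vec<String>,
    pub leaves: Vec<String>,
}

/// Users only in `after` joined; users only in `before` left.
/// BTreeSet iteration keeps both lists sorted.
pub fn presence_diff(before: &BTreeSet<String>, after: &BTreeSet<String>) -> PresenceDiff {
    PresenceDiff {
        joins: after.difference(before).cloned().collect(),
        leaves: before.difference(after).cloned().collect(),
    }
}
```

A diff whose joins and leaves are both empty does not need to be broadcast.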

5.3 — Phoenix Protocol

5.3.1 Message format

supabase-js sends and expects JSON arrays: [join_ref, ref, topic, event, payload]

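Verify the server parses this correctly; the current WS handler may expect JSON objects. The array form is the Phoenix V2 JSON serializer, while the V1 serializer carries the same five fields as a JSON object, so also check which vsn value the supabase-js version under test puts on the WebSocket URL. Test with: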

const channel = supabase.channel('test')
  .on('postgres_changes', { event: 'INSERT', schema: 'public', table: 'posts' }, (payload) => {
    console.log(payload);
  })
  .subscribe();

Server responses must also be arrays:

["1", "1", "realtime:public:posts", "phx_reply", {"status": "ok", "response": {}}]
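To make the array layout concrete, here is a std-only sketch of building one outbound frame. In the real handler serde_json would most likely do this; encode_frame, json_string, and json_opt are hypothetical helpers, and the payload is assumed to be already-serialized JSON text.

```rust
/// Quote a string as a JSON string literal (escapes quotes, backslashes, control chars).
fn json_string(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('"');
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// A missing ref is encoded as JSON null.
fn json_opt(s: Option<&str>) -> String {
    s.map(json_string).unwrap_or_else(|| "null".to_string())
}

/// Build [join_ref, ref, topic, event, payload].
/// `payload_json` must already be valid JSON text; it is inserted verbatim.
pub fn encode_frame(
    join_ref: Option<&str>,
    msg_ref: Option<&str>,
    topic: &str,
    event: &str,
    payload_json: &str,
) -> String {
    format!(
        "[{},{},{},{},{}]",
        json_opt(join_ref),
        json_opt(msg_ref),
        json_string(topic),
        json_string(event),
        payload_json
    )
}
```

Replies echo the ref of the message they answer, while server-initiated pushes such as change events have no ref to echo, which is why both refs are optional here.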

Completion Requirements

This milestone is not complete until every item below is satisfied.

1. Full Test Suite — All Green

  • cargo test --workspace passes with zero failures
  • cargo build -p realtime compiles without errors
  • All pre-existing tests still pass (no regressions)
  • New unit tests are written for every feature in this milestone:
| Test | Location | What it validates |
|------|----------|-------------------|
| test_postgres_payload_deserialize | realtime/src/lib.rs | PostgresPayload correctly deserializes a pgoutput message |
| test_column_info_mapping | realtime/src/lib.rs | ColumnInfo maps OIDs to column names and types |
| test_per_table_channel_isolation | realtime/src/lib.rs | Messages for public.posts don't reach subscribers of public.users |
| test_authorize_subscription_allowed | realtime/src/lib.rs | User with SELECT on table → authorize_subscription returns true |
| test_authorize_subscription_denied | realtime/src/lib.rs | User without SELECT on table → authorize_subscription returns false |
| test_event_type_filter | realtime/src/lib.rs | Subscribing to INSERT only → UPDATE events are filtered out |
| test_row_level_filter | realtime/src/lib.rs | filter: 'user_id=eq.123' → only matching rows are delivered |
| test_broadcast_delivery | realtime/src/lib.rs | Broadcast message to a topic reaches all subscribers |
| test_broadcast_no_cross_topic_leak | realtime/src/lib.rs | Broadcast on topic A doesn't reach topic B subscribers |
| test_presence_join | realtime/src/presence.rs | Joining a channel broadcasts a presence_state event |
| test_presence_leave | realtime/src/presence.rs | Leaving a channel broadcasts an updated presence_diff |
| test_presence_key_format_consistency | realtime/src/presence.rs | All Redis keys use presence:channel:{ch}:user:{id} format (regression for the existing bug) |
| test_replication_listener_retry | realtime/src/lib.rs | After simulated disconnect, listener reconnects within 5s |
| test_phoenix_message_format | realtime/src/lib.rs | Outbound messages match [join_ref, ref, topic, event, payload] Phoenix format |

2. Integration / supabase-js Compatibility Verification

  • WebSocket connection to /realtime/v1/websocket succeeds
  • Subscribing to postgres_changes for a table the user has access to works
  • Subscribing to a table the user does NOT have access to is rejected
  • INSERT into a subscribed table delivers a change event to the client
  • Event type filter: subscribing to INSERT only → UPDATE events are not received
  • Row-level filter: filter: 'user_id=eq.123' → only matching changes are received
  • Broadcast: sending a message to a topic → all subscribers receive it
  • Presence: joining a channel → other members see the join event
  • Replication listener auto-restarts after failure
  • supabase.channel('room').on('postgres_changes', { event: 'INSERT', schema: 'public', table: 'messages' }, callback).subscribe() — full round-trip works

3. CI Gate

  • All unit tests run in cargo test --workspace
  • Tests requiring a Postgres replication slot are gated behind #[ignore] or #[cfg(feature = "integration")]
  • cargo build --workspace succeeds (no compilation errors in realtime)