# Milestone 5: Realtime

**Goal:** `supabase.channel('room').on('postgres_changes', ...).subscribe()` delivers filtered change events to authorized clients.

**Depends on:** M0 (Security), M1 (Foundation)

---

## 5.1 — Fix Core Functionality

### 5.1.1 Make the realtime crate compile

**File:** `realtime/src/lib.rs`

Current issue: `pub mod lib;` is self-referential and fails to compile. The crate also references `PostgresPayload` and `PresenceMessage` types that don't exist.

**Fix:**

1. Remove `pub mod lib;`, which creates a circular module reference.
2. Define the missing types in a `types.rs` module:

```rust
// realtime/src/types.rs
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PostgresPayload {
    pub schema: String,
    pub table: String,
    #[serde(rename = "type")]
    pub change_type: String, // INSERT, UPDATE, DELETE
    // Row images as JSON; `serde_json::Value` is an assumed type here.
    pub record: Option<serde_json::Value>,
    pub old_record: Option<serde_json::Value>,
    pub columns: Option<Vec<ColumnInfo>>,
    #[serde(default)]
    pub truncated: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnInfo {
    pub name: String,
    // Assumed wire name `type`, consistent with `change_type` above.
    #[serde(rename = "type")]
    pub type_: String,
}
```

Update `lib.rs`:

```rust
pub mod types;
pub mod replication;
pub mod ws;
pub mod presence;

pub use types::*;
pub use presence::{PresenceManager, PresenceInfo, PresenceStatus};
```

### 5.1.2 Per-table broadcast channels

**Current problem:** A single `tokio::sync::broadcast::Sender` is used for all table changes. Every connected client receives every change from every table, then filters client-side.

**Fix:** Use a `DashMap<String, broadcast::Sender<PostgresPayload>>` keyed by `"schema.table"`:

```rust
use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::broadcast;

pub struct RealtimeState {
    // Type parameters are assumed: key is "schema.table", value is that table's sender.
    pub channels: Arc<DashMap<String, broadcast::Sender<PostgresPayload>>>,
}

impl RealtimeState {
    /// Returns the sender for `key`, creating a 1024-slot channel on first use.
    pub fn get_or_create_channel(&self, key: &str) -> broadcast::Sender<PostgresPayload> {
        self.channels
            .entry(key.to_string())
            .or_insert_with(|| broadcast::channel(1024).0)
            .clone()
    }
}
```

The replication listener dispatches to the correct channel:

```rust
let key = format!("{}.{}", payload.schema, payload.table);
// Dispatch only when a channel has already been created for this table.
if let Some(sender) = state.channels.get(&key) {
    // `send` errors only when there are no active receivers, so the result is ignored.
    let _ = sender.send(payload);
}
```

Clients subscribe to specific channels on join.

### 5.1.3 Authorization

On WebSocket join for a `postgres_changes` subscription, check that the caller's role can read the table:

```rust
async fn authorize_subscription(
    pool: &PgPool,
    auth_ctx: &AuthContext,
    schema: &str,
    table: &str,
) -> Result<bool, sqlx::Error> {
    let mut tx = pool.begin().await?;

    // Switch to the user's role for this transaction only. `set_config` accepts
    // bind parameters, unlike `SET LOCAL`, so the role name is never interpolated.
    sqlx::query("SELECT set_config('role', $1, true)")
        .bind(&auth_ctx.role)
        .execute(&mut *tx)
        .await?;

    if let Some(claims) = &auth_ctx.claims {
        sqlx::query("SELECT set_config('request.jwt.claim.sub', $1, true)")
            .bind(&claims.sub)
            .execute(&mut *tx)
            .await?;
    }

    // Probe with an empty SELECT. This fails when the role lacks the SELECT
    // privilege on the table. RLS policies filter rows instead of raising an
    // error, so this check does not detect them.
    let check = format!(
        "SELECT 1 FROM \"{}\".\"{}\" LIMIT 0",
        schema.replace('"', "\"\""), // escape embedded quotes in identifiers
        table.replace('"', "\"\""),
    );
    // Dropping `tx` without committing rolls the transaction back.
    match sqlx::query(&check).execute(&mut *tx).await {
        Ok(_) => Ok(true),
        Err(_) => Ok(false),
    }
}
```

### 5.1.4 Event type filtering

The client sends a join message specifying which events to receive:

```json
["1", "1", "realtime:public:posts", "phx_join", {
  "config": {
    "postgres_changes": [{
      "event": "INSERT",
      "schema": "public",
      "table": "posts",
      "filter": "user_id=eq.123"
    }]
  }
}]
```

Server-side, filter before sending:

```rust
if let Some(event_filter) = &subscription.event_filter {
    if !event_filter.contains(&payload.change_type) {
        continue; // Skip this event
    }
}
```

### 5.1.5 Row-level filtering

Apply the filter expression from the subscription config:

```rust
if let Some(filter) = &subscription.filter {
    // Parse "user_id=eq.123" into a condition, then
    // check whether payload.record matches it.
    if !matches_filter(&payload.record, filter) {
        continue;
    }
}
```

### 5.1.6 Replication listener retry

**File:** `gateway/src/worker.rs`, replication spawn (line ~66)

```rust
tokio::spawn(async move {
    loop {
        match realtime::replication::start_replication_listener(
            repl_config.clone(),
            repl_tx.clone(),
        )
        .await
        {
            // A clean exit restarts immediately, with no delay.
            Ok(_) => {
                tracing::warn!("Replication listener exited normally, restarting...");
            }
            Err(e) => {
                tracing::error!("Replication listener failed: {}, retrying in 5s", e);
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            }
        }
    }
});
```

---

## 5.2 — Broadcast & Presence

### 5.2.1 Broadcast channels

Broadcast channels provide server-side fan-out without touching the database. Clients send messages to a topic, and all subscribers on that topic receive them.

```rust
// On receiving a broadcast message from a client:
let key = format!("broadcast:{}", topic);
// Note: this needs a channel whose message type carries `BroadcastPayload`
// (a separate map or a shared enum); a `Sender<PostgresPayload>` would not compile here.
let sender = state.get_or_create_channel(&key);
sender.send(BroadcastPayload { event, payload }).ok();
```

### 5.2.2 Wire in presence

Connect `realtime/src/presence.rs` to the WebSocket handler:

- On `phx_join` with presence config: call `presence_manager.join_channel(user_id, channel, metadata)`
- On `phx_leave` or disconnect: call `presence_manager.leave_channel(user_id, channel)`
- Periodic heartbeat: call `presence_manager.heartbeat(user_id, channel)`
- On `presence_state` request: return `presence_manager.get_channel_users(channel)`
- On presence change: broadcast `presence_diff` to all channel subscribers

---

## 5.3 — Phoenix Protocol

### 5.3.1 Message format

supabase-js sends and expects JSON arrays: `[join_ref, ref, topic, event, payload]`

Verify the server parses this correctly. The current WS handler may expect JSON objects.

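As a rough illustration of the five-element layout, the sketch below serializes a frame into the array form using only the standard library. `Frame`, `to_wire`, `json_string`, and `json_opt` are hypothetical names, not part of the existing crate. The payload is assumed to be an already-serialized JSON value, and modeling `join_ref` and `ref` as optional is also an assumption. A real handler would more likely build on `serde_json`.

```rust
/// One frame in array form: `[join_ref, ref, topic, event, payload]`.
/// Hypothetical type for illustration only.
pub struct Frame<'a> {
    pub join_ref: Option<&'a str>,
    pub msg_ref: Option<&'a str>,
    pub topic: &'a str,
    pub event: &'a str,
    /// Assumed to be a valid, already-serialized JSON value.
    pub payload: &'a str,
}

/// Encode `s` as a JSON string literal, including the surrounding quotes.
fn json_string(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('"');
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// Encode an optional reference as a JSON string or `null`.
fn json_opt(s: Option<&str>) -> String {
    s.map(json_string).unwrap_or_else(|| "null".to_string())
}

impl Frame<'_> {
    /// Serialize as a JSON array rather than a JSON object.
    pub fn to_wire(&self) -> String {
        format!(
            "[{},{},{},{},{}]",
            json_opt(self.join_ref),
            json_opt(self.msg_ref),
            json_string(self.topic),
            json_string(self.event),
            self.payload
        )
    }
}
```

With `join_ref` and `msg_ref` both set to `"1"`, topic `realtime:public:posts`, event `phx_reply`, and payload `{"status":"ok","response":{}}`, `to_wire()` returns `["1","1","realtime:public:posts","phx_reply",{"status":"ok","response":{}}]`.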
Test with supabase-js:

```javascript
const channel = supabase.channel('test')
  .on('postgres_changes', {
    event: 'INSERT',
    schema: 'public',
    table: 'posts'
  }, (payload) => {
    console.log(payload);
  })
  .subscribe();
```

Server responses must also be arrays:

```json
["1", "1", "realtime:public:posts", "phx_reply", {"status": "ok", "response": {}}]
```

---

## Completion Requirements

This milestone is **not complete** until every item below is satisfied.

### 1. Full Test Suite — All Green

- [ ] `cargo test --workspace` passes with **zero failures**
- [ ] `cargo build -p realtime` compiles without errors
- [ ] All **pre-existing tests** still pass (no regressions)
- [ ] **New unit tests** are written for every feature in this milestone:

| Test | Location | What it validates |
|------|----------|-------------------|
| `test_postgres_payload_deserialize` | `realtime/src/lib.rs` | `PostgresPayload` correctly deserializes a pgoutput message |
| `test_column_info_mapping` | `realtime/src/lib.rs` | `ColumnInfo` maps OIDs to column names and types |
| `test_per_table_channel_isolation` | `realtime/src/lib.rs` | Messages for `public.posts` don't reach subscribers of `public.users` |
| `test_authorize_subscription_allowed` | `realtime/src/lib.rs` | User with SELECT on table → `authorize_subscription` returns true |
| `test_authorize_subscription_denied` | `realtime/src/lib.rs` | User without SELECT on table → `authorize_subscription` returns false |
| `test_event_type_filter` | `realtime/src/lib.rs` | Subscribing to INSERT only → UPDATE events are filtered out |
| `test_row_level_filter` | `realtime/src/lib.rs` | `filter: 'user_id=eq.123'` → only matching rows are delivered |
| `test_broadcast_delivery` | `realtime/src/lib.rs` | Broadcast message to a topic reaches all subscribers |
| `test_broadcast_no_cross_topic_leak` | `realtime/src/lib.rs` | Broadcast on topic A doesn't reach topic B subscribers |
| `test_presence_join` | `realtime/src/presence.rs` | Joining a channel broadcasts a `presence_state` event |
| `test_presence_leave` | `realtime/src/presence.rs` | Leaving a channel broadcasts an updated `presence_diff` |
| `test_presence_key_format_consistency` | `realtime/src/presence.rs` | All Redis keys use `presence:channel:{ch}:user:{id}` format (regression for the existing bug) |
| `test_replication_listener_retry` | `realtime/src/lib.rs` | After simulated disconnect, listener reconnects within 5s |
| `test_phoenix_message_format` | `realtime/src/lib.rs` | Outbound messages match `[join_ref, ref, topic, event, payload]` Phoenix format |

### 2. Integration / supabase-js Compatibility Verification

- [ ] WebSocket connection to `/realtime/v1/websocket` succeeds
- [ ] Subscribing to `postgres_changes` for a table the user has access to works
- [ ] Subscribing to a table the user does NOT have access to is rejected
- [ ] INSERT into a subscribed table delivers a change event to the client
- [ ] Event type filter: subscribing to INSERT only → UPDATE events are not received
- [ ] Row-level filter: `filter: 'user_id=eq.123'` → only matching changes are received
- [ ] Broadcast: sending a message to a topic → all subscribers receive it
- [ ] Presence: joining a channel → other members see the join event
- [ ] Replication listener auto-restarts after failure
- [ ] `supabase.channel('room').on('postgres_changes', { event: 'INSERT', schema: 'public', table: 'messages' }, callback).subscribe()`: full round-trip works

### 3. CI Gate

- [ ] All unit tests run in `cargo test --workspace`
- [ ] Tests requiring a Postgres replication slot are gated behind `#[ignore]` or `#[cfg(feature = "integration")]`
- [ ] `cargo build --workspace` succeeds (no compilation errors in `realtime`)
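
One way the `#[ignore]` gate might look, as a minimal sketch. The `channel_key` helper, the test names, and the `REALTIME_TEST_DATABASE_URL` variable are all hypothetical; the real suite may be organized differently.

```rust
/// Hypothetical helper mirroring the "schema.table" channel key from 5.1.2.
pub fn channel_key(schema: &str, table: &str) -> String {
    format!("{}.{}", schema, table)
}

#[cfg(test)]
mod gated_tests {
    use super::*;

    // Pure unit test: always runs under `cargo test --workspace`.
    #[test]
    fn channel_key_joins_schema_and_table() {
        assert_eq!(channel_key("public", "posts"), "public.posts");
    }

    // Needs a live database, so it is skipped unless explicitly requested.
    #[test]
    #[ignore = "requires a Postgres replication slot"]
    fn replication_listener_delivers_insert() {
        // Assumed environment variable name, for illustration only.
        let url = std::env::var("REALTIME_TEST_DATABASE_URL")
            .expect("set REALTIME_TEST_DATABASE_URL to run this test");
        assert!(url.starts_with("postgres"));
        // Connect, insert a row, and assert on the received change event here.
    }
}
```

A plain `cargo test --workspace` skips the ignored test. `cargo test --workspace -- --ignored` runs only the ignored tests, and `cargo test --workspace -- --include-ignored` runs both sets.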