9.5 KiB
Milestone 5: Realtime
Goal: supabase.channel('room').on('postgres_changes', ...).subscribe() delivers filtered change events to authorized clients.
Depends on: M0 (Security), M1 (Foundation)
5.1 — Fix Core Functionality
5.1.1 Make the realtime crate compile
File: realtime/src/lib.rs
Current issue: pub mod lib; is self-referential and will fail. The crate also references PostgresPayload and PresenceMessage types that don't exist.
Fix:
- Remove
pub mod lib;— it creates a circular module reference - Define the missing types in a
types.rsmodule:
// realtime/src/types.rs
use serde::{Serialize, Deserialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PostgresPayload {
pub schema: String,
pub table: String,
#[serde(rename = "type")]
pub change_type: String, // INSERT, UPDATE, DELETE
pub record: Option<serde_json::Value>,
pub old_record: Option<serde_json::Value>,
pub columns: Option<Vec<ColumnInfo>>,
#[serde(default)]
pub truncated: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnInfo {
pub name: String,
pub type_: String,
}
Update lib.rs:
pub mod types;
pub mod replication;
pub mod ws;
pub mod presence;
pub use types::*;
pub use presence::{PresenceManager, PresenceInfo, PresenceStatus};
5.1.2 Per-table broadcast channels
Current problem: A single tokio::sync::broadcast::Sender is used for all table changes. Every connected client receives every change from every table, then filters client-side.
Fix: Use a DashMap<String, broadcast::Sender<PostgresPayload>> keyed by "schema.table":
use dashmap::DashMap;
pub struct RealtimeState {
pub channels: Arc<DashMap<String, broadcast::Sender<PostgresPayload>>>,
}
impl RealtimeState {
pub fn get_or_create_channel(&self, key: &str) -> broadcast::Sender<PostgresPayload> {
self.channels
.entry(key.to_string())
.or_insert_with(|| broadcast::channel(1024).0)
.clone()
}
}
The replication listener dispatches to the correct channel:
let key = format!("{}.{}", payload.schema, payload.table);
if let Some(sender) = state.channels.get(&key) {
let _ = sender.send(payload);
}
Clients subscribe to specific channels on join.
5.1.3 Authorization
On WebSocket join for a postgres_changes subscription:
async fn authorize_subscription(
pool: &PgPool,
auth_ctx: &AuthContext,
schema: &str,
table: &str,
) -> Result<bool, ApiError> {
let mut tx = pool.begin().await?;
// Set the user's role
let role_query = format!("SET LOCAL role = '{}'", auth_ctx.role);
sqlx::query(&role_query).execute(&mut *tx).await?;
if let Some(claims) = &auth_ctx.claims {
sqlx::query("SELECT set_config('request.jwt.claim.sub', $1, true)")
.bind(&claims.sub).execute(&mut *tx).await?;
}
// Attempt a SELECT — if RLS denies it, the user can't subscribe
let check = format!("SELECT 1 FROM \"{}\".\"{}\" LIMIT 0", schema, table);
match sqlx::query(&check).execute(&mut *tx).await {
Ok(_) => Ok(true),
Err(_) => Ok(false),
}
}
5.1.4 Event type filtering
Client sends a join message specifying which events to receive:
["1", "1", "realtime:public:posts", "phx_join", {
"config": {
"postgres_changes": [{
"event": "INSERT",
"schema": "public",
"table": "posts",
"filter": "user_id=eq.123"
}]
}
}]
Server-side, filter before sending:
if let Some(event_filter) = &subscription.event_filter {
if !event_filter.contains(&payload.change_type) {
continue; // Skip this event
}
}
5.1.5 Row-level filtering
Apply the filter expression from the subscription config:
if let Some(filter) = &subscription.filter {
// Parse "user_id=eq.123" into a condition
// Check if payload.record matches the condition
if !matches_filter(&payload.record, filter) {
continue;
}
}
5.1.6 Replication listener retry
File: gateway/src/worker.rs — replication spawn (line ~66)
tokio::spawn(async move {
loop {
match realtime::replication::start_replication_listener(repl_config.clone(), repl_tx.clone()).await {
Ok(_) => {
tracing::warn!("Replication listener exited normally, restarting...");
}
Err(e) => {
tracing::error!("Replication listener failed: {}, retrying in 5s", e);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
});
5.2 — Broadcast & Presence
5.2.1 Broadcast channels
Broadcast channels are server-side fan-out without touching the database. Clients send messages to a topic, and all subscribers on that topic receive them.
// On receiving a broadcast message from a client:
let key = format!("broadcast:{}", topic);
let sender = state.get_or_create_channel(&key);
sender.send(BroadcastPayload { event, payload }).ok();
5.2.2 Wire in presence
Connect realtime/src/presence.rs to the WebSocket handler:
- On
phx_joinwith presence config: callpresence_manager.join_channel(user_id, channel, metadata) - On
phx_leaveor disconnect: callpresence_manager.leave_channel(user_id, channel) - Periodic heartbeat: call
presence_manager.heartbeat(user_id, channel) - On
presence_staterequest: returnpresence_manager.get_channel_users(channel) - On presence change: broadcast
presence_diffto all channel subscribers
5.3 — Phoenix Protocol
5.3.1 Message format
supabase-js sends and expects JSON arrays: [join_ref, ref, topic, event, payload]
Verify the server parses this correctly. The current WS handler may expect JSON objects. Test with:
const channel = supabase.channel('test')
.on('postgres_changes', { event: 'INSERT', schema: 'public', table: 'posts' }, (payload) => {
console.log(payload);
})
.subscribe();
Server responses must also be arrays:
["1", "1", "realtime:public:posts", "phx_reply", {"status": "ok", "response": {}}]
Completion Requirements
This milestone is not complete until every item below is satisfied.
1. Full Test Suite — All Green
cargo test --workspacepasses with zero failurescargo build -p realtimecompiles without errors- All pre-existing tests still pass (no regressions)
- New unit tests are written for every feature in this milestone:
| Test | Location | What it validates |
|---|---|---|
test_postgres_payload_deserialize |
realtime/src/lib.rs |
PostgresPayload correctly deserializes a pgoutput message |
test_column_info_mapping |
realtime/src/lib.rs |
ColumnInfo maps OIDs to column names and types |
test_per_table_channel_isolation |
realtime/src/lib.rs |
Messages for public.posts don't reach subscribers of public.users |
test_authorize_subscription_allowed |
realtime/src/lib.rs |
User with SELECT on table → authorize_subscription returns true |
test_authorize_subscription_denied |
realtime/src/lib.rs |
User without SELECT on table → authorize_subscription returns false |
test_event_type_filter |
realtime/src/lib.rs |
Subscribing to INSERT only → UPDATE events are filtered out |
test_row_level_filter |
realtime/src/lib.rs |
filter: 'user_id=eq.123' → only matching rows are delivered |
test_broadcast_delivery |
realtime/src/lib.rs |
Broadcast message to a topic reaches all subscribers |
test_broadcast_no_cross_topic_leak |
realtime/src/lib.rs |
Broadcast on topic A doesn't reach topic B subscribers |
test_presence_join |
realtime/src/presence.rs |
Joining a channel broadcasts a presence_state event |
test_presence_leave |
realtime/src/presence.rs |
Leaving a channel broadcasts an updated presence_diff |
test_presence_key_format_consistency |
realtime/src/presence.rs |
All Redis keys use presence:channel:{ch}:user:{id} format (regression for the existing bug) |
test_replication_listener_retry |
realtime/src/lib.rs |
After simulated disconnect, listener reconnects within 5s |
test_phoenix_message_format |
realtime/src/lib.rs |
Outbound messages match [join_ref, ref, topic, event, payload] Phoenix format |
2. Integration / supabase-js Compatibility Verification
- WebSocket connection to
/realtime/v1/websocketsucceeds - Subscribing to
postgres_changesfor a table the user has access to works - Subscribing to a table the user does NOT have access to is rejected
- INSERT into a subscribed table delivers a change event to the client
- Event type filter: subscribing to INSERT only → UPDATE events are not received
- Row-level filter:
filter: 'user_id=eq.123'→ only matching changes are received - Broadcast: sending a message to a topic → all subscribers receive it
- Presence: joining a channel → other members see the join event
- Replication listener auto-restarts after failure
supabase.channel('room').on('postgres_changes', { event: 'INSERT', schema: 'public', table: 'messages' }, callback).subscribe()— full round-trip works
3. CI Gate
- All unit tests run in
cargo test --workspace - Tests requiring a Postgres replication slot are gated behind
#[ignore]or#[cfg(feature = "integration")] cargo build --workspacesucceeds (no compilation errors inrealtime)