# Milestone 5: Realtime

**Goal:** `supabase.channel('room').on('postgres_changes', ...).subscribe()` delivers filtered change events to authorized clients.

**Depends on:** M0 (Security), M1 (Foundation)

---

## 5.1 — Fix Core Functionality

### 5.1.1 Make the realtime crate compile

**File:** `realtime/src/lib.rs`

Current issue: `pub mod lib;` is self-referential and will fail to compile. The crate also references `PostgresPayload` and `PresenceMessage` types that don't exist.

**Fix:**

1. Remove `pub mod lib;` — it creates a circular module reference
2. Define the missing types in a `types.rs` module:

```rust
// realtime/src/types.rs
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PostgresPayload {
    pub schema: String,
    pub table: String,
    #[serde(rename = "type")]
    pub change_type: String, // INSERT, UPDATE, DELETE
    pub record: Option<serde_json::Value>,
    pub old_record: Option<serde_json::Value>,
    pub columns: Option<Vec<ColumnInfo>>,
    #[serde(default)]
    pub truncated: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnInfo {
    pub name: String,
    // Serialize as "type" on the wire; `type` is a reserved word in Rust.
    #[serde(rename = "type")]
    pub type_: String,
}
```

Update `lib.rs`:

```rust
pub mod types;
pub mod replication;
pub mod ws;
pub mod presence;

pub use types::*;
pub use presence::{PresenceManager, PresenceInfo, PresenceStatus};
```

### 5.1.2 Per-table broadcast channels

**Current problem:** A single `tokio::sync::broadcast::Sender` is used for all table changes. Every connected client receives every change from every table, then filters client-side.

**Fix:** Use a `DashMap<String, broadcast::Sender<PostgresPayload>>` keyed by `"schema.table"`:

```rust
use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::broadcast;

use crate::types::PostgresPayload;

pub struct RealtimeState {
    pub channels: Arc<DashMap<String, broadcast::Sender<PostgresPayload>>>,
}

impl RealtimeState {
    pub fn get_or_create_channel(&self, key: &str) -> broadcast::Sender<PostgresPayload> {
        self.channels
            .entry(key.to_string())
            .or_insert_with(|| broadcast::channel(1024).0)
            .clone()
    }
}
```

The replication listener dispatches to the correct channel:

```rust
let key = format!("{}.{}", payload.schema, payload.table);
if let Some(sender) = state.channels.get(&key) {
    // An Err here only means no active subscribers; safe to ignore.
    let _ = sender.send(payload);
}
```

Clients subscribe to specific channels on join.

### 5.1.3 Authorization

On WebSocket join for a `postgres_changes` subscription:

```rust
async fn authorize_subscription(
    pool: &PgPool,
    auth_ctx: &AuthContext,
    schema: &str,
    table: &str,
) -> Result<bool, ApiError> {
    let mut tx = pool.begin().await?;

    // Set the user's role. `auth_ctx.role` must come from a fixed
    // allow-list (e.g. anon / authenticated), never raw client input,
    // since it is interpolated into the statement.
    let role_query = format!("SET LOCAL role = '{}'", auth_ctx.role);
    sqlx::query(&role_query).execute(&mut *tx).await?;

    if let Some(claims) = &auth_ctx.claims {
        sqlx::query("SELECT set_config('request.jwt.claim.sub', $1, true)")
            .bind(&claims.sub)
            .execute(&mut *tx)
            .await?;
    }

    // Attempt a SELECT — if RLS denies it, the user can't subscribe.
    // LIMIT 0 fetches no rows; we only care about permission errors.
    let check = format!("SELECT 1 FROM \"{}\".\"{}\" LIMIT 0", schema, table);
    let allowed = sqlx::query(&check).execute(&mut *tx).await.is_ok();

    // Dropping `tx` without commit rolls back, so SET LOCAL leaves no trace.
    Ok(allowed)
}
```

### 5.1.4 Event type filtering

Client sends a join message specifying which events to receive:

```json
["1", "1", "realtime:public:posts", "phx_join", {
  "config": {
    "postgres_changes": [{
      "event": "INSERT",
      "schema": "public",
      "table": "posts",
      "filter": "user_id=eq.123"
    }]
  }
}]
```

Server-side, filter before sending:

```rust
if let Some(event_filter) = &subscription.event_filter {
    if !event_filter.contains(&payload.change_type) {
        continue; // Skip this event
    }
}
```

### 5.1.5 Row-level filtering

Apply the filter expression from the subscription config:

```rust
if let Some(filter) = &subscription.filter {
    // Parse "user_id=eq.123" into a condition
    // Check if payload.record matches the condition
    if !matches_filter(&payload.record, filter) {
        continue;
    }
}
```

### 5.1.6 Replication listener retry

**File:** `gateway/src/worker.rs` — replication spawn (line ~66)

```rust
tokio::spawn(async move {
    loop {
        match realtime::replication::start_replication_listener(repl_config.clone(), repl_tx.clone()).await {
            Ok(_) => {
                tracing::warn!("Replication listener exited normally, restarting...");
                // Brief pause so a listener that keeps exiting cleanly
                // doesn't spin in a hot restart loop.
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
            Err(e) => {
                tracing::error!("Replication listener failed: {}, retrying in 5s", e);
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            }
        }
    }
});
```

---

## 5.2 — Broadcast & Presence

### 5.2.1 Broadcast channels

Broadcast channels are server-side fan-out without touching the database. Clients send messages to a topic, and all subscribers on that topic receive them.

```rust
// On receiving a broadcast message from a client:
// NOTE: the per-table map above carries PostgresPayload, so broadcast
// topics need their own DashMap<String, broadcast::Sender<BroadcastPayload>>
// (or a shared message enum) rather than reusing get_or_create_channel as-is.
let key = format!("broadcast:{}", topic);
let sender = state.get_or_create_channel(&key);
sender.send(BroadcastPayload { event, payload }).ok();
```

### 5.2.2 Wire in presence

Connect `realtime/src/presence.rs` to the WebSocket handler:

- On `phx_join` with presence config: call `presence_manager.join_channel(user_id, channel, metadata)`
- On `phx_leave` or disconnect: call `presence_manager.leave_channel(user_id, channel)`
- Periodic heartbeat: call `presence_manager.heartbeat(user_id, channel)`
- On `presence_state` request: return `presence_manager.get_channel_users(channel)`
- On presence change: broadcast `presence_diff` to all channel subscribers

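The `PresenceManager` API is only implied by the calls above. A minimal in-memory sketch of those four methods (the real implementation is Redis-backed with `presence:channel:{ch}:user:{id}` keys and async methods; the `presence_diff` broadcast is left out):

```rust
use std::collections::HashMap;
use std::time::Instant;

/// In-memory presence tracker matching the calls above.
#[derive(Default)]
pub struct PresenceManager {
    // channel -> user_id -> (metadata, last heartbeat)
    channels: HashMap<String, HashMap<String, (String, Instant)>>,
}

impl PresenceManager {
    pub fn join_channel(&mut self, user_id: &str, channel: &str, metadata: &str) {
        self.channels
            .entry(channel.to_string())
            .or_default()
            .insert(user_id.to_string(), (metadata.to_string(), Instant::now()));
    }

    pub fn leave_channel(&mut self, user_id: &str, channel: &str) {
        if let Some(users) = self.channels.get_mut(channel) {
            users.remove(user_id);
        }
    }

    pub fn heartbeat(&mut self, user_id: &str, channel: &str) {
        if let Some((_, seen)) = self
            .channels
            .get_mut(channel)
            .and_then(|users| users.get_mut(user_id))
        {
            *seen = Instant::now();
        }
    }

    pub fn get_channel_users(&self, channel: &str) -> Vec<String> {
        self.channels
            .get(channel)
            .map(|users| users.keys().cloned().collect())
            .unwrap_or_default()
    }
}
```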
---

## 5.3 — Phoenix Protocol

### 5.3.1 Message format

supabase-js sends and expects JSON arrays: `[join_ref, ref, topic, event, payload]`

Verify the server parses this correctly. The current WS handler may expect JSON objects. Test with:

```javascript
const channel = supabase.channel('test')
  .on('postgres_changes', { event: 'INSERT', schema: 'public', table: 'posts' }, (payload) => {
    console.log(payload);
  })
  .subscribe();
```

Server responses must also be arrays:

```json
["1", "1", "realtime:public:posts", "phx_reply", {"status": "ok", "response": {}}]
```

---

## Completion Requirements

This milestone is **not complete** until every item below is satisfied.

### 1. Full Test Suite — All Green

- [ ] `cargo test --workspace` passes with **zero failures**
- [ ] `cargo build -p realtime` compiles without errors
- [ ] All **pre-existing tests** still pass (no regressions)
- [ ] **New unit tests** are written for every feature in this milestone:

| Test | Location | What it validates |
|------|----------|-------------------|
| `test_postgres_payload_deserialize` | `realtime/src/lib.rs` | `PostgresPayload` correctly deserializes a pgoutput message |
| `test_column_info_mapping` | `realtime/src/lib.rs` | `ColumnInfo` maps OIDs to column names and types |
| `test_per_table_channel_isolation` | `realtime/src/lib.rs` | Messages for `public.posts` don't reach subscribers of `public.users` |
| `test_authorize_subscription_allowed` | `realtime/src/lib.rs` | User with SELECT on table → `authorize_subscription` returns true |
| `test_authorize_subscription_denied` | `realtime/src/lib.rs` | User without SELECT on table → `authorize_subscription` returns false |
| `test_event_type_filter` | `realtime/src/lib.rs` | Subscribing to INSERT only → UPDATE events are filtered out |
| `test_row_level_filter` | `realtime/src/lib.rs` | `filter: 'user_id=eq.123'` → only matching rows are delivered |
| `test_broadcast_delivery` | `realtime/src/lib.rs` | Broadcast message to a topic reaches all subscribers |
| `test_broadcast_no_cross_topic_leak` | `realtime/src/lib.rs` | Broadcast on topic A doesn't reach topic B subscribers |
| `test_presence_join` | `realtime/src/presence.rs` | Joining a channel broadcasts a `presence_state` event |
| `test_presence_leave` | `realtime/src/presence.rs` | Leaving a channel broadcasts an updated `presence_diff` |
| `test_presence_key_format_consistency` | `realtime/src/presence.rs` | All Redis keys use `presence:channel:{ch}:user:{id}` format (regression for the existing bug) |
| `test_replication_listener_retry` | `realtime/src/lib.rs` | After simulated disconnect, listener reconnects within 5s |
| `test_phoenix_message_format` | `realtime/src/lib.rs` | Outbound messages match `[join_ref, ref, topic, event, payload]` Phoenix format |

### 2. Integration / supabase-js Compatibility Verification

- [ ] WebSocket connection to `/realtime/v1/websocket` succeeds
- [ ] Subscribing to `postgres_changes` for a table the user has access to works
- [ ] Subscribing to a table the user does NOT have access to is rejected
- [ ] INSERT into a subscribed table delivers a change event to the client
- [ ] Event type filter: subscribing to INSERT only → UPDATE events are not received
- [ ] Row-level filter: `filter: 'user_id=eq.123'` → only matching changes are received
- [ ] Broadcast: sending a message to a topic → all subscribers receive it
- [ ] Presence: joining a channel → other members see the join event
- [ ] Replication listener auto-restarts after failure
- [ ] `supabase.channel('room').on('postgres_changes', { event: 'INSERT', schema: 'public', table: 'messages' }, callback).subscribe()` — full round-trip works

### 3. CI Gate

- [ ] All unit tests run in `cargo test --workspace`
- [ ] Tests requiring a Postgres replication slot are gated behind `#[ignore]` or `#[cfg(feature = "integration")]`
- [ ] `cargo build --workspace` succeeds (no compilation errors in `realtime`)