-- Create History Table
-- Durable log of every change event emitted by broadcast_changes(). The NOTIFY
-- message carries the row id from this table, so a consumer that receives a
-- truncated notification can look the full payload up here.
CREATE TABLE IF NOT EXISTS madbase_realtime.messages (
    id bigserial PRIMARY KEY,
    topic text NOT NULL,                  -- '<schema>:<table>' of the changed table
    payload jsonb NOT NULL,               -- change event as built by broadcast_changes()
    created_at timestamptz DEFAULT now()
);

-- Composite index on (topic, id): supports per-topic reads in id order.
CREATE INDEX IF NOT EXISTS idx_realtime_messages_topic_id
    ON madbase_realtime.messages (topic, id);

-- Update Trigger Function
--
-- Trigger function that records a change event and broadcasts it.
--
-- For each firing it:
--   1. builds a jsonb event with keys schema, table, type (TG_OP), timestamp,
--      plus 'record' (INSERT/UPDATE) and/or 'old_record' (UPDATE/DELETE);
--   2. inserts the event into madbase_realtime.messages;
--   3. sends the event, extended with the new message 'id', on the
--      'madbase_realtime' NOTIFY channel. If the serialized event is too large
--      for NOTIFY, a compact message (id, schema, table, type, truncated = true)
--      is sent instead.
--
-- Returns OLD for DELETE and NEW otherwise, so the function does not suppress
-- the triggering operation when it is attached as a BEFORE ROW trigger.
CREATE OR REPLACE FUNCTION madbase_realtime.broadcast_changes()
RETURNS trigger AS $$
DECLARE
    -- NOTIFY payloads must be shorter than 8000 bytes in a default
    -- PostgreSQL build.
    max_notify_bytes CONSTANT integer := 8000;
    base_payload jsonb;
    final_payload jsonb;
    topic_name text;
    msg_id bigint;
BEGIN
    -- Construct topic
    topic_name := TG_TABLE_SCHEMA || ':' || TG_TABLE_NAME;

    -- Construct base payload
    base_payload := jsonb_build_object(
        'schema', TG_TABLE_SCHEMA,
        'table', TG_TABLE_NAME,
        'type', TG_OP,
        'timestamp', now()
    );

    -- Attach row images. Any other TG_OP value falls through and is recorded
    -- without row data.
    IF TG_OP = 'INSERT' THEN
        base_payload := base_payload || jsonb_build_object('record', to_jsonb(NEW));
    ELSIF TG_OP = 'UPDATE' THEN
        base_payload := base_payload || jsonb_build_object(
            'record', to_jsonb(NEW),
            'old_record', to_jsonb(OLD)
        );
    ELSIF TG_OP = 'DELETE' THEN
        base_payload := base_payload || jsonb_build_object('old_record', to_jsonb(OLD));
    END IF;

    -- Insert into history
    INSERT INTO madbase_realtime.messages (topic, payload)
    VALUES (topic_name, base_payload)
    RETURNING id INTO msg_id;

    -- Add ID to payload
    final_payload := base_payload || jsonb_build_object('id', msg_id);

    -- Send notification.
    -- The size is checked up front rather than trapping the pg_notify error:
    -- a block with an EXCEPTION clause opens a subtransaction on every
    -- invocation, which is costly in a per-row trigger. When the event does
    -- not fit, only its identifying fields are sent; the full payload stays
    -- available in madbase_realtime.messages under the same id.
    IF octet_length(final_payload::text) >= max_notify_bytes THEN
        final_payload := jsonb_build_object(
            'id', msg_id,
            'schema', TG_TABLE_SCHEMA,
            'table', TG_TABLE_NAME,
            'type', TG_OP,
            'truncated', true
        );
    END IF;

    PERFORM pg_notify('madbase_realtime', final_payload::text);

    -- NEW is NULL for DELETE; returning NULL from a BEFORE ROW trigger would
    -- skip the delete, so hand back OLD in that case.
    IF TG_OP = 'DELETE' THEN
        RETURN OLD;
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;