-- Trigger function: records a row change in madbase_realtime.messages and
-- announces it on the 'madbase_realtime' NOTIFY channel.
--
-- Payload keys: schema, table, type (TG_OP), timestamp, plus
--   INSERT -> record
--   UPDATE -> record, old_record
--   DELETE -> old_record
-- The notification additionally carries 'id', the id of the stored message.
-- If the full notification cannot be sent, a reduced notification
-- (id, schema, table, type, truncated = true) is sent instead; the complete
-- payload remains available in madbase_realtime.messages under that id.
--
-- Returns OLD for DELETE and NEW otherwise, so the function does not suppress
-- the operation when attached as a BEFORE row trigger (AFTER triggers ignore
-- the return value).
-- NOTE(review): written for row-level triggers; NEW/OLD are not populated for
-- statement-level triggers -- confirm how the trigger is declared.
CREATE OR REPLACE FUNCTION madbase_realtime.broadcast_changes()
RETURNS trigger AS $$
DECLARE
    v_base_payload  jsonb;
    v_final_payload jsonb;
    v_topic         text;
    v_msg_id        bigint;
BEGIN
    -- Topic is "<schema>:<table>" of the table that fired the trigger.
    v_topic := TG_TABLE_SCHEMA || ':' || TG_TABLE_NAME;

    -- Fields common to every operation.
    v_base_payload := jsonb_build_object(
        'schema',    TG_TABLE_SCHEMA,
        'table',     TG_TABLE_NAME,
        'type',      TG_OP,
        'timestamp', now()
    );

    -- Attach the row image(s) relevant to the operation.
    IF TG_OP = 'INSERT' THEN
        v_base_payload := v_base_payload
            || jsonb_build_object('record', to_jsonb(NEW));
    ELSIF TG_OP = 'UPDATE' THEN
        v_base_payload := v_base_payload
            || jsonb_build_object(
                   'record',     to_jsonb(NEW),
                   'old_record', to_jsonb(OLD)
               );
    ELSIF TG_OP = 'DELETE' THEN
        v_base_payload := v_base_payload
            || jsonb_build_object('old_record', to_jsonb(OLD));
    END IF;

    -- Persist the message first so the notification can reference its id.
    INSERT INTO madbase_realtime.messages (topic, payload)
    VALUES (v_topic, v_base_payload)
    RETURNING id INTO v_msg_id;

    v_final_payload := v_base_payload || jsonb_build_object('id', v_msg_id);

    -- Best-effort notification. pg_notify raises an error when the payload is
    -- too long (the documented default limit is just under 8000 bytes), so on
    -- any failure fall back to a small notification that only identifies the
    -- stored message. OTHERS already covers every catchable error condition.
    BEGIN
        PERFORM pg_notify('madbase_realtime', v_final_payload::text);
    EXCEPTION
        WHEN OTHERS THEN
            PERFORM pg_notify(
                'madbase_realtime',
                jsonb_build_object(
                    'id',        v_msg_id,
                    'schema',    TG_TABLE_SCHEMA,
                    'table',     TG_TABLE_NAME,
                    'type',      TG_OP,
                    'truncated', true
                )::text
            );
    END;

    -- NEW is NULL for DELETE; returning it from a BEFORE DELETE row trigger
    -- would silently skip the delete, so hand back OLD in that case.
    IF TG_OP = 'DELETE' THEN
        RETURN OLD;
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;