Chat System Documentation

Complete Frontend Integration Guide

This document covers the chat system architecture, RPC functions, Realtime setup, and frontend integration patterns for the DM-only MVP.


1. Architecture Overview

Flow Summary

| Step | Description |
|---|---|
| 1–2 | Already-authenticated frontend requests a Supabase Realtime token from the backend |
| 3–4 | Backend verifies the httpOnly session cookie, mints a 1-hour JWT with user_id and org_id claims |
| 5–7 | Frontend uses the JWT for PostgREST calls (RPC + INSERT) and Realtime WebSocket |
| 8–11 | Database triggers broadcast new messages to conversation members via Realtime |

2. Schema Summary

All tables live in the private schema.

Core Tables

| Table | Purpose | Frontend Access |
|---|---|---|
| chat_conversations | DM / group / channel containers (MVP: DM only) | SELECT via RLS |
| chat_conversation_members | Membership + read cursor per user per conversation | SELECT via RLS |
| chat_messages | Messages with optional thread parent | SELECT + INSERT via RLS |
| chat_user_presence | Heartbeat-based online/away/offline status | SELECT via RLS |

Future Tables (schema exists, no RLS policies in MVP)

| Table | Purpose |
|---|---|
| chat_reactions | Emoji reactions on messages |
| chat_attachments | File attachments on messages |

User Profile Columns

| Table | Column | Type | Description |
|---|---|---|---|
| users | email | VARCHAR(255) | Global user email |
| user_organizations | display_name | VARCHAR(100) | Per-org display name |
| user_organizations | avatar_url | TEXT | Per-org avatar URL |

3. Authentication & Realtime Token

Token Flow

The user must already be logged in (authenticated session via httpOnly cookie). The frontend then requests a short-lived Supabase token from a dedicated endpoint.

Endpoint: POST /api/v1/realtime/token

| Property | Detail |
|---|---|
| Method | POST |
| Body | Empty |
| Auth | httpOnly session cookie (sent automatically by the browser) |
| Returns | Signed JWT valid for 1 hour |

// Fetch the Supabase Realtime token
const res = await fetch('/api/v1/realtime/token', { method: 'POST', credentials: 'include' });
const { token } = await res.json();

// Set it on the Supabase client BEFORE subscribing to channels
supabase.realtime.setAuth(token);

JWT Claims

The backend mints the JWT with these claims. RLS policies and helper functions extract them via current_setting('request.jwt.claims', true).

{
  "aud": "authenticated",
  "role": "authenticated",
  "exp": 1768664544,
  "iat": 1768660944,
  "user_id": "30",
  "org_id": "45"
}
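
Because user_id and org_id arrive as strings in these claims, a client that needs numeric IDs (for example, to compare against sender_id) has to convert them. The following is a minimal sketch only; `parseChatClaims`, `ChatJwtClaims`, and `ChatIdentity` are hypothetical names and not part of the backend or the Supabase client.

```typescript
// Hypothetical helper types: the claim shape shown above and a parsed form.
interface ChatJwtClaims {
  aud: string;
  role: string;
  exp: number; // seconds since the Unix epoch
  iat: number;
  user_id: string; // bigint serialized as a string
  org_id: string; // bigint serialized as a string
}

interface ChatIdentity {
  userId: number;
  orgId: number;
  expiresAtMs: number;
}

// Converts one string claim to a number, rejecting anything that is not a plain integer.
function claimToId(raw: string, name: string): number {
  if (!/^\d+$/.test(raw)) {
    throw new Error(`${name} must be an integer string`);
  }
  const value = Number(raw);
  if (!Number.isSafeInteger(value)) {
    throw new Error(`${name} is too large to represent safely`);
  }
  return value;
}

function parseChatClaims(claims: ChatJwtClaims): ChatIdentity {
  return {
    userId: claimToId(claims.user_id, 'user_id'),
    orgId: claimToId(claims.org_id, 'org_id'),
    expiresAtMs: claims.exp * 1000,
  };
}
```

This only reads claims for client-side convenience; it does not verify the token signature, which remains the server's job.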

Token Refresh

The token expires after 1 hour. The frontend should refresh it before expiry:

// Refresh ~5 minutes before expiry
function scheduleTokenRefresh(token: string) {
  // JWT segments are base64url-encoded; convert to standard base64 before atob()
  const base64 = token.split('.')[1].replace(/-/g, '+').replace(/_/g, '/');
  const payload = JSON.parse(atob(base64));
  const expiresIn = payload.exp * 1000 - Date.now();
  const refreshIn = expiresIn - 5 * 60 * 1000; // 5 min before expiry

  setTimeout(async () => {
    const res = await fetch('/api/v1/realtime/token', {
      method: 'POST',
      credentials: 'include'
    });
    const { token: newToken } = await res.json();
    supabase.realtime.setAuth(newToken);
    scheduleTokenRefresh(newToken); // re-arm for the new token
  }, Math.max(refreshIn, 0));
}
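
The delay arithmetic can also be isolated in a pure function, which makes it testable without timers or network calls. A sketch, assuming a hypothetical `refreshDelayMs` helper (not part of any library):

```typescript
// Hypothetical pure helper: milliseconds to wait before refreshing a token.
// expSeconds is the JWT `exp` claim; nowMs is injectable so tests can fix the clock.
function refreshDelayMs(
  expSeconds: number,
  nowMs: number = Date.now(),
  leadMs: number = 5 * 60 * 1000 // refresh 5 minutes before expiry
): number {
  const expiresInMs = expSeconds * 1000 - nowMs;
  // Never return a negative delay: an expired or nearly expired token refreshes immediately.
  return Math.max(expiresInMs - leadMs, 0);
}
```

With the example claims above (exp minus iat is 3600 seconds), a token inspected at its issue time yields a delay of 3,300,000 ms, i.e. 55 minutes.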

How Claims Map to Helper Functions

| Helper Function | Extracts | Used By |
|---|---|---|
| private.current_user_id() | jwt.claims.user_id (as bigint) | All RLS policies & triggers |
| private.current_org_id() | jwt.claims.org_id (as bigint) | All RLS policies & triggers |
| private.user_conversation_ids() | Returns conversation IDs where the current user is a member in the current org | Conversation/message/member RLS |

4. RPC Functions (Frontend API)

These are the primary functions the frontend calls via supabase.rpc(). All are SECURITY DEFINER — they run with elevated privileges and enforce authorization internally using JWT claims.


4.1 get_recent_dms

Purpose: Fetch the DM conversation list with peer profile, last message, unread count, and presence. This is the main "inbox" query.

Signature:

private.get_recent_dms(
  p_limit int DEFAULT 20,
  p_cursor timestamptz DEFAULT NULL
) RETURNS TABLE (...)

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| p_limit | int | 20 | Max conversations to return |
| p_cursor | timestamptz | NULL | Cursor for pagination: pass updated_at of the last item from the previous page. NULL for the first page. |

Returns:

| Column | Type | Description |
|---|---|---|
| conversation_id | bigint | Conversation ID |
| peer_user_id | bigint | The other user's ID (or self for self-DMs) |
| peer_email | varchar | Peer's email |
| peer_display_name | varchar | Peer's display name in the current org |
| peer_avatar_url | text | Peer's avatar URL in the current org |
| peer_status | varchar | 'online', 'away', or 'offline' |
| peer_last_seen_at | timestamptz | Peer's last heartbeat time |
| last_message_content | text | Content of the most recent message |
| last_message_sender | bigint | Sender ID of the most recent message |
| last_message_at | timestamptz | Timestamp of the most recent message |
| unread_count | bigint | Messages from the peer after user's read cursor |
| updated_at | timestamptz | Conversation's last activity time (use as pagination cursor) |

Frontend Usage:

// First page
const { data, error } = await supabase.rpc('get_recent_dms', {
  p_limit: 20
});

// Next page: pass updated_at of the last item
const { data: nextPage } = await supabase.rpc('get_recent_dms', {
  p_limit: 20,
  p_cursor: data[data.length - 1].updated_at
});

Notes:

  • Ordered by updated_at DESC (most recently active first).
  • The updated_at field is bumped by a trigger every time a new message is sent.
  • For self-DMs (notes to self), the peer is the current user.
  • Unread count excludes messages sent by the current user.
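
When paging through the inbox, the cursor for the next request can be derived from the page just received. The sketch below assumes a hypothetical `nextDmCursor` helper and a trimmed-down `DmRow` type containing only the columns it needs; it also assumes timestamptz values arrive as ISO 8601 strings, which is how PostgREST normally serializes them.

```typescript
// Hypothetical minimal row shape: only the columns needed for pagination.
interface DmRow {
  conversation_id: number;
  updated_at: string; // timestamptz serialized as a string
}

// Returns the p_cursor value for the next page, or null when no further page is expected.
function nextDmCursor(page: DmRow[], limit: number): string | null {
  // A short (or empty) page means the list is exhausted.
  if (page.length < limit) return null;
  // Rows are ordered by updated_at DESC, so the last row carries the oldest timestamp.
  return page[page.length - 1].updated_at;
}
```

A page that is exactly `limit` rows long may still be the final one; in that case the follow-up request simply returns an empty page and the helper then yields null.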

4.2 get_conversation_messages

Purpose: Fetch paginated messages for a single conversation. Returns newest-first; the frontend reverses for display.

Signature:

private.get_conversation_messages(
  p_conversation_id bigint,
  p_limit int DEFAULT 50,
  p_cursor bigint DEFAULT NULL
) RETURNS TABLE (...)

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| p_conversation_id | bigint | (required) | The conversation to fetch messages from |
| p_limit | int | 50 | Max messages to return |
| p_cursor | bigint | NULL | Message ID cursor: pass the smallest message_id from the previous page. NULL for the first (newest) page. |

Returns:

| Column | Type | Description |
|---|---|---|
| message_id | bigint | Message ID |
| sender_id | bigint | Who sent the message |
| content | text | Message body |
| parent_message_id | bigint | Thread parent (NULL if top-level) |
| reply_count | int | Number of thread replies |
| is_edited | boolean | Whether the message was edited |
| is_deleted | boolean | Whether the message was soft-deleted |
| created_at | timestamptz | When the message was sent |

Frontend Usage:

// Load latest messages
const { data } = await supabase.rpc('get_conversation_messages', {
  p_conversation_id: 123,
  p_limit: 50
});

// Load older messages (scroll up); the last row of a newest-first page has the smallest message_id
const { data: older } = await supabase.rpc('get_conversation_messages', {
  p_conversation_id: 123,
  p_limit: 50,
  p_cursor: data[data.length - 1].message_id
});

// Reverse a copy for chronological display (Array.prototype.reverse mutates in place)
const chronological = [...data].reverse();

Notes:

  • Membership is enforced internally — the function checks that p_conversation_id is in user_conversation_ids().
  • Calling with a conversation the user is not a member of returns zero rows (no error).
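
Since each page arrives newest-first while the UI typically keeps messages oldest-first, older pages need to be reversed and placed in front of what is already displayed. A minimal sketch, assuming a hypothetical `prependOlderPage` helper and a reduced `ChatMessage` type; de-duplicating by message_id is a defensive choice of this sketch, not something the RPC requires.

```typescript
// Hypothetical reduced message shape for illustration.
interface ChatMessage {
  message_id: number;
  content: string;
}

// `current` is chronological (oldest first); `olderPage` is newest-first, as returned by the RPC.
// Returns a new chronological array and leaves both inputs untouched.
function prependOlderPage(current: ChatMessage[], olderPage: ChatMessage[]): ChatMessage[] {
  const seen = new Set(current.map((m) => m.message_id));
  const older = [...olderPage]
    .reverse() // newest-first -> oldest-first
    .filter((m) => !seen.has(m.message_id)); // drop anything already displayed
  return [...older, ...current];
}
```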

4.3 update_read_cursor

Purpose: Mark messages as read. Forward-only — the cursor can only advance, never go backward.

Signature:

private.update_read_cursor(
  p_conversation_id bigint,
  p_last_read_message_id bigint
) RETURNS void

Parameters:

| Parameter | Type | Description |
|---|---|---|
| p_conversation_id | bigint | Conversation to update |
| p_last_read_message_id | bigint | ID of the latest message the user has seen |

Frontend Usage:

// When user views a conversation, mark latest message as read
await supabase.rpc('update_read_cursor', {
  p_conversation_id: 123,
  p_last_read_message_id: 456
});

Notes:

  • No-op if the provided message ID is less than or equal to the current cursor.
  • Only updates the calling user's cursor (enforced via current_user_id()).
  • Call this when the user opens a conversation or when new messages arrive while the conversation is focused.
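
Because the server ignores cursor values that do not move forward, the client can skip those calls entirely. The sketch below assumes a hypothetical `ReadCursorTracker` class that remembers the highest message ID sent per conversation; the server remains the source of truth, and this is only a way to avoid redundant RPC round-trips.

```typescript
// Hypothetical client-side tracker mirroring the forward-only rule.
class ReadCursorTracker {
  private cursors = new Map<number, number>();

  // Returns true if update_read_cursor should be called for this message,
  // false if the call would be a no-op on the server.
  advance(conversationId: number, messageId: number): boolean {
    const current = this.cursors.get(conversationId);
    if (current !== undefined && messageId <= current) {
      return false;
    }
    this.cursors.set(conversationId, messageId);
    return true;
  }
}
```

A caller would invoke the RPC only when `advance(...)` returns true, for example when a new message arrives while the conversation is focused.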

4.4 update_presence

Purpose: Heartbeat-based presence. Frontend calls this periodically to keep the user's status current.

Signature:

private.update_presence(
  p_status varchar
) RETURNS void

Parameters:

| Parameter | Type | Description |
|---|---|---|
| p_status | varchar | One of 'online', 'away', 'offline' |

Frontend Usage:

// On app focus / periodic heartbeat (every ~30 seconds)
await supabase.rpc('update_presence', { p_status: 'online' });

// On app blur / idle
await supabase.rpc('update_presence', { p_status: 'away' });

// On app close / logout
await supabase.rpc('update_presence', { p_status: 'offline' });

Notes:

  • Uses INSERT ... ON CONFLICT DO UPDATE — creates the row on first call, updates on subsequent calls.
  • A cron job runs every minute and sets users to 'offline' if last_seen_at is older than 90 seconds.
  • Recommended heartbeat interval: 30 seconds (gives a 60-second buffer before timeout).
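
The timeout rule can be mirrored on the client, for instance to grey out a peer before the next cron run updates the row. This is a sketch under assumptions: the constant and function names are hypothetical, the cron is assumed to fire at a steady 60-second interval, and client clocks are assumed to be roughly in sync with the database.

```typescript
const PRESENCE_TIMEOUT_MS = 90_000; // "older than 90 seconds" rule from the cron job
const CRON_INTERVAL_MS = 60_000; // the cron job runs every minute

// True when a heartbeat is old enough that the next cron run would mark the user offline.
function isHeartbeatStale(lastSeenAtMs: number, nowMs: number): boolean {
  return nowMs - lastSeenAtMs > PRESENCE_TIMEOUT_MS;
}

// Upper bound on the gap between the last heartbeat and the row flipping to 'offline':
// the heartbeat must age past the timeout, then wait for the next cron tick.
const worstCaseOfflineDelayMs = PRESENCE_TIMEOUT_MS + CRON_INTERVAL_MS;
```

Under these assumptions a user who closes the tab without sending 'offline' can still appear online for up to 150 seconds.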

4.5 get_recent_dm_presence

Purpose: Batch-fetch presence for peers in the DM list. Uses the same pagination window as get_recent_dms so you can fetch presence for exactly the conversations currently visible.

Signature:

private.get_recent_dm_presence(
  p_limit int DEFAULT 20,
  p_cursor timestamptz DEFAULT NULL
) RETURNS TABLE (...)

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| p_limit | int | 20 | Same limit used in get_recent_dms |
| p_cursor | timestamptz | NULL | Same cursor used in get_recent_dms |

Returns:

| Column | Type | Description |
|---|---|---|
| user_id | bigint | Peer user ID |
| status | varchar | 'online', 'away', or 'offline' |
| last_seen_at | timestamptz | Last heartbeat timestamp |

Frontend Usage:

// Fetch presence matching the current DM list page
const { data: presence } = await supabase.rpc('get_recent_dm_presence', {
  p_limit: 20
});

// Build a lookup map
const presenceMap = new Map(
  presence.map(p => [p.user_id, p])
);

Notes:

  • This is an optimization to avoid N+1 presence queries. Use the same p_limit and p_cursor values you passed to get_recent_dms.
  • get_recent_dms already includes peer_status and peer_last_seen_at, so this function is primarily useful for polling presence updates without re-fetching the full DM list.
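
A polled presence result can be folded back into an already loaded DM list without refetching it. The following sketch assumes hypothetical `applyPresence`, `DmListItem`, and `PresenceRow` names, and assumes last_seen_at may be null for a peer who has never sent a heartbeat.

```typescript
type PresenceStatus = 'online' | 'away' | 'offline';

// Hypothetical reduced shapes; real rows carry more columns.
interface DmListItem {
  conversation_id: number;
  peer_user_id: number;
  peer_status: PresenceStatus;
  peer_last_seen_at: string | null;
}

interface PresenceRow {
  user_id: number;
  status: PresenceStatus;
  last_seen_at: string | null;
}

// Returns a new list with presence fields refreshed; rows without a match are kept as-is.
function applyPresence(dms: DmListItem[], presence: PresenceRow[]): DmListItem[] {
  const byUser = new Map<number, PresenceRow>();
  for (const row of presence) {
    byUser.set(row.user_id, row);
  }
  return dms.map((dm) => {
    const row = byUser.get(dm.peer_user_id);
    return row
      ? { ...dm, peer_status: row.status, peer_last_seen_at: row.last_seen_at }
      : dm;
  });
}
```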

5. Direct Table Access

Sending Messages (INSERT)

Messages are inserted directly into chat_messages via PostgREST. Server-side triggers enforce defaults — the frontend only needs to provide conversation_id and content.

const { data, error } = await supabase
  .from('chat_messages')
  .insert({
    conversation_id: 123,
    content: 'Hello!',
    // Optional: for thread replies
    parent_message_id: 456
  })
  .select()
  .single();

What the trigger auto-sets (you cannot override these):

| Field | Value |
|---|---|
| sender_id | From JWT user_id |
| org_id | From JWT org_id |
| created_at | NOW() |
| updated_at | NOW() |
| reply_count | 0 |
| is_edited | false |
| is_deleted | false |

RLS enforces: The conversation must be one the user is a member of, and the sender_id must match the JWT.

Reading Tables Directly

All four core tables support direct SELECT via PostgREST, filtered by RLS:

// Conversations the user is a member of
const { data: conversations } = await supabase.from('chat_conversations').select('*');

// Members of conversations the user is in
const { data: members } = await supabase.from('chat_conversation_members').select('*');

// Messages in the user's conversations
const { data: messages } = await supabase.from('chat_messages').select('*');

// Presence for users in the same org
const { data: presence } = await supabase.from('chat_user_presence').select('*');

Recommendation: Prefer the RPC functions (get_recent_dms, get_conversation_messages) over raw table queries — they join related data efficiently and handle pagination correctly.


6. Realtime Subscriptions

Published Tables

These tables are added to the supabase_realtime publication:

| Table | Events | Use Case |
|---|---|---|
| chat_messages | INSERT | New messages in conversations |
| chat_conversation_members | INSERT, UPDATE, DELETE | Membership changes, read cursor updates |
| chat_user_presence | INSERT, UPDATE | Presence status changes |

Broadcast Notifications (Trigger-Based)

In addition to postgres_changes, the trg_notify_new_message trigger sends a lightweight broadcast to each conversation member's personal channel (excluding the sender).

Channel format: user:{org_id}:{user_id}

Payload:

{
  "message_id": 789,
  "conversation_id": 123,
  "sender_id": 30,
  "content": "Hello! (truncated to 200 chars)",
  "created_at": "2026-02-20T12:00:00Z"
}
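
Broadcast data arrives untyped, so it can be worth validating before use. The sketch below assumes hypothetical `NewMessageBroadcast`, `personalChannelName`, and `isNewMessageBroadcast` names. Depending on the client library version, the fields shown above may be nested inside an envelope (for example under a `payload` property), so the guard should be applied to whichever object actually carries them.

```typescript
// Hypothetical type for the broadcast payload documented above.
interface NewMessageBroadcast {
  message_id: number;
  conversation_id: number;
  sender_id: number;
  content: string; // preview only: truncated to 200 chars by the trigger
  created_at: string;
}

// Builds the personal channel name in the documented user:{org_id}:{user_id} format.
function personalChannelName(orgId: number, userId: number): string {
  return `user:${orgId}:${userId}`;
}

// Runtime type guard: true only if every documented field is present with the expected type.
function isNewMessageBroadcast(value: unknown): value is NewMessageBroadcast {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.message_id === 'number' &&
    typeof v.conversation_id === 'number' &&
    typeof v.sender_id === 'number' &&
    typeof v.content === 'string' &&
    typeof v.created_at === 'string'
  );
}
```

Because content is a truncated preview, a client that needs the full text should load the message through get_conversation_messages rather than rendering the broadcast body directly.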

Frontend Subscription Pattern

// 0. Set auth BEFORE creating any channels
supabase.realtime.setAuth(token);

// 1. Subscribe to personal broadcast channel for notifications
const notificationChannel = supabase
  .channel(`user:${orgId}:${userId}`)
  .on('broadcast', { event: 'new_message' }, (payload) => {
    // Show notification / update unread badge
    handleNewMessageNotification(payload);
  })
  .subscribe();

// 2. Subscribe to postgres_changes for the active conversation
const messagesChannel = supabase
  .channel('active-conversation')
  .on(
    'postgres_changes',
    {
      event: 'INSERT',
      schema: 'private',
      table: 'chat_messages',
      filter: `conversation_id=eq.${conversationId}`
    },
    (payload) => {
      appendMessage(payload.new);
    }
  )
  .subscribe();

// 3. Subscribe to presence changes in the org
const presenceChannel = supabase
  .channel('presence-updates')
  .on(
    'postgres_changes',
    {
      event: '*',
      schema: 'private',
      table: 'chat_user_presence',
      filter: `org_id=eq.${orgId}`
    },
    (payload) => {
      updatePresenceIndicator(payload.new);
    }
  )
  .subscribe();

Cleanup

// When leaving a conversation or unmounting
await supabase.removeChannel(messagesChannel);

// On logout / app close
await supabase.removeAllChannels();

7. RLS Policies

Policy Summary

| Table | Operation | Rule |
|---|---|---|
| chat_conversations | SELECT | User must be a member (via user_conversation_ids()) |
| chat_conversation_members | SELECT | User must be a co-member of the conversation |
| chat_messages | SELECT | User must be a member of the message's conversation |
| chat_messages | INSERT | Must be a member + sender_id must match JWT |
| chat_user_presence | SELECT | org_id must match JWT |
| chat_reactions | All | RLS enabled, no policies → blocked in MVP |
| chat_attachments | All | RLS enabled, no policies → blocked in MVP |

8. Rate Limits & Triggers

Rate Limits

Two server-side rate limits are enforced via BEFORE INSERT triggers on chat_messages:

| Scope | Limit | Window | Error Code | Error Key |
|---|---|---|---|---|
| Per-conversation | 5 messages | 10 seconds | P0429 | rate_limit_conversation |
| Global (per user per org) | 20 messages | 60 seconds | P0430 | rate_limit_global |

Frontend Handling:

const { data, error } = await supabase
  .from('chat_messages')
  .insert({ conversation_id: 123, content: 'Hello!' });

if (error) {
  if (error.code === 'P0429') {
    showToast('Slow down! Too many messages in this conversation.');
  } else if (error.code === 'P0430') {
    showToast('Global rate limit reached. Please wait a moment.');
  }
}
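
To avoid hitting the server limits in the first place, the client can apply the same budgets before sending. This is a sketch only: `SlidingWindowLimiter` is a hypothetical class, and it assumes the triggers count messages in a trailing time window, which the documentation does not spell out. The server-side triggers remain authoritative.

```typescript
// Hypothetical client-side guard: allows at most `limit` sends per trailing `windowMs`.
class SlidingWindowLimiter {
  private timestamps: number[] = [];
  private readonly limit: number;
  private readonly windowMs: number;

  constructor(limit: number, windowMs: number) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  // Records a send at nowMs and returns true, or returns false if the window is full.
  tryAcquire(nowMs: number): boolean {
    const cutoff = nowMs - this.windowMs;
    // Forget sends that have aged out of the window.
    this.timestamps = this.timestamps.filter((t) => t > cutoff);
    if (this.timestamps.length >= this.limit) return false;
    this.timestamps.push(nowMs);
    return true;
  }
}

// Budgets matching the documented limits.
const perConversationLimiter = new SlidingWindowLimiter(5, 10_000);
const globalLimiter = new SlidingWindowLimiter(20, 60_000);
```

In practice one per-conversation limiter would be kept per conversation ID, and a send would proceed only if both the global and the per-conversation limiter accept it.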

All Triggers on chat_messages

| Trigger | Timing | Purpose |
|---|---|---|
| trg_enforce_message_defaults | BEFORE INSERT | Sets sender_id, org_id, timestamps, and resets counters |
| trg_check_message_rate_limit | BEFORE INSERT | Per-conversation rate limit (5 / 10s) |
| trg_check_global_rate_limit | BEFORE INSERT | Global rate limit (20 / 60s) |
| trg_update_parent_reply_count | AFTER INSERT | Increments reply_count on parent message for threads |
| trg_notify_new_message | AFTER INSERT | Broadcasts to member channels via realtime.send() |
| trg_touch_conversation | AFTER INSERT | Updates chat_conversations.updated_at for DM list ordering |

Presence Timeout (Cron)

A pg_cron job runs every minute and marks users as 'offline' if their last_seen_at is older than 90 seconds.


9. Common Patterns & Examples

Initialize Chat

async function initializeChat() {
  // 1. Get Supabase Realtime token (user must already be logged in)
  const res = await fetch('/api/v1/realtime/token', {
    method: 'POST',
    credentials: 'include'
  });
  const { token } = await res.json();

  // 2. Set auth on Supabase client
  supabase.realtime.setAuth(token);

  // 3. Schedule token refresh
  scheduleTokenRefresh(token);

  // 4. Start presence heartbeat
  startPresenceHeartbeat();

  // 5. Subscribe to personal notification channel
  subscribeToNotifications();

  // 6. Load DM list
  return await supabase.rpc('get_recent_dms', { p_limit: 20 });
}

Open a Conversation

async function openConversation(conversationId: number) {
  // Fetch messages
  const { data: messages } = await supabase.rpc('get_conversation_messages', {
    p_conversation_id: conversationId,
    p_limit: 50
  });

  // Mark as read
  if (messages.length > 0) {
    await supabase.rpc('update_read_cursor', {
      p_conversation_id: conversationId,
      p_last_read_message_id: messages[0].message_id // newest
    });
  }

  return messages.reverse(); // chronological order
}

Presence Heartbeat

// setInterval returns a number in browsers, so avoid the Node-specific NodeJS.Timer type
let heartbeatInterval: ReturnType<typeof setInterval> | undefined;

function startPresenceHeartbeat() {
  supabase.rpc('update_presence', { p_status: 'online' });

  heartbeatInterval = setInterval(() => {
    supabase.rpc('update_presence', { p_status: 'online' });
  }, 30_000); // Every 30 seconds
}

function stopPresenceHeartbeat() {
  clearInterval(heartbeatInterval);
  supabase.rpc('update_presence', { p_status: 'offline' });
}

window.addEventListener('focus', () => {
  supabase.rpc('update_presence', { p_status: 'online' });
});

window.addEventListener('blur', () => {
  supabase.rpc('update_presence', { p_status: 'away' });
});

10. Troubleshooting

Issue: get_recent_dms Returns No Results

| Cause | Solution |
|---|---|
| User is not a member of any DM conversation | Verify rows exist in chat_conversation_members |
| JWT missing org_id or user_id | Decode the JWT and check claims |
| Conversation type is not 'dm' | Only type = 'dm' conversations are returned |
| user_organizations row missing for peer | The peer profile JOIN excludes conversations without a matching entry |

Issue: Message INSERT Fails

| Cause | Solution |
|---|---|
| User not a member | Add user to chat_conversation_members first |
| Rate limited | Check error code: P0429 (per-conversation) or P0430 (global) |
| Missing sequence grant | Ensure GRANT USAGE, SELECT ON SEQUENCE private.chat_messages_id_seq TO authenticated |

Issue: Realtime Events Not Received

| Cause | Solution |
|---|---|
| Table not in publication | Verify with SELECT * FROM pg_publication_tables WHERE pubname = 'supabase_realtime' |
| setAuth() not called | Call supabase.realtime.setAuth(token) before subscribing |
| Token expired | Re-fetch from POST /api/v1/realtime/token and call setAuth() again |
| RLS blocking SELECT | The RLS SELECT policy must pass for the user to receive Realtime events |
| Wrong schema in subscription | Use schema: 'private' (not 'public') |

Issue: Unread Count Seems Wrong

| Cause | Solution |
|---|---|
| Read cursor never set | Call update_read_cursor when the user views a conversation |
| Cursor is NULL | When last_read_message_id is NULL, all messages from other users count as unread |
| Self-messages included | Unread count already excludes sender_id = current_user_id() |
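
When a badge looks wrong, it can help to recompute the expected count from loaded messages and compare it with the server's unread_count. The sketch below encodes only the rules stated in this guide (messages from other users, after the read cursor, with a NULL cursor counting everything); `countUnread` and `UnreadInput` are hypothetical names, and the server query may apply further conditions, such as soft-deleted messages, that are not documented here.

```typescript
// Hypothetical reduced message shape: only the columns the unread rule depends on.
interface UnreadInput {
  message_id: number;
  sender_id: number;
}

// Counts messages from other users that come after the read cursor.
// A null cursor means nothing has been read yet.
function countUnread(
  messages: UnreadInput[],
  lastReadMessageId: number | null,
  currentUserId: number
): number {
  return messages.filter(
    (m) =>
      m.sender_id !== currentUserId &&
      (lastReadMessageId === null || m.message_id > lastReadMessageId)
  ).length;
}
```

The result is only meaningful if every message after the cursor has been loaded; a partially paginated conversation will undercount.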

Debug Queries

-- Check user's conversation memberships
SELECT * FROM private.chat_conversation_members
WHERE user_id = 30 AND org_id = 45;

-- Check RLS policies
SELECT tablename, policyname, roles, cmd, qual
FROM pg_policies WHERE schemaname = 'private';

-- Check Realtime publication
SELECT * FROM pg_publication_tables
WHERE pubname = 'supabase_realtime' AND schemaname = 'private';

-- Check grants
SELECT grantee, table_name, privilege_type
FROM information_schema.table_privileges
WHERE table_schema = 'private';

Quick Reference Card

┌──────────────────────────────────────────────────────────────────┐
│ CHAT SYSTEM — QUICK REFERENCE │
├──────────────────────────────────────────────────────────────────┤
│ │
│ TOKEN │
│ └─ POST /api/v1/realtime/token (empty body, httpOnly cookie) │
│ → 1-hour Supabase JWT with user_id + org_id │
│ │
│ RPC FUNCTIONS (supabase.rpc) │
│ ├─ get_recent_dms(limit, cursor) → DM inbox list │
│ ├─ get_conversation_messages(id, limit, cursor) → messages │
│ ├─ update_read_cursor(conv_id, msg_id) → mark as read │
│ ├─ update_presence(status) → heartbeat │
│ └─ get_recent_dm_presence(limit, cursor)→ batch presence │
│ │
│ DIRECT TABLE ACCESS │
│ ├─ chat_messages → INSERT (send message) │
│ └─ All 4 tables → SELECT (RLS-filtered) │
│ │
│ REALTIME CHANNELS │
│ ├─ user:{org_id}:{user_id} → broadcast notifications │
│ ├─ postgres_changes: chat_messages → new messages │
│ └─ postgres_changes: chat_user_presence → presence updates │
│ │
│ RATE LIMITS │
│ ├─ Per-conversation: 5 msgs / 10 sec (P0429) │
│ └─ Global: 20 msgs / 60 sec (P0430) │
│ │
│ PRESENCE │
│ ├─ Heartbeat interval: 30 seconds │
│ ├─ Timeout: 90 seconds → auto-offline │
│ └─ Statuses: online | away | offline │
│ │
│ JWT CLAIMS (required) │
│ ├─ user_id: bigint (as string) │
│ ├─ org_id: bigint (as string) │
│ ├─ aud: "authenticated" │
│ └─ role: "authenticated" │
│ │
└──────────────────────────────────────────────────────────────────┘