Postgres变更监听
使用Supabase Realtime监听Postgres数据库变更
让我们探索如何使用 Realtime 的 Postgres Changes 功能来监听数据库事件。
快速开始
在本示例中,我们将设置一个数据库表,使用行级安全性(Row Level Security)进行保护,并通过Supabase客户端库订阅所有变更。
允许匿名访问
在本示例中,我们将为此表启用行级安全性并允许匿名访问。在生产环境中,请确保使用适当的权限保护您的应用。
12345678910-- 启用安全性alter table "todos"enable row level security;-- 允许匿名访问create policy "允许匿名访问"on todosfor selectto anonusing (true);
安装客户端
安装Supabase JavaScript客户端。
1npm install @supabase/supabase-js
创建客户端
此客户端将用于监听Postgres变更。
123456import { createClient } from '@supabase/supabase-js'const supabase = createClient( 'https://<project>.supabase.co', '<your-anon-key>')
按模式监听变更
通过将schema
属性设置为'public'并将事件名称设为*
,监听public
模式下所有表的变更。事件名称可以是以下之一:
INSERT
UPDATE
DELETE
*
频道名称可以是除'realtime'外的任何字符串。
123456789101112131415import { createClient } from '@supabase/supabase-js'const supabase = createClient('your_project_url', 'your_supabase_api_key')// ---cut---const channelA = supabase .channel('schema-db-changes') .on( 'postgres_changes', { event: '*', schema: 'public', }, (payload) => console.log(payload) ) .subscribe()
插入测试数据
现在我们可以向表中添加一些数据,这将触发channelA
的事件处理器。
123insert into todos (task)values ('变更数据!');
使用方法
您可以使用 Supabase 客户端库来订阅数据库变更。
监听特定模式(schema)
使用 schema
参数订阅特定模式的事件:
1234567891011const changes = supabase .channel('schema-db-changes') .on( 'postgres_changes', { schema: 'public', // 订阅 Postgres 中的 "public" 模式 event: '*', // 监听所有变更 }, (payload) => console.log(payload) ) .subscribe()
通道名称可以是除 'realtime' 外的任何字符串。
监听 INSERT
事件
使用 event
参数来仅监听数据库的 INSERT
操作:
1234567891011const changes = supabase .channel('schema-db-changes') .on( 'postgres_changes', { event: 'INSERT', // 仅监听 INSERT 操作 schema: 'public', }, (payload) => console.log(payload) ) .subscribe()
通道名称可以是除 'realtime' 之外的任意字符串。
监听 UPDATE
事件
使用 event
参数来仅监听数据库的 UPDATE
操作:
1234567891011const changes = supabase .channel('schema-db-changes') .on( 'postgres_changes', { event: 'UPDATE', // 仅监听 UPDATE 操作 schema: 'public', }, (payload) => console.log(payload) ) .subscribe()
通道名称可以是除 'realtime' 外的任意字符串。
监听 DELETE
事件
使用 event
参数来专门监听数据库的 DELETE
操作:
1234567891011const changes = supabase .channel('schema-db-changes') .on( 'postgres_changes', { event: 'DELETE', // 仅监听 DELETE 操作 schema: 'public', }, (payload) => console.log(payload) ) .subscribe()
通道名称可以是除 'realtime' 之外的任何字符串。
监听特定表
使用 table
参数订阅特定表的事件:
123456789101112const changes = supabase .channel('table-db-changes') .on( 'postgres_changes', { event: '*', schema: 'public', table: 'todos', }, (payload) => console.log(payload) ) .subscribe()
通道名称可以是除 'realtime' 外的任意字符串。
监听多个变更
要在同一个频道中监听不同事件和模式/表/过滤条件的组合:
123456789101112131415161718192021const channel = supabase .channel('db-changes') .on( 'postgres_changes', { event: '*', schema: 'public', table: 'messages', }, (payload) => console.log(payload) ) .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'users', }, (payload) => console.log(payload) ) .subscribe()
筛选特定变更
使用 filter
参数进行细粒度变更筛选:
12345678910111213const changes = supabase .channel('table-filter-changes') .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'todos', filter: 'id=eq.1', }, (payload) => console.log(payload) ) .subscribe()
可用过滤器
Realtime 提供过滤器功能,让您可以更精细地控制客户端接收的数据。
等于 (eq
)
当表中某列的值等于客户端指定的值时监听变更:
12345678910111213const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'UPDATE', schema: 'public', table: 'messages', filter: 'body=eq.hey', }, (payload) => console.log(payload) ) .subscribe()
该过滤器使用 PostgreSQL 的 =
运算符进行过滤。
不等于 (neq
)
监听表中某列值不等于客户端指定值时的变更:
12345678910111213const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'messages', filter: 'body=neq.bye', }, (payload) => console.log(payload) ) .subscribe()
该过滤器使用 PostgreSQL 的 !=
运算符。
小于 (lt
)
用于监听表中某列值小于客户端指定值时的变更:
12345678910111213const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'INSERT', schema: 'public, table: 'profiles', filter: 'age=lt.65', }, (payload) => console.log(payload) ) .subscribe()
该过滤器使用 Postgres 的 <
运算符,因此适用于非数值类型。使用时请注意检查被比较数据类型的预期行为。
小于或等于 (lte
)
用于监听表中某列值小于或等于客户端指定值时的变更:
12345678910111213const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'UPDATE', schema: 'public', table: 'profiles', filter: 'age=lte.65', }, (payload) => console.log(payload) ) .subscribe()
该过滤器使用 Postgres 的 <=
运算符,因此适用于非数字类型。请确保检查被比较数据类型的预期行为。
大于 (gt
)
用于监听表中某列值大于客户端指定值时的变更:
12345678910111213const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'products', filter: 'quantity=gt.10', }, (payload) => console.log(payload) ) .subscribe()
该过滤器使用 Postgres 的 >
运算符,因此适用于非数值类型。使用时请注意比较数据类型的预期行为。
大于或等于 (gte
)
用于监听表中某列值大于或等于客户端指定值时的变更:
12345678910111213const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'products', filter: 'quantity=gte.10', }, (payload) => console.log(payload) ) .subscribe()
该过滤器使用 Postgres 的 >=
运算符,因此适用于非数值类型。请确保检查被比较数据类型的预期行为。
包含在列表中 (in)
监听表中列值等于客户端指定任意值时的变更:
12345678910111213const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'colors', filter: 'name=in.(red, blue, yellow)', }, (payload) => console.log(payload) ) .subscribe()
此过滤器使用 Postgres 的 = ANY
语法。Realtime 服务对此过滤器最多支持 100 个值。
接收 old
记录
默认情况下,只会发送 new
记录变更。但如果您想在 UPDATE
或 DELETE
记录时获取 old
记录(先前值),可以将表的 replica identity
设置为 full
:
12alter table messages replica identity full;
RLS(行级安全)策略不会应用于 DELETE
语句,因为Postgres无法验证用户是否有权访问已删除的记录。当启用RLS且表的 replica identity
设为 full
时,old
记录仅包含主键信息。
私有模式
Postgres Changes 功能默认支持 public
模式下的表。要监听私有模式中的表,您需要向访问令牌中的数据库角色授予表的 SELECT
权限。可以执行类似以下查询:
1grant select on "non_private_schema"."some_table" to authenticated;
我们强烈建议您为私有模式中的表启用RLS并创建策略。否则,任何被授予访问权限的角色都将拥有对该表的无限制读取权限。
自定义令牌
您可以选择自行签署令牌,以定制可在RLS策略中检查的声明。
项目JWT密钥可在仪表板的项目API密钥中找到。
切勿在客户端暴露service_role
令牌,因为该角色被授权绕过行级安全策略。
要在Realtime中使用自定义JWT,请确保在实例化Supabase客户端后、连接Channel前设置令牌。
1234567891011121314151617181920const { createClient } = require('@supabase/supabase-js')const supabase = createClient(process.env.SUPABASE_URL, process.env.SUPABASE_KEY, {})// 在此设置您的自定义JWTsupabase.realtime.setAuth('your-custom-jwt')const channel = supabase .channel('db-changes') .on( 'postgres_changes', { event: '*', schema: 'public', table: 'messages', filter: 'body=eq.bye', }, (payload) => console.log(payload) ) .subscribe()
刷新令牌
您需要自行刷新令牌,但生成后可以将其传递给 Realtime。
例如,如果您使用的是 supabase-js
v2
客户端,可以这样传递令牌:
123// 客户端设置supabase.realtime.setAuth('fresh-token')
限制
删除事件不可过滤
在跟踪 Postgres 变更时,您无法过滤删除事件。这个限制是由于从 Postgres 提取变更的方式导致的。
表名中的空格
当前 Realtime 不支持表名包含空格的情况。
数据库实例与实时性能
实时系统通常需要预先规划,因为其扩展特性较为特殊。对于Postgres变更
功能,每个变更事件都需要检查订阅用户是否有访问权限。例如,如果有100个用户订阅了某个表,当您执行一次插入操作时,将触发100次"读取":每个用户各一次。
这可能导致数据库瓶颈,限制消息吞吐量。如果数据库无法快速完成变更授权,变更将被延迟,直至超时。
数据库变更在单线程中处理以保持变更顺序。这意味着计算资源升级对Postgres变更订阅的性能影响有限。您可以通过下方工具估算数据库的预期最大吞吐量。
如需大规模使用Postgres变更功能,建议考虑使用不带RLS(行级安全)和过滤器的独立"公共"表。或者,您可以仅在服务器端使用Realtime,然后通过Realtime广播将变更重新分发给客户端。
输入您的数据库设置以估算实例的最大吞吐量:
Set your expected parameters
Current maximum possible throughput
Total DB changes /sec | Max messages per client /sec | Max total messages /sec | Latency p95 |
---|---|---|---|
64 | 64 | 32,000 | 238ms |
View raw throughput table
请务必运行自己的基准测试,确保性能满足您的使用场景需求。
我们正在对Realtime的Postgres变更功能进行多项改进。如果您对特定使用场景的性能存在疑问,请通过支持表单联系我们,我们将竭诚为您提供帮助。我们拥有专业工程师团队,可为您量身定制最佳解决方案。