实时

Postgres变更监听

使用Supabase Realtime监听Postgres数据库变更


让我们探索如何使用 Realtime 的 Postgres Changes 功能来监听数据库事件。

快速开始

在本示例中,我们将设置一个数据库表,使用行级安全性(Row Level Security)进行保护,并通过Supabase客户端库订阅所有变更。

1

设置带有'todos'表的Supabase项目

在Supabase控制台中创建新项目

项目创建完成后,在Supabase数据库中创建表。您可以使用表界面或SQL编辑器完成此操作。

1
2
3
4
5
6
-- 创建名为"todos"的表-- 包含存储任务的列create table todos ( id serial primary key, task text);
2

允许匿名访问

在本示例中,我们将为此表启用行级安全性并允许匿名访问。在生产环境中,请确保使用适当的权限保护您的应用。

1
2
3
4
5
6
7
8
9
10
-- 启用安全性alter table "todos"enable row level security;-- 允许匿名访问create policy "允许匿名访问"on todosfor selectto anonusing (true);
3

启用Postgres复制

前往项目的发布设置,在supabase_realtime下切换您想要监听的表。

4

安装客户端

安装Supabase JavaScript客户端。

1
npm install @supabase/supabase-js
5

创建客户端

此客户端将用于监听Postgres变更。

1
2
3
4
5
6
import { createClient } from '@supabase/supabase-js'const supabase = createClient( 'https://<project>.supabase.co', '<your-anon-key>')
6

按模式监听变更

通过将schema属性设置为'public'并将事件名称设为*,监听public模式下所有表的变更。事件名称可以是以下之一:

  • INSERT
  • UPDATE
  • DELETE
  • *

频道名称可以是除'realtime'外的任何字符串。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import { createClient } from '@supabase/supabase-js'const supabase = createClient('your_project_url', 'your_supabase_api_key')// ---cut---const channelA = supabase .channel('schema-db-changes') .on( 'postgres_changes', { event: '*', schema: 'public', }, (payload) => console.log(payload) ) .subscribe()
7

插入测试数据

现在我们可以向表中添加一些数据,这将触发channelA的事件处理器。

1
2
3
insert into todos (task)values ('变更数据!');

使用方法

您可以使用 Supabase 客户端库来订阅数据库变更。

监听特定模式(schema)

使用 schema 参数订阅特定模式的事件:

1
2
3
4
5
6
7
8
9
10
11
const changes = supabase .channel('schema-db-changes') .on( 'postgres_changes', { schema: 'public', // 订阅 Postgres 中的 "public" 模式 event: '*', // 监听所有变更 }, (payload) => console.log(payload) ) .subscribe()

通道名称可以是除 'realtime' 外的任何字符串。

监听 INSERT 事件

使用 event 参数来仅监听数据库的 INSERT 操作:

1
2
3
4
5
6
7
8
9
10
11
const changes = supabase .channel('schema-db-changes') .on( 'postgres_changes', { event: 'INSERT', // 仅监听 INSERT 操作 schema: 'public', }, (payload) => console.log(payload) ) .subscribe()

通道名称可以是除 'realtime' 之外的任意字符串。

监听 UPDATE 事件

使用 event 参数来仅监听数据库的 UPDATE 操作:

1
2
3
4
5
6
7
8
9
10
11
const changes = supabase .channel('schema-db-changes') .on( 'postgres_changes', { event: 'UPDATE', // 仅监听 UPDATE 操作 schema: 'public', }, (payload) => console.log(payload) ) .subscribe()

通道名称可以是除 'realtime' 外的任意字符串。

监听 DELETE 事件

使用 event 参数来专门监听数据库的 DELETE 操作:

1
2
3
4
5
6
7
8
9
10
11
const changes = supabase .channel('schema-db-changes') .on( 'postgres_changes', { event: 'DELETE', // 仅监听 DELETE 操作 schema: 'public', }, (payload) => console.log(payload) ) .subscribe()

通道名称可以是除 'realtime' 之外的任何字符串。

监听特定表

使用 table 参数订阅特定表的事件:

1
2
3
4
5
6
7
8
9
10
11
12
const changes = supabase .channel('table-db-changes') .on( 'postgres_changes', { event: '*', schema: 'public', table: 'todos', }, (payload) => console.log(payload) ) .subscribe()

通道名称可以是除 'realtime' 外的任意字符串。

监听多个变更

要在同一个频道中监听不同事件和模式/表/过滤条件的组合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const channel = supabase .channel('db-changes') .on( 'postgres_changes', { event: '*', schema: 'public', table: 'messages', }, (payload) => console.log(payload) ) .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'users', }, (payload) => console.log(payload) ) .subscribe()

筛选特定变更

使用 filter 参数进行细粒度变更筛选:

1
2
3
4
5
6
7
8
9
10
11
12
13
const changes = supabase .channel('table-filter-changes') .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'todos', filter: 'id=eq.1', }, (payload) => console.log(payload) ) .subscribe()

可用过滤器

Realtime 提供过滤器功能,让您可以更精细地控制客户端接收的数据。

等于 (eq)

当表中某列的值等于客户端指定的值时监听变更:

1
2
3
4
5
6
7
8
9
10
11
12
13
const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'UPDATE', schema: 'public', table: 'messages', filter: 'body=eq.hey', }, (payload) => console.log(payload) ) .subscribe()

该过滤器使用 PostgreSQL 的 = 运算符进行过滤。

不等于 (neq)

监听表中某列值不等于客户端指定值时的变更:

1
2
3
4
5
6
7
8
9
10
11
12
13
const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'messages', filter: 'body=neq.bye', }, (payload) => console.log(payload) ) .subscribe()

该过滤器使用 PostgreSQL 的 != 运算符。

小于 (lt)

用于监听表中某列值小于客户端指定值时的变更:

1
2
3
4
5
6
7
8
9
10
11
12
13
const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'INSERT', schema: 'public, table: 'profiles', filter: 'age=lt.65', }, (payload) => console.log(payload) ) .subscribe()

该过滤器使用 Postgres 的 < 运算符,因此适用于非数值类型。使用时请注意检查被比较数据类型的预期行为。

小于或等于 (lte)

用于监听表中某列值小于或等于客户端指定值时的变更:

1
2
3
4
5
6
7
8
9
10
11
12
13
const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'UPDATE', schema: 'public', table: 'profiles', filter: 'age=lte.65', }, (payload) => console.log(payload) ) .subscribe()

该过滤器使用 Postgres 的 <= 运算符,因此适用于非数字类型。请确保检查被比较数据类型的预期行为。

大于 (gt)

用于监听表中某列值大于客户端指定值时的变更:

1
2
3
4
5
6
7
8
9
10
11
12
13
const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'products', filter: 'quantity=gt.10', }, (payload) => console.log(payload) ) .subscribe()

该过滤器使用 Postgres 的 > 运算符,因此适用于非数值类型。使用时请注意比较数据类型的预期行为。

大于或等于 (gte)

用于监听表中某列值大于或等于客户端指定值时的变更:

1
2
3
4
5
6
7
8
9
10
11
12
13
const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'products', filter: 'quantity=gte.10', }, (payload) => console.log(payload) ) .subscribe()

该过滤器使用 Postgres 的 >= 运算符,因此适用于非数值类型。请确保检查被比较数据类型的预期行为。

包含在列表中 (in)

监听表中列值等于客户端指定任意值时的变更:

1
2
3
4
5
6
7
8
9
10
11
12
13
const channel = supabase .channel('changes') .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'colors', filter: 'name=in.(red, blue, yellow)', }, (payload) => console.log(payload) ) .subscribe()

此过滤器使用 Postgres 的 = ANY 语法。Realtime 服务对此过滤器最多支持 100 个值。

接收 old 记录

默认情况下,只会发送 new 记录变更。但如果您想在 UPDATEDELETE 记录时获取 old 记录(先前值),可以将表的 replica identity 设置为 full

1
2
alter table messages replica identity full;

私有模式

Postgres Changes 功能默认支持 public 模式下的表。要监听私有模式中的表,您需要向访问令牌中的数据库角色授予表的 SELECT 权限。可以执行类似以下查询:

1
grant select on "non_private_schema"."some_table" to authenticated;

自定义令牌

您可以选择自行签署令牌,以定制可在RLS策略中检查的声明。

项目JWT密钥可在仪表板的项目API密钥中找到。

要在Realtime中使用自定义JWT,请确保在实例化Supabase客户端后、连接Channel前设置令牌。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const { createClient } = require('@supabase/supabase-js')const supabase = createClient(process.env.SUPABASE_URL, process.env.SUPABASE_KEY, {})// 在此设置您的自定义JWTsupabase.realtime.setAuth('your-custom-jwt')const channel = supabase .channel('db-changes') .on( 'postgres_changes', { event: '*', schema: 'public', table: 'messages', filter: 'body=eq.bye', }, (payload) => console.log(payload) ) .subscribe()

刷新令牌

您需要自行刷新令牌,但生成后可以将其传递给 Realtime。

例如,如果您使用的是 supabase-js v2 客户端,可以这样传递令牌:

1
2
3
// 客户端设置supabase.realtime.setAuth('fresh-token')

限制

删除事件不可过滤

在跟踪 Postgres 变更时,您无法过滤删除事件。这个限制是由于从 Postgres 提取变更的方式导致的。

表名中的空格

当前 Realtime 不支持表名包含空格的情况。

数据库实例与实时性能

实时系统通常需要预先规划,因为其扩展特性较为特殊。对于Postgres变更功能,每个变更事件都需要检查订阅用户是否有访问权限。例如,如果有100个用户订阅了某个表,当您执行一次插入操作时,将触发100次"读取":每个用户各一次。

这可能导致数据库瓶颈,限制消息吞吐量。如果数据库无法快速完成变更授权,变更将被延迟,直至超时。

数据库变更在单线程中处理以保持变更顺序。这意味着计算资源升级对Postgres变更订阅的性能影响有限。您可以通过下方工具估算数据库的预期最大吞吐量。

如需大规模使用Postgres变更功能,建议考虑使用不带RLS(行级安全)和过滤器的独立"公共"表。或者,您可以仅在服务器端使用Realtime,然后通过Realtime广播将变更重新分发给客户端。

输入您的数据库设置以估算实例的最大吞吐量:

Set your expected parameters

Current maximum possible throughput

Total DB changes /secMax messages per client /secMax total messages /secLatency p95
646432,000238ms

请务必运行自己的基准测试,确保性能满足您的使用场景需求。

我们正在对Realtime的Postgres变更功能进行多项改进。如果您对特定使用场景的性能存在疑问,请通过支持表单联系我们,我们将竭诚为您提供帮助。我们拥有专业工程师团队,可为您量身定制最佳解决方案。