队列

快速入门

学习如何使用 Supabase Queues 添加和读取消息


本指南介绍如何通过仪表盘和官方客户端库与 Supabase 队列进行交互。更多 API 详情请参阅队列 API 参考

核心概念

Supabase 队列是基于拉取(pull-based)的消息队列系统,由三个主要组件构成:队列(Queues)、消息(Messages)和队列类型(Queue Types)。

拉取式队列

拉取式队列是一种消息存储和传递系统,消费者在准备好处理消息时会主动获取消息 - 类似于不断刷新网页以显示最新更新。我们的拉取式队列按照先进先出(FIFO)原则处理消息,不设优先级级别。

消息

队列中的消息是一个 JSON 对象,在被消费者显式处理和删除前会一直存储,就像待办事项列表中等待被检查和完成的任务。

队列类型

Supabase 队列提供三种类型的队列:

  • 基础队列(Basic Queue):持久化队列,将消息存储在已记录(logged)表中

  • 未记录队列(Unlogged Queue):临时队列,将消息存储在未记录(unlogged)表中以获得更好性能,但可能导致队列消息丢失

  • 分区队列(Partitioned Queue)(即将推出):持久化且可扩展的队列,将消息存储在多个表分区中以获得更好性能

创建队列

开始使用前,请先在仪表板的集成模块下导航至 Supabase 队列 Postgres 模块,并启用 pgmq 扩展。

队列页面上:

  • 点击添加新队列按钮
  • 为队列命名

创建队列时会发生什么?

每个新队列都会在 pgmq 模式中创建两张表:pgmq.q_<队列名称> 用于存储和处理活跃消息,pgmq.a_<队列名称> 用于存储已归档消息。

"基础队列"会创建 pgmq.q_<队列名称>pgmq.a_<队列名称> 作为常规表(logged tables)。

而"非日志队列"会创建 pgmq.q_<队列名称> 作为非日志表(unlogged table)以获得更好性能(但牺牲了持久性)。pgmq.a_<队列名称> 表仍会作为常规表创建,确保归档消息的安全存储。

向客户端消费者暴露队列

默认情况下,队列不会通过 Supabase 数据 API 暴露,只能通过 Postgres 客户端访问。

但是,您可以通过启用 Supabase 数据 API 并授予对队列 API 的权限,来允许客户端消费者访问您的队列。队列 API 是 pgmq_public 模式中的一组数据库函数,它们封装了 pgmq 模式中的数据库函数。

这样做是为了防止直接访问 pgmq 模式及其表(默认情况下所有表都没有启用 RLS)和数据库函数。

要开始使用,请导航至队列的设置页面并开启"通过 PostgREST 暴露队列"选项。启用后,Supabase 会创建并暴露一个 pgmq_public 模式,其中包含对 pgmq 部分数据库函数的封装。

pgmq 模式的表上启用 RLS

出于安全考虑,如果启用了数据 API,您必须在所有队列表(pgmq 模式中以 q_ 开头的所有表)上启用行级安全(RLS)。

您需要为您希望客户端消费者交互的任何队列创建 RLS 策略。

授予 pgmq_public 数据库函数权限

除了启用RLS(行级安全)并为底层队列表编写RLS策略外,您还必须为每个数据API角色授予pgmq_public数据库函数的正确权限。

每个队列API数据库函数所需的权限:

操作所需权限
send send_batchSelect Insert
read popSelect Update
archive deleteSelect Delete

要管理队列权限,请点击队列设置按钮。

然后启用所需角色的权限。

消息的入队与出队

创建队列后,您就可以开始进行消息的入队和出队操作了。

以下是使用官方 Supabase 客户端库的 TypeScript 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import { createClient } from '@supabase/supabase-js'const supabaseUrl = 'supabaseURL'const supabaseKey = 'supabaseKey'const supabase = createClient(supabaseUrl, supabaseKey)const QueuesTest: React.FC = () => { // 添加消息 const sendToQueue = async () => { const result = await supabase.schema('pgmq_public').rpc('send', { queue_name: 'foo', message: { hello: 'world' }, sleep_seconds: 30, }) console.log(result) } // 取出消息 const popFromQueue = async () => { const result = await supabase.schema('pgmq_public').rpc('pop', { queue_name: 'foo' }) console.log(result) } return ( <div className="p-6"> <h2 className="text-2xl font-bold mb-4">队列测试组件</h2> <button onClick={sendToQueue} className="bg-blue-500 text-white px-4 py-2 rounded hover:bg-blue-600 mr-4" > 添加消息 </button> <button onClick={popFromQueue} className="bg-blue-500 text-white px-4 py-2 rounded hover:bg-blue-600" > 取出消息 </button> </div> )}export default QueuesTest