快速入门
学习如何使用 Supabase Queues 添加和读取消息
本指南介绍如何通过仪表盘和官方客户端库与 Supabase 队列进行交互。更多 API 详情请参阅队列 API 参考。
核心概念
Supabase 队列是基于拉取(pull-based)的消息队列系统,由三个主要组件构成:队列(Queues)、消息(Messages)和队列类型(Queue Types)。
拉取式队列
拉取式队列是一种消息存储和传递系统,消费者在准备好处理消息时会主动获取消息 - 类似于不断刷新网页以显示最新更新。我们的拉取式队列按照先进先出(FIFO)原则处理消息,不设优先级级别。
消息
队列中的消息是一个 JSON 对象,在被消费者显式处理和删除前会一直存储,就像待办事项列表中等待被检查和完成的任务。
队列类型
Supabase 队列提供三种类型的队列:
-
基础队列(Basic Queue):持久化队列,将消息存储在已记录(logged)表中
-
未记录队列(Unlogged Queue):临时队列,将消息存储在未记录(unlogged)表中以获得更好性能,但可能导致队列消息丢失
-
分区队列(Partitioned Queue)(即将推出):持久化且可扩展的队列,将消息存储在多个表分区中以获得更好性能
创建队列
开始使用前,请先在仪表板的集成模块下导航至 Supabase 队列 Postgres 模块,并启用 pgmq
扩展。
pgmq
扩展要求 Postgres 版本为 15.6.1.143 或更高。
在队列页面上:
- 点击添加新队列按钮
如果已创建过队列,请点击创建队列按钮。
- 为队列命名
队列名称只能使用小写字母,允许使用连字符和下划线。
- 选择队列类型
创建队列时会发生什么?
每个新队列都会在 pgmq
模式中创建两张表:pgmq.q_<队列名称>
用于存储和处理活跃消息,pgmq.a_<队列名称>
用于存储已归档消息。
"基础队列"会创建 pgmq.q_<队列名称>
和 pgmq.a_<队列名称>
作为常规表(logged tables)。
而"非日志队列"会创建 pgmq.q_<队列名称>
作为非日志表(unlogged table)以获得更好性能(但牺牲了持久性)。pgmq.a_<队列名称>
表仍会作为常规表创建,确保归档消息的安全存储。
向客户端消费者暴露队列
默认情况下,队列不会通过 Supabase 数据 API 暴露,只能通过 Postgres 客户端访问。
但是,您可以通过启用 Supabase 数据 API 并授予对队列 API 的权限,来允许客户端消费者访问您的队列。队列 API 是 pgmq_public
模式中的一组数据库函数,它们封装了 pgmq
模式中的数据库函数。
这样做是为了防止直接访问 pgmq
模式及其表(默认情况下所有表都没有启用 RLS)和数据库函数。
要开始使用,请导航至队列的设置页面并开启"通过 PostgREST 暴露队列"选项。启用后,Supabase 会创建并暴露一个 pgmq_public
模式,其中包含对 pgmq
部分数据库函数的封装。
在 pgmq
模式的表上启用 RLS
出于安全考虑,如果启用了数据 API,您必须在所有队列表(pgmq
模式中以 q_
开头的所有表)上启用行级安全(RLS)。
您需要为您希望客户端消费者交互的任何队列创建 RLS 策略。
授予 pgmq_public
数据库函数权限
除了启用RLS(行级安全)并为底层队列表编写RLS策略外,您还必须为每个数据API角色授予pgmq_public
数据库函数的正确权限。
每个队列API数据库函数所需的权限:
操作 | 所需权限 |
---|---|
send send_batch | Select Insert |
read pop | Select Update |
archive delete | Select Delete |
要管理队列权限,请点击队列设置按钮。
然后启用所需角色的权限。
postgres
和 service_role
角色绝不应该暴露在客户端。
消息的入队与出队
创建队列后,您就可以开始进行消息的入队和出队操作了。
以下是使用官方 Supabase 客户端库的 TypeScript 示例:
1234567891011121314151617181920212223242526272829303132333435363738394041424344import { createClient } from '@supabase/supabase-js'const supabaseUrl = 'supabaseURL'const supabaseKey = 'supabaseKey'const supabase = createClient(supabaseUrl, supabaseKey)const QueuesTest: React.FC = () => { // 添加消息 const sendToQueue = async () => { const result = await supabase.schema('pgmq_public').rpc('send', { queue_name: 'foo', message: { hello: 'world' }, sleep_seconds: 30, }) console.log(result) } // 取出消息 const popFromQueue = async () => { const result = await supabase.schema('pgmq_public').rpc('pop', { queue_name: 'foo' }) console.log(result) } return ( <div className="p-6"> <h2 className="text-2xl font-bold mb-4">队列测试组件</h2> <button onClick={sendToQueue} className="bg-blue-500 text-white px-4 py-2 rounded hover:bg-blue-600 mr-4" > 添加消息 </button> <button onClick={popFromQueue} className="bg-blue-500 text-white px-4 py-2 rounded hover:bg-blue-600" > 取出消息 </button> </div> )}export default QueuesTest