PGMQ 扩展
pgmq 是一个基于 Postgres 构建的轻量级消息队列。
功能特性
- 轻量级 - 无需后台工作进程或外部依赖,仅包含打包在扩展中的 Postgres 函数
- 在可见性超时内实现消息"精确一次"投递给消费者
- 提供与 AWS SQS 和 RSMQ 相同的 API 接口
- 消息会保留在队列中直到被显式删除
- 消息可选择归档而非删除,支持长期保留和重放
启用扩展
1create extension pgmq;
使用方法
队列管理
create
创建新队列。
12pgmq.create(queue_name text)returns void
参数说明:
参数名 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
示例:
123select from pgmq.create('my_queue'); create--------
create_unlogged
创建非持久化(unlogged)表。当写入吞吐量比持久性更重要时非常有用。 详情请参阅 Postgres 文档关于非持久化表的说明。
12pgmq.create_unlogged(queue_name text)returns void
参数说明:
参数名 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
示例:
123select pgmq.create_unlogged('my_unlogged'); create_unlogged-----------------
detach_archive
将队列的归档表从 PGMQ 扩展中分离。可防止执行drop extension pgmq
时删除队列的归档表。
此操作不会阻止后续的 archives() 操作继续向归档表追加数据。
1pgmq.detach_archive(queue_name text)
参数:
参数 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
示例:
123select * from pgmq.detach_archive('my_queue'); detach_archive----------------
drop_queue
删除队列及其归档表。
12pgmq.drop_queue(queue_name text)returns boolean
参数:
参数 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
示例:
1234select * from pgmq.drop_queue('my_unlogged'); drop_queue------------ t
发送消息
send
向队列发送单条消息。
123456pgmq.send( queue_name text, msg jsonb, delay integer default 0)returns setof bigint
参数:
参数 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
msg | jsonb | 要发送到队列的消息 |
delay | integer | 消息变为可见前的延迟时间(秒)。默认为0。 |
示例:
1234select * from pgmq.send('my_queue', '{"hello": "world"}'); send------ 4
send_batch
向队列发送一条或多条消息。
123456pgmq.send_batch( queue_name text, msgs jsonb[], delay integer default 0)returns setof bigint
参数说明:
参数名 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
msgs | jsonb[] | 要发送到队列的消息数组 |
delay | integer | 消息变为可见前的延迟时间(秒),默认为0 |
1234567891011select * from pgmq.send_batch( 'my_queue', array[ '{"hello": "world_0"}'::jsonb, '{"hello": "world_1"}'::jsonb ]); send_batch------------ 1 2
读取消息
read
从队列中读取1条或多条消息。VT参数指定读取后消息对其他消费者不可见的延迟时间(秒)。
1234567pgmq.read( queue_name text, vt integer, qty integer)returns setof pgmq.message_record
参数说明:
参数名 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
vt | integer | 消息被读取后保持不可见状态的秒数 |
qty | integer | 从队列中读取的消息数量,默认为1 |
示例:
123456select * from pgmq.read('my_queue', 10, 2); msg_id | read_ct | enqueued_at | vt | message--------+---------+-------------------------------+-------------------------------+---------------------- 1 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608922-05 | {"hello": "world_0"} 2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"}(2 rows)
read_with_poll
功能与read()相同,但额外提供便捷的长轮询功能。
当队列中没有消息时,函数调用会等待最多max_poll_seconds
秒后返回。
如果在此期间队列接收到新消息,会立即读取并返回这些消息。
12345678pgmq.read_with_poll( queue_name text, vt integer, qty integer, max_poll_seconds integer default 5, poll_interval_ms integer default 100)returns setof pgmq.message_record
参数说明:
参数名 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
vt | integer | 消息被读取后变为不可见的持续时间(秒) |
qty | integer | 从队列中读取的消息数量,默认为1 |
max_poll_seconds | integer | 等待新消息到达队列的最长时间(秒),默认为5 |
poll_interval_ms | integer | 内部轮询操作之间的间隔时间(毫秒),默认为100 |
示例:
1234select * from pgmq.read_with_poll('my_queue', 1, 1, 5, 100); msg_id | read_ct | enqueued_at | vt | message--------+---------+-------------------------------+-------------------------------+-------------------- 1 | 1 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}
pop
方法
从队列中读取单条消息并在读取后立即删除。
注意:如果消费应用程序不能保证消息被处理,使用pop()方法将实现至多一次(at-most-once)的投递语义。
12pgmq.pop(queue_name text)returns setof pgmq.message_record
参数说明:
参数名 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
示例:
1234pgmq=# select * from pgmq.pop('my_queue'); msg_id | read_ct | enqueued_at | vt | message--------+---------+-------------------------------+-------------------------------+-------------------- 1 | 2 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}
删除/归档消息
delete
(单条)
从队列中删除单条消息。
12pgmq.delete (queue_name text, msg_id: bigint)returns boolean
参数说明:
参数名 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
msg_id | bigint | 要删除消息的ID |
示例:
1234select pgmq.delete('my_queue', 5); delete-------- t
delete
(批量)
从队列中删除一条或多条消息。
12pgmq.delete (queue_name text, msg_ids: bigint[])returns setof bigint
参数说明:
参数名 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
msg_ids | bigint[] | 要删除的消息ID数组 |
示例:
删除两条存在的消息:
12345select * from pgmq.delete('my_queue', array[2, 3]); delete-------- 2 3
删除两条消息(一条存在,一条不存在)。消息999
不存在:
1234select * from pgmq.delete('my_queue', array[6, 999]); delete-------- 6
purge_queue
永久删除队列中的所有消息。返回被删除的消息数量。
12purge_queue(queue_name text)returns bigint
参数说明:
参数名 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
示例:
当队列中包含8条消息时清空队列:
1234select * from pgmq.purge_queue('my_queue'); purge_queue------------- 8
archive
(单条)
从指定队列中移除单条请求的消息并将其插入到队列的归档表中。
12pgmq.archive(queue_name text, msg_id bigint)returns boolean
参数说明:
参数名 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
msg_id | bigint | 需要归档的消息ID |
返回值 布尔值,表示操作成功或失败。
示例:从my_queue
队列中移除ID为1的消息并归档:
1234select * from pgmq.archive('my_queue', 1); archive--------- t
archive
(批量)
从指定队列中移除一批请求的消息并将它们插入到队列的归档表中。 返回成功归档的消息ID数组。
12pgmq.archive(queue_name text, msg_ids bigint[])RETURNS SETOF bigint
参数说明:
参数名 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
msg_ids | bigint[] | 需要归档的消息ID数组 |
示例:
从队列my_queue
中删除ID为1和2的消息并移动到归档。
12345select * from pgmq.archive('my_queue', array[1, 2]); archive--------- 1 2
删除存在的消息4和不存在的消息999。
1234select * from pgmq.archive('my_queue', array[4, 999]); archive--------- 4
实用工具
set_vt
设置消息的可见性超时时间,将其推迟到指定的未来时间。返回被更新消息的记录。
123456pgmq.set_vt( queue_name text, msg_id bigint, vt_offset integer)returns pgmq.message_record
参数说明:
参数名 | 类型 | 描述 |
---|---|---|
queue_name | text | 队列名称 |
msg_id | bigint | 需要设置可见性时间的消息ID |
vt_offset | integer | 从当前时间开始计算的秒数,表示消息的可见性超时应设置的时长 |
示例:
将消息1的可见性超时设置为从现在起30秒后。
1234select * from pgmq.set_vt('my_queue', 11, 30); msg_id | read_ct | enqueued_at | vt | message--------+---------+-------------------------------+-------------------------------+---------------------- 1 | 0 | 2023-10-28 19:42:21.778741-05 | 2023-10-28 19:59:34.286462-05 | {"hello": "world_0"}
list_queues
列出当前存在的所有队列。
1234567list_queues()RETURNS TABLE( queue_name text, created_at timestamp with time zone, is_partitioned boolean, is_unlogged boolean)
示例:
123456select * from pgmq.list_queues(); queue_name | created_at | is_partitioned | is_unlogged----------------------+-------------------------------+----------------+------------- my_queue | 2023-10-28 14:13:17.092576-05 | f | f my_partitioned_queue | 2023-10-28 19:47:37.098692-05 | t | f my_unlogged | 2023-10-28 20:02:30.976109-05 | f | t
metrics
获取特定队列的指标数据。
123456789pgmq.metrics(queue_name: text)returns table( queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone)
参数说明:
参数名 | 类型 | 描述 |
---|---|---|
queue_name | text | 目标队列的名称 |
返回值:
属性 | 类型 | 描述 | |
---|---|---|---|
queue_name | text | 队列名称 | |
queue_length | bigint | 当前队列中的消息数量 | |
newest_msg_age_sec | `integer | null` | 队列中最新的消息存在时长(秒) |
oldest_msg_age_sec | `integer | null` | 队列中最旧的消息存在时长(秒) |
total_messages | bigint | 该队列历史处理过的消息总数 | |
scrape_time | timestamp with time zone | 当前时间戳 |
示例:
1234select * from pgmq.metrics('my_queue'); queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time------------+--------------+--------------------+--------------------+----------------+------------------------------- my_queue | 16 | 2445 | 2447 | 35 | 2023-10-28 20:23:08.406259-05
metrics_all
获取所有现有队列的指标。
123456789pgmq.metrics_all()RETURNS TABLE( queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone)
返回值:
属性 | 类型 | 描述 | |
---|---|---|---|
queue_name | text | 队列名称 | |
queue_length | bigint | 当前队列中的消息数量 | |
newest_msg_age_sec | `integer | null` | 队列中最新的消息存在时间(秒) |
oldest_msg_age_sec | `integer | null` | 队列中最旧的消息存在时间(秒) |
total_messages | bigint | 该队列有史以来处理过的消息总数 | |
scrape_time | timestamp with time zone | 当前时间戳 |
123456select * from pgmq.metrics_all(); queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time----------------------+--------------+--------------------+--------------------+----------------+------------------------------- my_queue | 16 | 2563 | 2565 | 35 | 2023-10-28 20:25:07.016413-05 my_partitioned_queue | 1 | 11 | 11 | 1 | 2023-10-28 20:25:07.016413-05 my_unlogged | 1 | 3 | 3 | 1 | 2023-10-28 20:25:07.016413-05
类型
message_record
队列中消息的完整表示。
属性名称 | 类型 | 描述 |
---|---|---|
msg_id | bigint | 消息的唯一ID |
read_ct | bigint | 消息被读取的次数。每次调用read()时递增。 |
enqueued_at | timestamp with time zone | 消息被插入队列的时间 |
vt | timestamp with time zone | 消息可供消费者读取的时间戳 |
message | jsonb | 消息的有效载荷 |
示例:
123msg_id | read_ct | enqueued_at | vt | message--------+---------+-------------------------------+-------------------------------+-------------------- 1 | 1 | 2023-10-28 19:06:19.941509-05 | 2023-10-28 19:06:27.419392-05 | {"hello": "world"}
相关资源
- 官方文档: pgmq/api