队列

PGMQ 扩展


pgmq 是一个基于 Postgres 构建的轻量级消息队列。

功能特性

  • 轻量级 - 无需后台工作进程或外部依赖,仅包含打包在扩展中的 Postgres 函数
  • 在可见性超时内实现消息"精确一次"投递给消费者
  • 提供与 AWS SQS 和 RSMQ 相同的 API 接口
  • 消息会保留在队列中直到被显式删除
  • 消息可选择归档而非删除,支持长期保留和重放

启用扩展

1
create extension pgmq;

使用方法

队列管理

create

创建新队列。

1
2
pgmq.create(queue_name text)returns void

参数说明:

参数名类型描述
queue_nametext队列名称

示例:

1
2
3
select from pgmq.create('my_queue'); create--------

create_unlogged

创建非持久化(unlogged)表。当写入吞吐量比持久性更重要时非常有用。 详情请参阅 Postgres 文档关于非持久化表的说明。

1
2
pgmq.create_unlogged(queue_name text)returns void

参数说明:

参数名类型描述
queue_nametext队列名称

示例:

1
2
3
select pgmq.create_unlogged('my_unlogged'); create_unlogged-----------------

detach_archive

将队列的归档表从 PGMQ 扩展中分离。可防止执行drop extension pgmq时删除队列的归档表。 此操作不会阻止后续的 archives() 操作继续向归档表追加数据。

1
pgmq.detach_archive(queue_name text)

参数:

参数类型描述
queue_nametext队列名称

示例:

1
2
3
select * from pgmq.detach_archive('my_queue'); detach_archive----------------

drop_queue

删除队列及其归档表。

1
2
pgmq.drop_queue(queue_name text)returns boolean

参数:

参数类型描述
queue_nametext队列名称

示例:

1
2
3
4
select * from pgmq.drop_queue('my_unlogged'); drop_queue------------ t

发送消息

send

向队列发送单条消息。

1
2
3
4
5
6
pgmq.send( queue_name text, msg jsonb, delay integer default 0)returns setof bigint

参数:

参数类型描述
queue_nametext队列名称
msgjsonb要发送到队列的消息
delayinteger消息变为可见前的延迟时间(秒)。默认为0。

示例:

1
2
3
4
select * from pgmq.send('my_queue', '{"hello": "world"}'); send------ 4

send_batch

向队列发送一条或多条消息。

1
2
3
4
5
6
pgmq.send_batch( queue_name text, msgs jsonb[], delay integer default 0)returns setof bigint

参数说明:

参数名类型描述
queue_nametext队列名称
msgsjsonb[]要发送到队列的消息数组
delayinteger消息变为可见前的延迟时间(秒),默认为0
1
2
3
4
5
6
7
8
9
10
11
select * from pgmq.send_batch( 'my_queue', array[ '{"hello": "world_0"}'::jsonb, '{"hello": "world_1"}'::jsonb ]); send_batch------------ 1 2

读取消息

read

从队列中读取1条或多条消息。VT参数指定读取后消息对其他消费者不可见的延迟时间(秒)。

1
2
3
4
5
6
7
pgmq.read( queue_name text, vt integer, qty integer)returns setof pgmq.message_record

参数说明:

参数名类型描述
queue_nametext队列名称
vtinteger消息被读取后保持不可见状态的秒数
qtyinteger从队列中读取的消息数量,默认为1

示例:

1
2
3
4
5
6
select * from pgmq.read('my_queue', 10, 2); msg_id | read_ct | enqueued_at | vt | message--------+---------+-------------------------------+-------------------------------+---------------------- 1 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608922-05 | {"hello": "world_0"} 2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"}(2 rows)

read_with_poll

功能与read()相同,但额外提供便捷的长轮询功能。 当队列中没有消息时,函数调用会等待最多max_poll_seconds秒后返回。 如果在此期间队列接收到新消息,会立即读取并返回这些消息。

1
2
3
4
5
6
7
8
pgmq.read_with_poll( queue_name text, vt integer, qty integer, max_poll_seconds integer default 5, poll_interval_ms integer default 100)returns setof pgmq.message_record

参数说明:

参数名类型描述
queue_nametext队列名称
vtinteger消息被读取后变为不可见的持续时间(秒)
qtyinteger从队列中读取的消息数量,默认为1
max_poll_secondsinteger等待新消息到达队列的最长时间(秒),默认为5
poll_interval_msinteger内部轮询操作之间的间隔时间(毫秒),默认为100

示例:

1
2
3
4
select * from pgmq.read_with_poll('my_queue', 1, 1, 5, 100); msg_id | read_ct | enqueued_at | vt | message--------+---------+-------------------------------+-------------------------------+-------------------- 1 | 1 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}

pop方法

从队列中读取单条消息并在读取后立即删除。

注意:如果消费应用程序不能保证消息被处理,使用pop()方法将实现至多一次(at-most-once)的投递语义。

1
2
pgmq.pop(queue_name text)returns setof pgmq.message_record

参数说明:

参数名类型描述
queue_nametext队列名称

示例:

1
2
3
4
pgmq=# select * from pgmq.pop('my_queue'); msg_id | read_ct | enqueued_at | vt | message--------+---------+-------------------------------+-------------------------------+-------------------- 1 | 2 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}

删除/归档消息

delete (单条)

从队列中删除单条消息。

1
2
pgmq.delete (queue_name text, msg_id: bigint)returns boolean

参数说明:

参数名类型描述
queue_nametext队列名称
msg_idbigint要删除消息的ID

示例:

1
2
3
4
select pgmq.delete('my_queue', 5); delete-------- t

delete (批量)

从队列中删除一条或多条消息。

1
2
pgmq.delete (queue_name text, msg_ids: bigint[])returns setof bigint

参数说明:

参数名类型描述
queue_nametext队列名称
msg_idsbigint[]要删除的消息ID数组

示例:

删除两条存在的消息:

1
2
3
4
5
select * from pgmq.delete('my_queue', array[2, 3]); delete-------- 2 3

删除两条消息(一条存在,一条不存在)。消息999不存在:

1
2
3
4
select * from pgmq.delete('my_queue', array[6, 999]); delete-------- 6

purge_queue

永久删除队列中的所有消息。返回被删除的消息数量。

1
2
purge_queue(queue_name text)returns bigint

参数说明:

参数名类型描述
queue_nametext队列名称

示例:

当队列中包含8条消息时清空队列:

1
2
3
4
select * from pgmq.purge_queue('my_queue'); purge_queue------------- 8

archive (单条)

从指定队列中移除单条请求的消息并将其插入到队列的归档表中。

1
2
pgmq.archive(queue_name text, msg_id bigint)returns boolean

参数说明:

参数名类型描述
queue_nametext队列名称
msg_idbigint需要归档的消息ID

返回值 布尔值,表示操作成功或失败。

示例:从my_queue队列中移除ID为1的消息并归档:

1
2
3
4
select * from pgmq.archive('my_queue', 1); archive--------- t

archive (批量)

从指定队列中移除一批请求的消息并将它们插入到队列的归档表中。 返回成功归档的消息ID数组。

1
2
pgmq.archive(queue_name text, msg_ids bigint[])RETURNS SETOF bigint

参数说明:

参数名类型描述
queue_nametext队列名称
msg_idsbigint[]需要归档的消息ID数组

示例:

从队列my_queue中删除ID为1和2的消息并移动到归档。

1
2
3
4
5
select * from pgmq.archive('my_queue', array[1, 2]); archive--------- 1 2

删除存在的消息4和不存在的消息999。

1
2
3
4
select * from pgmq.archive('my_queue', array[4, 999]); archive--------- 4

实用工具

set_vt

设置消息的可见性超时时间,将其推迟到指定的未来时间。返回被更新消息的记录。

1
2
3
4
5
6
pgmq.set_vt( queue_name text, msg_id bigint, vt_offset integer)returns pgmq.message_record

参数说明:

参数名类型描述
queue_nametext队列名称
msg_idbigint需要设置可见性时间的消息ID
vt_offsetinteger从当前时间开始计算的秒数,表示消息的可见性超时应设置的时长

示例:

将消息1的可见性超时设置为从现在起30秒后。

1
2
3
4
select * from pgmq.set_vt('my_queue', 11, 30); msg_id | read_ct | enqueued_at | vt | message--------+---------+-------------------------------+-------------------------------+---------------------- 1 | 0 | 2023-10-28 19:42:21.778741-05 | 2023-10-28 19:59:34.286462-05 | {"hello": "world_0"}

list_queues

列出当前存在的所有队列。

1
2
3
4
5
6
7
list_queues()RETURNS TABLE( queue_name text, created_at timestamp with time zone, is_partitioned boolean, is_unlogged boolean)

示例:

1
2
3
4
5
6
select * from pgmq.list_queues(); queue_name | created_at | is_partitioned | is_unlogged----------------------+-------------------------------+----------------+------------- my_queue | 2023-10-28 14:13:17.092576-05 | f | f my_partitioned_queue | 2023-10-28 19:47:37.098692-05 | t | f my_unlogged | 2023-10-28 20:02:30.976109-05 | f | t

metrics

获取特定队列的指标数据。

1
2
3
4
5
6
7
8
9
pgmq.metrics(queue_name: text)returns table( queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone)

参数说明:

参数名类型描述
queue_nametext目标队列的名称

返回值:

属性类型描述
queue_nametext队列名称
queue_lengthbigint当前队列中的消息数量
newest_msg_age_sec`integernull`队列中最新的消息存在时长(秒)
oldest_msg_age_sec`integernull`队列中最旧的消息存在时长(秒)
total_messagesbigint该队列历史处理过的消息总数
scrape_timetimestamp with time zone当前时间戳

示例:

1
2
3
4
select * from pgmq.metrics('my_queue'); queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time------------+--------------+--------------------+--------------------+----------------+------------------------------- my_queue | 16 | 2445 | 2447 | 35 | 2023-10-28 20:23:08.406259-05

metrics_all

获取所有现有队列的指标。

1
2
3
4
5
6
7
8
9
pgmq.metrics_all()RETURNS TABLE( queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone)

返回值:

属性类型描述
queue_nametext队列名称
queue_lengthbigint当前队列中的消息数量
newest_msg_age_sec`integernull`队列中最新的消息存在时间(秒)
oldest_msg_age_sec`integernull`队列中最旧的消息存在时间(秒)
total_messagesbigint该队列有史以来处理过的消息总数
scrape_timetimestamp with time zone当前时间戳
1
2
3
4
5
6
select * from pgmq.metrics_all(); queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time----------------------+--------------+--------------------+--------------------+----------------+------------------------------- my_queue | 16 | 2563 | 2565 | 35 | 2023-10-28 20:25:07.016413-05 my_partitioned_queue | 1 | 11 | 11 | 1 | 2023-10-28 20:25:07.016413-05 my_unlogged | 1 | 3 | 3 | 1 | 2023-10-28 20:25:07.016413-05

类型

message_record

队列中消息的完整表示。

属性名称类型描述
msg_idbigint消息的唯一ID
read_ctbigint消息被读取的次数。每次调用read()时递增。
enqueued_attimestamp with time zone消息被插入队列的时间
vttimestamp with time zone消息可供消费者读取的时间戳
messagejsonb消息的有效载荷

示例:

1
2
3
msg_id | read_ct | enqueued_at | vt | message--------+---------+-------------------------------+-------------------------------+-------------------- 1 | 1 | 2023-10-28 19:06:19.941509-05 | 2023-10-28 19:06:27.419392-05 | {"hello": "world"}

相关资源