AI 与向量

自动嵌入


向量嵌入在Postgres中实现了强大的语义搜索功能,但传统上,将它们与您的内容一起管理很复杂。本指南展示了如何使用Supabase的边缘函数pgmqpg_netpg_cron实现嵌入生成和更新的自动化。

理解挑战

在使用pgvector实现语义搜索时,开发人员通常需要:

  1. 通过外部API(如OpenAI)生成嵌入。
  2. 将这些嵌入与内容一起存储。
  3. 当内容发生变化时,保持嵌入同步。
  4. 在嵌入生成过程中处理失败和重试。

虽然Postgres的全文搜索可以通过对to_tsvector的同步调用和触发器在内部处理这些问题,但语义搜索需要向OpenAI等供应商发起异步API调用以生成向量嵌入。本指南展示了如何使用触发器、队列和Supabase边缘函数来弥合这一差距。

理解架构

我们将利用以下Postgres和Supabase的功能来创建自动化嵌入系统:

  1. pgvector:存储和查询向量嵌入。
  2. pgmq:对嵌入生成请求进行排队,以便处理和重试。
  3. pg_net:直接从Postgres处理对边缘函数的异步HTTP请求。
  4. pg_cron:自动处理和重试嵌入生成。
  5. 触发器:检测内容变化并将嵌入生成请求加入队列。
  6. 边缘函数:通过类似OpenAI的API(可定制)生成嵌入。

我们将把系统设计为:

  1. 具有通用性,以便可用于任何表和内容。这使您能够在多个位置配置嵌入,每个位置都能够定制用于嵌入生成的输入。所有这些都将使用相同的队列基础架构和边缘函数来生成嵌入。
  2. 优雅地处理故障,通过重试失败的任务并提供有关每个任务状态的详细信息。

实现

我们将首先设置对嵌入生成请求进行排队和处理所需的基础架构。然后,我们将创建一个带有触发器的示例表,以便在插入或更新内容时将这些嵌入请求加入队列。

步骤1:启用扩展

首先,我们来启用所需的扩展:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
-- 用于向量操作CREATE EXTENSION IF NOT EXISTS vectorWITH SCHEMA extensions;-- 用于排队和处理作业-- (pgmq 将创建自己的模式)CREATE EXTENSION IF NOT EXISTS pgmq;-- 用于异步 HTTP 请求CREATE EXTENSION IF NOT EXISTS pg_netWITH SCHEMA extensions;-- 用于定时处理和重试-- (pg_cron 将创建自己的模式)CREATE EXTENSION IF NOT EXISTS pg_cron;-- 用于在更新时清除嵌入CREATE EXTENSION IF NOT EXISTS hstoreWITH SCHEMA extensions;

尽管SQL代码是CREATE EXTENSION,但这等同于“启用扩展”。 要禁用扩展,请调用DROP EXTENSION

步骤2:创建实用函数

在设置嵌入逻辑之前,我们需要创建一些实用函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
-- 实用函数的模式create schema util;-- 获取Supabase项目URL的实用函数(边缘函数所需)create function util.project_url()returns textlanguage plpgsqlsecurity defineras $$declare secret_value text;begin -- 从Vault中检索项目URL select decrypted_secret into secret_value from vault.decrypted_secrets where name = 'project_url'; return secret_value;end;$$;-- 调用任何边缘函数的通用函数create or replace function util.invoke_edge_function( name text, body jsonb, timeout_milliseconds int = 5 * 60 * 1000 -- 默认5分钟超时)returns voidlanguage plpgsqlas $$declare headers_raw text; auth_header text;begin -- 如果我们处于PostgREST会话中,重用请求头进行授权 headers_raw := current_setting('request.headers', true); -- 仅当存在头信息时才尝试解析 auth_header := case when headers_raw is not null then (headers_raw::json->>'authorization') else null end; -- 对边缘函数执行异步HTTP请求 perform net.http_post( url => util.project_url() || '/functions/v1/' || name, headers => jsonb_build_object( 'Content-Type', 'application/json', 'Authorization', auth_header ), body => body, timeout_milliseconds => timeout_milliseconds );end;$$;-- 更新时清除列的通用触发器函数create or replace function util.clear_column()returns triggerlanguage plpgsql as $$declare clear_column text := TG_ARGV[0];begin NEW := NEW #= hstore(clear_column, NULL); return NEW;end;$$;

在此我们创建:

  • 一个模式 util 用于存储实用函数。
  • 一个从 Vault 检索Supabase项目URL的函数。接下来我们将添加此密钥。
  • 一个使用给定名称和请求体调用任何边缘函数的通用函数。
  • 一个更新时清除列的通用触发器函数。此函数接受列名作为参数,并在 NEW 记录中将其设置为 NULL。我们稍后将解释如何使用此函数。

每个项目都有一个唯一的API URL,这是调用边缘函数所必需的。让我们根据您的环境继续将项目URL密钥添加到Vault中。

使用本地Supabase堆栈时,将以下内容添加到 supabase/seed.sql 文件中:

1
2
select vault.create_secret('http://api.supabase.internal:8000', 'project_url');

部署到云平台时,打开 SQL编辑器 并运行以下内容,将 <project-url> 替换为您的 项目API URL

1
2
select vault.create_secret('<project-url>', 'project_url');

步骤3:创建队列和触发器

我们的目标是每当表中的内容被插入或更新时,自动生成嵌入向量。我们可以使用触发器和队列来实现这一点。我们的方法是,每当表中有记录插入或更新时,自动将嵌入任务排入队列,然后使用定时任务异步处理这些任务。如果某个任务失败,它将留在队列中,并在下一个预定任务中重试。

首先,我们创建一个用于处理嵌入请求的 pgmq 队列:

1
2
-- 用于处理嵌入任务的队列select pgmq.create('embedding_jobs');

接下来,我们创建一个触发器函数,用于将嵌入任务排入队列。我们将使用这个函数来处理插入和更新事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
-- 用于将嵌入任务排入队列的通用触发器函数create or replace function util.queue_embeddings()returns triggerlanguage plpgsqlas $$declare content_function text = TG_ARGV[0]; embedding_column text = TG_ARGV[1];begin perform pgmq.send( queue_name => 'embedding_jobs', msg => jsonb_build_object( 'id', NEW.id, 'schema', TG_TABLE_SCHEMA, 'table', TG_TABLE_NAME, 'contentFunction', content_function, 'embeddingColumn', embedding_column ) ); return NEW;end;$$;

我们的 util.queue_embeddings 触发器函数是通用的,可用于任何表和内容函数。它接受两个参数:

  1. content_function:一个函数的名称,该函数返回要嵌入的文本内容。该函数应接受单行作为输入并返回文本(请参阅 embedding_input 示例)。

这使您可以自定义传递给嵌入模型的文本输入 - 例如,您可以将 titlecontent 等多个列连接在一起,并将结果用作输入。

  1. embedding_column:存储嵌入向量的目标列的名称。

请注意,util.queue_embeddings 触发器函数需要 for each row 子句才能正常工作。有关如何将此触发器函数与您的表一起使用的示例,请参阅 用法

接下来,我们将创建一个函数来处理嵌入任务。此函数将从队列中读取任务,将它们分组为批次,并调用边缘函数来生成嵌入向量。我们将使用 pg_cron 安排此函数每10秒运行一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
-- 用于处理队列中嵌入任务的函数create or replace function util.process_embeddings( batch_size int = 10, max_requests int = 10, timeout_milliseconds int = 5 * 60 * 1000 -- 默认5分钟超时)returns voidlanguage plpgsqlas $$declare job_batches jsonb[]; batch jsonb;begin with -- 首先获取任务并分配批次编号 numbered_jobs as ( select message || jsonb_build_object('jobId', msg_id) as job_info, (row_number() over (order by 1) - 1) / batch_size as batch_num from pgmq.read( queue_name => 'embedding_jobs', vt => timeout_milliseconds / 1000, qty => max_requests * batch_size ) ), -- 然后将任务分组为批次 batched_jobs as ( select jsonb_agg(job_info) as batch_array, batch_num from numbered_jobs group by batch_num ) -- 最后将所有批次聚合为数组 select array_agg(batch_array) from batched_jobs into job_batches; -- 为每个批次调用嵌入边缘函数 foreach batch in array job_batches loop perform util.invoke_edge_function( name => 'embed', body => batch, timeout_milliseconds => timeout_milliseconds ); end loop;end;$$;-- 安排嵌入处理select cron.schedule( 'process-embeddings', '10 seconds', $$ select util.process_embeddings(); $$ );

让我们讨论一下关于这种方法的一些常见问题:

为什么不在单个边缘函数请求中生成所有嵌入向量?

虽然这是可行的,但可能会导致处理时间过长并有可能出现超时。批量处理使我们能够并发处理多个嵌入向量,并更有效地处理失败情况。

为什么不每行发起一个请求?

这种方法可能会导致 API 速率限制和性能问题。批量处理在效率和可靠性之间提供了一种平衡。

为什么要将请求排队而不是立即处理?

排队使我们能够优雅地处理失败、重试请求并更有效地管理并发。具体来说,我们使用 pgmq 的可见性超时来确保失败的请求会被重试。

可见性超时是如何工作的?

每次我们从队列中读取一条消息时,都会设置一个可见性超时,这会告诉 pgmq 在特定时间段内对其他读取者隐藏该消息。如果边缘函数在此时间段内未能处理该消息,该消息将再次变为可见,并将由下一个计划任务重试。

我们如何处理重试?

我们使用 pg_cron 来安排一个任务,从队列中读取消息并进行处理。如果边缘函数未能处理一条消息,在超时后该消息将再次变为可见,并可由下一个计划任务重试。

10 秒的处理间隔合适吗?

这个间隔是一个不错的起点,但您可能需要根据工作负载以及生成嵌入向量所需的时间进行调整。您可以调整 batch_sizemax_requeststimeout_milliseconds 参数来优化性能。

步骤4:创建边缘函数

最后,我们将创建用于生成嵌入向量的边缘函数。在本示例中,我们将使用OpenAI的API,但你可以将其替换为任何其他嵌入向量生成服务。

使用Supabase CLI创建一个新的边缘函数:

1
supabase functions new embed

这将创建一个新目录 supabase/functions/embed 以及一个 index.ts 文件。将此文件的内容替换为以下内容:

supabase/functions/embed/index.ts:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
// 为内置的Supabase运行时API设置类型定义import 'jsr:@supabase/functions-js/edge-runtime.d.ts'// 我们将使用OpenAI API生成嵌入向量import OpenAI from 'jsr:@openai/openai'import { z } from 'npm:zod'// 我们将直接连接Postgres以更新文档import postgres from 'https://deno.land/x/postgresjs@v3.4.5/mod.js'// 初始化OpenAI客户端const openai = new OpenAI({ // 我们需要手动设置 `OPENAI_API_KEY` 环境变量 apiKey: Deno.env.get('OPENAI_API_KEY'),})// 初始化Postgres客户端const sql = postgres( // `SUPABASE_DB_URL` 是一个内置的环境变量 Deno.env.get('SUPABASE_DB_URL')!)const jobSchema = z.object({ jobId: z.number(), id: z.number(), schema: z.string(), table: z.string(), contentFunction: z.string(), embeddingColumn: z.string(),})const failedJobSchema = jobSchema.extend({ error: z.string(),})type Job = z.infer<typeof jobSchema>type FailedJob = z.infer<typeof failedJobSchema>type Row = { id: string content: unknown}const QUEUE_NAME = 'embedding_jobs'// 监听HTTP请求Deno.serve(async (req) => { if (req.method !== 'POST') { return new Response('expected POST request', { status: 405 }) } if (req.headers.get('content-type')!== 'application/json') { return new Response('expected json body', { status: 400 }) } // 使用Zod解析并验证请求体 const parseResult = z.array(jobSchema).safeParse(await req.json()) if (parseResult.error) { return new Response(`invalid request body: ${parseResult.error.message}`, { status: 400, }) } const pendingJobs = parseResult.data // 跟踪成功完成的作业 const completedJobs: Job[] = [] // 跟踪因错误而失败的作业 const failedJobs: FailedJob[] = [] async function processJobs() { let currentJob: Job | undefined while ((currentJob = pendingJobs.shift())!== undefined) { try { await processJob(currentJob) completedJobs.push(currentJob) } catch (error) { failedJobs.push({ ...currentJob, error: error instanceof Error? error.message : JSON.stringify(error), }) } } } try { // 在监听工作进程终止的同时处理作业 await Promise.race([processJobs(), catchUnload()]) } catch (error) { // 如果工作进程正在终止(例如达到时间限制), // 将未处理的作业添加到失败列表中,并附上终止原因 failedJobs.push( ...pendingJobs.map((job) => ({ ...job, error: error instanceof Error? error.message : JSON.stringify(error), })) ) } // 记录已完成和失败的作业,以便进行追溯 console.log('finished processing jobs:', { completedJobs: completedJobs.length, failedJobs: failedJobs.length, }) return new Response( JSON.stringify({ completedJobs, failedJobs, }), { // 200 OK响应 status: 200, // 用于报告作业状态的自定义标头 headers: { 'content-type': 'application/json', 'x-completed-jobs': completedJobs.length.toString(), 'x-failed-jobs': failedJobs.length.toString(), }, } )})/** * 为给定文本生成嵌入向量。 */async function generateEmbedding(text: string) { const response = await openai.embeddings.create({ model: 'text-embedding-3-small', input: text, }) const [data] = response.data if (!data) { throw new Error('failed to generate embedding') } return data.embedding}/** * 处理嵌入作业。 */async function processJob(job: Job) { const { jobId, id, schema, table, contentFunction, embeddingColumn } = job // 获取模式/表/行组合的内容 const [row]: [Row] = await sql` select id, ${sql(contentFunction)}(t) as content from ${sql(schema)}.${sql(table)} t where id = ${id} ` if (!row) { throw new Error(`row not found: ${schema}.${table}/${id}`) } if (typeof row.content!== 'string') { throw new Error(`invalid content - expected string: ${schema}.${table}/${id}`) } const embedding = await generateEmbedding(row.content) await sql` update ${sql(schema)}.${sql(table)} set ${sql(embeddingColumn)} = ${JSON.stringify(embedding)} where id = ${id} ` await sql` select pgmq.delete(${QUEUE_NAME}, ${jobId}::bigint) `}/** * 返回一个Promise,如果工作进程正在终止,则该Promise会被拒绝。 */function catchUnload() { return new Promise((reject) => { addEventListener('beforeunload', (ev: any) => { reject(new Error(ev.detail?.reason)) }) })}

该边缘函数监听来自 pg_net 的传入HTTP请求,并处理每个嵌入作业。它是一个通用的工作进程,可以处理任何表和列的嵌入作业。它使用OpenAI的API生成嵌入向量,并更新数据库中的相应行。处理完成后,它还会从队列中删除该作业。

该函数旨在独立处理多个作业。如果一个作业失败,不会影响其他作业的处理。该函数返回一个 200 OK 响应,其中包含已完成和失败作业的列表。我们可以使用此信息诊断失败的作业。有关更多详细信息,请参阅故障排除

你需要设置 OPENAI_API_KEY 环境变量,以便向OpenAI进行身份验证。在本地运行边缘函数时,可以将其添加到 .env 文件中:

.env:

1
OPENAI_API_KEY=your-api-key

当你准备部署边缘函数时,可以使用Supabase CLI设置环境变量:

1
supabase secrets set --env-file .env

或者

1
supabase secrets set OPENAI_API_KEY=<your-api-key>

或者,你可以用自己的嵌入向量生成逻辑替换 generateEmbedding 函数。

有关如何部署边缘函数的更多信息,请参阅部署到生产环境

用法

既然基础设施已经就绪,让我们通过一个示例,了解如何使用该系统为文档表自动生成嵌入向量。你可以将此方法应用于多个表,并根据需要为每次嵌入向量生成自定义输入。

1. 创建用于存储带嵌入向量文档的表

我们将创建一个新的 documents 表,用于存储内容和嵌入向量:

1
2
3
4
5
6
7
8
9
10
11
-- 用于存储带嵌入向量文档的表create table documents ( id integer primary key generated always as identity, title text not null, content text not null, embedding halfvec(1536), created_at timestamp with time zone default now());-- 用于对文档嵌入向量进行向量搜索的索引create index on documents using hnsw (embedding halfvec_cosine_ops);

我们的 documents 表存储每个文档的标题和内容及其向量嵌入。我们使用 halfvec(1536) 列来存储嵌入向量。

halfvec 是一种 pgvector 数据类型,它以半精度(16 位)存储浮点值以节省空间。我们的边缘函数使用 OpenAI 的 text-embedding-3-small 模型,该模型生成 1536 维的嵌入向量,因此我们在此处使用相同的维度。请根据您的嵌入模型生成的维度数量进行调整。

我们在向量列上使用 HNSW 索引。请注意,我们选择 halfvec_cosine_ops 作为索引方法,这意味着我们未来的查询将需要使用余弦距离(<=>)来查找相似的嵌入向量。还要注意,HNSW 索引对 halfvec 向量最多支持 4000 维,因此在选择嵌入模型时要牢记这一点。如果您的模型生成的嵌入向量维度超过 4000 维,则需要在对其进行索引之前降低维度。有关缩短维度的潜在解决方案,请参阅 Matryoshka 嵌入向量

另外请注意,该表必须有一个名为 id 的主键列,这样我们的触发器才能与 util.queue_embeddings 函数正常配合工作,并且我们的边缘函数才能更新正确的行。

2. 创建触发器以将嵌入任务加入队列

现在我们将设置触发器,以便在插入或更新内容时将嵌入任务加入队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
-- 自定义嵌入生成的输入-- 例如,使用 Markdown 标题连接标题和内容create or replace function embedding_input(doc documents)returns textlanguage plpgsqlimmutableas $$begin return '# ' || doc.title || E'\n\n' || doc.content;end;$$;-- 插入事件的触发器create trigger embed_documents_on_insert after insert on documents for each row execute function util.queue_embeddings('embedding_input', 'embedding');-- 更新事件的触发器create trigger embed_documents_on_update after update of title, content -- 必须与 embedding_input() 中的列匹配 on documents for each row execute function util.queue_embeddings('embedding_input', 'embedding');

我们创建了两个触发器:

  1. embed_documents_on_insert:每当有新行插入到 documents 表时,将嵌入任务加入队列。

  2. embed_documents_on_update:每当 documents 表中的 titlecontent 列更新时,将嵌入任务加入队列。

这两个触发器都使用相同的 util.queue_embeddings 函数,该函数会将嵌入任务加入队列以便处理。它们接受两个参数:

  1. embedding_input:生成嵌入生成输入的函数名称。此函数允许您自定义传递给嵌入模型的文本输入(例如,连接标题和内容)。该函数应接受单行作为输入并返回文本。

  2. embedding:将存储嵌入的目标列的名称。

请注意,更新触发器仅在 titlecontent 列更新时触发。这是为了避免在其他列更新时对嵌入列进行不必要的更新。请确保这些列与 embedding_input 函数中使用的列相匹配。

(可选)更新时清除嵌入向量

请注意,我们的触发器会在内容更新时将新的嵌入向量任务加入队列,但不会清除任何现有的嵌入向量。这意味着在生成并更新新的嵌入向量之前,现有嵌入向量可能会暂时与内容不同步。

如果拥有“准确”的嵌入向量比拥有“任何”嵌入向量更为重要,那么您可以添加另一个触发器,在生成新的嵌入向量之前清除现有的嵌入向量:

1
2
3
4
5
6
-- 更新时清除嵌入向量列的触发器create trigger clear_document_embedding_on_update before update of title, content -- 必须与 embedding_input() 中的列匹配 on documents for each row execute function util.clear_column('embedding');

util.clear_column 是我们之前创建的一个通用触发器函数,可用于清除表中的任何列。

  • 它接受列名作为参数。此列必须可为空。
  • 它需要一个带有 for each row 子句的 before 触发器。
  • 它需要我们之前创建的 hstore 扩展。

此示例会在 titlecontent 列更新时清除 embedding 列(请注意 of title, content 子句)。这确保了嵌入向量始终与标题和内容保持同步,但在生成新的嵌入向量之前,搜索结果中会出现暂时的空白。

我们特意使用 before 触发器,因为它允许我们在将记录写入磁盘之前对其进行修改,避免了使用 after 触发器时所需的额外 update 语句。

3. 插入和更新文档

我们插入一个新文档并更新其内容,来看看嵌入向量生成的实际运行情况:

1
2
3
4
5
6
7
8
9
-- 插入一个新文档insert into documents (title, content)values ('理解向量数据库', '向量数据库是专门用于……');-- 立即检查嵌入向量列select id, embeddingfrom documentswhere title = '理解向量数据库';

你会注意到,插入文档后,embedding 列最初为 null。这是因为嵌入向量的生成是异步的,将在下一个计划任务中由边缘函数处理。

等待最多 10 秒,让下一个任务运行,然后再次检查 embedding 列:

1
2
3
select id, embeddingfrom documentswhere title = '理解向量数据库';

你应该能看到为该文档生成的嵌入向量。

接下来,我们更新文档的内容:

1
2
3
4
5
6
7
8
9
-- 更新文档的内容update documentsset content = '向量数据库允许你查询……'where title = '理解向量数据库';-- 立即检查嵌入向量列select id, embeddingfrom documentswhere title = '理解向量数据库';

你会注意到,更新内容后,embedding 列被重置为 null。这是因为我们添加了一个触发器,每当内容更新时,就会清除现有的嵌入向量。嵌入向量将在下一个计划任务中由边缘函数重新生成。

等待最多 10 秒,让下一个任务运行,然后再次检查 embedding 列:

1
2
3
select id, embeddingfrom documentswhere title = '理解向量数据库';

你应该能看到该文档更新后的嵌入向量。

最后,我们更新文档的标题:

1
2
3
4
-- 更新文档的标题update documentsset title = '使用 Supabase 理解向量数据库'where title = '理解向量数据库';

你会注意到,更新标题后,embedding 列再次被重置为 null。这是因为我们添加的清除现有嵌入向量的触发器,在 contenttitle 列更新时都会触发。嵌入向量将在下一个计划任务中由边缘函数重新生成。

等待最多 10 秒,让下一个任务运行,然后再次检查 embedding 列:

1
2
3
select id, embeddingfrom documentswhere title = '使用 Supabase 理解向量数据库';

你应该能看到该文档更新后的嵌入向量。

故障排除

embed 边缘函数会处理一批嵌入作业,并返回 200 OK 响应,响应体中包含已完成和失败作业的列表。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{ "completedJobs": [ { "jobId": "1", "id": "1", "schema": "public", "table": "documents", "contentFunction": "embedding_input", "embeddingColumn": "embedding" } ], "failedJobs": [ { "jobId": "2", "id": "2", "schema": "public", "table": "documents", "contentFunction": "embedding_input", "embeddingColumn": "embedding", "error": "error connecting to openai api" } ]}

它还会在响应头中返回已完成和失败作业的数量。例如:

1
2
x-completed-jobs: 1x-failed-jobs: 1

你还可以使用 x-deno-execution-id 响应头,在 仪表板 日志中追踪边缘函数的执行情况。

每个失败的作业都包含一个 error 字段,用于描述失败原因。作业失败的原因可能包括:

  • 通过外部 API 生成嵌入时出错
  • 连接数据库时出错
  • 边缘函数被终止(例如由于挂钟时间限制)
  • 处理过程中抛出的任何其他错误

pg_net 将 HTTP 响应存储在 net._http_response 表中,可以查询该表来诊断嵌入生成过程中的问题。

1
2
3
4
5
6
select *from net._http_responsewhere (headers->>'x-failed-jobs')::int > 0;

结论

在Postgres中实现嵌入生成和更新自动化,使您能够构建强大的语义搜索功能,而无需手动管理嵌入的复杂性。

通过将Postgres的触发器、队列和其他扩展等功能与Supabase边缘函数相结合,我们可以创建一个强大的系统,异步处理嵌入生成并自动重试失败的任务。

该系统可以定制为与任何内容和嵌入生成服务配合使用,为Postgres中的语义搜索提供灵活且可扩展的解决方案。

另请参阅