自动嵌入
向量嵌入在Postgres中实现了强大的语义搜索功能,但传统上,将它们与您的内容一起管理很复杂。本指南展示了如何使用Supabase的边缘函数、pgmq、pg_net和pg_cron实现嵌入生成和更新的自动化。
理解挑战
在使用pgvector实现语义搜索时,开发人员通常需要:
- 通过外部API(如OpenAI)生成嵌入。
- 将这些嵌入与内容一起存储。
- 当内容发生变化时,保持嵌入同步。
- 在嵌入生成过程中处理失败和重试。
虽然Postgres的全文搜索可以通过对to_tsvector
的同步调用和触发器在内部处理这些问题,但语义搜索需要向OpenAI等供应商发起异步API调用以生成向量嵌入。本指南展示了如何使用触发器、队列和Supabase边缘函数来弥合这一差距。
理解架构
我们将利用以下Postgres和Supabase的功能来创建自动化嵌入系统:
- pgvector:存储和查询向量嵌入。
- pgmq:对嵌入生成请求进行排队,以便处理和重试。
- pg_net:直接从Postgres处理对边缘函数的异步HTTP请求。
- pg_cron:自动处理和重试嵌入生成。
- 触发器:检测内容变化并将嵌入生成请求加入队列。
- 边缘函数:通过类似OpenAI的API(可定制)生成嵌入。
我们将把系统设计为:
- 具有通用性,以便可用于任何表和内容。这使您能够在多个位置配置嵌入,每个位置都能够定制用于嵌入生成的输入。所有这些都将使用相同的队列基础架构和边缘函数来生成嵌入。
- 优雅地处理故障,通过重试失败的任务并提供有关每个任务状态的详细信息。
实现
我们将首先设置对嵌入生成请求进行排队和处理所需的基础架构。然后,我们将创建一个带有触发器的示例表,以便在插入或更新内容时将这些嵌入请求加入队列。
步骤1:启用扩展
首先,我们来启用所需的扩展:
12345678910111213141516171819202122-- 用于向量操作CREATE EXTENSION IF NOT EXISTS vectorWITH SCHEMA extensions;-- 用于排队和处理作业-- (pgmq 将创建自己的模式)CREATE EXTENSION IF NOT EXISTS pgmq;-- 用于异步 HTTP 请求CREATE EXTENSION IF NOT EXISTS pg_netWITH SCHEMA extensions;-- 用于定时处理和重试-- (pg_cron 将创建自己的模式)CREATE EXTENSION IF NOT EXISTS pg_cron;-- 用于在更新时清除嵌入CREATE EXTENSION IF NOT EXISTS hstoreWITH SCHEMA extensions;
尽管SQL代码是CREATE EXTENSION
,但这等同于“启用扩展”。
要禁用扩展,请调用DROP EXTENSION
。
步骤2:创建实用函数
在设置嵌入逻辑之前,我们需要创建一些实用函数:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566-- 实用函数的模式create schema util;-- 获取Supabase项目URL的实用函数(边缘函数所需)create function util.project_url()returns textlanguage plpgsqlsecurity defineras $$declare secret_value text;begin -- 从Vault中检索项目URL select decrypted_secret into secret_value from vault.decrypted_secrets where name = 'project_url'; return secret_value;end;$$;-- 调用任何边缘函数的通用函数create or replace function util.invoke_edge_function( name text, body jsonb, timeout_milliseconds int = 5 * 60 * 1000 -- 默认5分钟超时)returns voidlanguage plpgsqlas $$declare headers_raw text; auth_header text;begin -- 如果我们处于PostgREST会话中,重用请求头进行授权 headers_raw := current_setting('request.headers', true); -- 仅当存在头信息时才尝试解析 auth_header := case when headers_raw is not null then (headers_raw::json->>'authorization') else null end; -- 对边缘函数执行异步HTTP请求 perform net.http_post( url => util.project_url() || '/functions/v1/' || name, headers => jsonb_build_object( 'Content-Type', 'application/json', 'Authorization', auth_header ), body => body, timeout_milliseconds => timeout_milliseconds );end;$$;-- 更新时清除列的通用触发器函数create or replace function util.clear_column()returns triggerlanguage plpgsql as $$declare clear_column text := TG_ARGV[0];begin NEW := NEW #= hstore(clear_column, NULL); return NEW;end;$$;
在此我们创建:
- 一个模式
util
用于存储实用函数。 - 一个从 Vault 检索Supabase项目URL的函数。接下来我们将添加此密钥。
- 一个使用给定名称和请求体调用任何边缘函数的通用函数。
- 一个更新时清除列的通用触发器函数。此函数接受列名作为参数,并在
NEW
记录中将其设置为NULL
。我们稍后将解释如何使用此函数。
每个项目都有一个唯一的API URL,这是调用边缘函数所必需的。让我们根据您的环境继续将项目URL密钥添加到Vault中。
使用本地Supabase堆栈时,将以下内容添加到 supabase/seed.sql
文件中:
12select vault.create_secret('http://api.supabase.internal:8000', 'project_url');
部署到云平台时,打开 SQL编辑器 并运行以下内容,将 <project-url>
替换为您的 项目API URL:
12select vault.create_secret('<project-url>', 'project_url');
步骤3:创建队列和触发器
我们的目标是每当表中的内容被插入或更新时,自动生成嵌入向量。我们可以使用触发器和队列来实现这一点。我们的方法是,每当表中有记录插入或更新时,自动将嵌入任务排入队列,然后使用定时任务异步处理这些任务。如果某个任务失败,它将留在队列中,并在下一个预定任务中重试。
首先,我们创建一个用于处理嵌入请求的 pgmq
队列:
12-- 用于处理嵌入任务的队列select pgmq.create('embedding_jobs');
接下来,我们创建一个触发器函数,用于将嵌入任务排入队列。我们将使用这个函数来处理插入和更新事件:
12345678910111213141516171819202122-- 用于将嵌入任务排入队列的通用触发器函数create or replace function util.queue_embeddings()returns triggerlanguage plpgsqlas $$declare content_function text = TG_ARGV[0]; embedding_column text = TG_ARGV[1];begin perform pgmq.send( queue_name => 'embedding_jobs', msg => jsonb_build_object( 'id', NEW.id, 'schema', TG_TABLE_SCHEMA, 'table', TG_TABLE_NAME, 'contentFunction', content_function, 'embeddingColumn', embedding_column ) ); return NEW;end;$$;
我们的 util.queue_embeddings
触发器函数是通用的,可用于任何表和内容函数。它接受两个参数:
content_function
:一个函数的名称,该函数返回要嵌入的文本内容。该函数应接受单行作为输入并返回文本(请参阅embedding_input
示例)。
这使您可以自定义传递给嵌入模型的文本输入 - 例如,您可以将 title
和 content
等多个列连接在一起,并将结果用作输入。
embedding_column
:存储嵌入向量的目标列的名称。
请注意,util.queue_embeddings
触发器函数需要 for each row
子句才能正常工作。有关如何将此触发器函数与您的表一起使用的示例,请参阅 用法。
接下来,我们将创建一个函数来处理嵌入任务。此函数将从队列中读取任务,将它们分组为批次,并调用边缘函数来生成嵌入向量。我们将使用 pg_cron
安排此函数每10秒运行一次。
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758-- 用于处理队列中嵌入任务的函数create or replace function util.process_embeddings( batch_size int = 10, max_requests int = 10, timeout_milliseconds int = 5 * 60 * 1000 -- 默认5分钟超时)returns voidlanguage plpgsqlas $$declare job_batches jsonb[]; batch jsonb;begin with -- 首先获取任务并分配批次编号 numbered_jobs as ( select message || jsonb_build_object('jobId', msg_id) as job_info, (row_number() over (order by 1) - 1) / batch_size as batch_num from pgmq.read( queue_name => 'embedding_jobs', vt => timeout_milliseconds / 1000, qty => max_requests * batch_size ) ), -- 然后将任务分组为批次 batched_jobs as ( select jsonb_agg(job_info) as batch_array, batch_num from numbered_jobs group by batch_num ) -- 最后将所有批次聚合为数组 select array_agg(batch_array) from batched_jobs into job_batches; -- 为每个批次调用嵌入边缘函数 foreach batch in array job_batches loop perform util.invoke_edge_function( name => 'embed', body => batch, timeout_milliseconds => timeout_milliseconds ); end loop;end;$$;-- 安排嵌入处理select cron.schedule( 'process-embeddings', '10 seconds', $$ select util.process_embeddings(); $$ );
让我们讨论一下关于这种方法的一些常见问题:
为什么不在单个边缘函数请求中生成所有嵌入向量?
虽然这是可行的,但可能会导致处理时间过长并有可能出现超时。批量处理使我们能够并发处理多个嵌入向量,并更有效地处理失败情况。
为什么不每行发起一个请求?
这种方法可能会导致 API 速率限制和性能问题。批量处理在效率和可靠性之间提供了一种平衡。
为什么要将请求排队而不是立即处理?
排队使我们能够优雅地处理失败、重试请求并更有效地管理并发。具体来说,我们使用 pgmq
的可见性超时来确保失败的请求会被重试。
可见性超时是如何工作的?
每次我们从队列中读取一条消息时,都会设置一个可见性超时,这会告诉 pgmq
在特定时间段内对其他读取者隐藏该消息。如果边缘函数在此时间段内未能处理该消息,该消息将再次变为可见,并将由下一个计划任务重试。
我们如何处理重试?
我们使用 pg_cron
来安排一个任务,从队列中读取消息并进行处理。如果边缘函数未能处理一条消息,在超时后该消息将再次变为可见,并可由下一个计划任务重试。
10 秒的处理间隔合适吗?
这个间隔是一个不错的起点,但您可能需要根据工作负载以及生成嵌入向量所需的时间进行调整。您可以调整 batch_size
、max_requests
和 timeout_milliseconds
参数来优化性能。
步骤4:创建边缘函数
最后,我们将创建用于生成嵌入向量的边缘函数。在本示例中,我们将使用OpenAI的API,但你可以将其替换为任何其他嵌入向量生成服务。
使用Supabase CLI创建一个新的边缘函数:
1supabase functions new embed
这将创建一个新目录 supabase/functions/embed
以及一个 index.ts
文件。将此文件的内容替换为以下内容:
supabase/functions/embed/index.ts:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196// 为内置的Supabase运行时API设置类型定义import 'jsr:@supabase/functions-js/edge-runtime.d.ts'// 我们将使用OpenAI API生成嵌入向量import OpenAI from 'jsr:@openai/openai'import { z } from 'npm:zod'// 我们将直接连接Postgres以更新文档import postgres from 'https://deno.land/x/postgresjs@v3.4.5/mod.js'// 初始化OpenAI客户端const openai = new OpenAI({ // 我们需要手动设置 `OPENAI_API_KEY` 环境变量 apiKey: Deno.env.get('OPENAI_API_KEY'),})// 初始化Postgres客户端const sql = postgres( // `SUPABASE_DB_URL` 是一个内置的环境变量 Deno.env.get('SUPABASE_DB_URL')!)const jobSchema = z.object({ jobId: z.number(), id: z.number(), schema: z.string(), table: z.string(), contentFunction: z.string(), embeddingColumn: z.string(),})const failedJobSchema = jobSchema.extend({ error: z.string(),})type Job = z.infer<typeof jobSchema>type FailedJob = z.infer<typeof failedJobSchema>type Row = { id: string content: unknown}const QUEUE_NAME = 'embedding_jobs'// 监听HTTP请求Deno.serve(async (req) => { if (req.method !== 'POST') { return new Response('expected POST request', { status: 405 }) } if (req.headers.get('content-type')!== 'application/json') { return new Response('expected json body', { status: 400 }) } // 使用Zod解析并验证请求体 const parseResult = z.array(jobSchema).safeParse(await req.json()) if (parseResult.error) { return new Response(`invalid request body: ${parseResult.error.message}`, { status: 400, }) } const pendingJobs = parseResult.data // 跟踪成功完成的作业 const completedJobs: Job[] = [] // 跟踪因错误而失败的作业 const failedJobs: FailedJob[] = [] async function processJobs() { let currentJob: Job | undefined while ((currentJob = pendingJobs.shift())!== undefined) { try { await processJob(currentJob) completedJobs.push(currentJob) } catch (error) { failedJobs.push({ ...currentJob, error: error instanceof Error? error.message : JSON.stringify(error), }) } } } try { // 在监听工作进程终止的同时处理作业 await Promise.race([processJobs(), catchUnload()]) } catch (error) { // 如果工作进程正在终止(例如达到时间限制), // 将未处理的作业添加到失败列表中,并附上终止原因 failedJobs.push( ...pendingJobs.map((job) => ({ ...job, error: error instanceof Error? error.message : JSON.stringify(error), })) ) } // 记录已完成和失败的作业,以便进行追溯 console.log('finished processing jobs:', { completedJobs: completedJobs.length, failedJobs: failedJobs.length, }) return new Response( JSON.stringify({ completedJobs, failedJobs, }), { // 200 OK响应 status: 200, // 用于报告作业状态的自定义标头 headers: { 'content-type': 'application/json', 'x-completed-jobs': completedJobs.length.toString(), 'x-failed-jobs': failedJobs.length.toString(), }, } )})/** * 为给定文本生成嵌入向量。 */async function generateEmbedding(text: string) { const response = await openai.embeddings.create({ model: 'text-embedding-3-small', input: text, }) const [data] = response.data if (!data) { throw new Error('failed to generate embedding') } return data.embedding}/** * 处理嵌入作业。 */async function processJob(job: Job) { const { jobId, id, schema, table, contentFunction, embeddingColumn } = job // 获取模式/表/行组合的内容 const [row]: [Row] = await sql` select id, ${sql(contentFunction)}(t) as content from ${sql(schema)}.${sql(table)} t where id = ${id} ` if (!row) { throw new Error(`row not found: ${schema}.${table}/${id}`) } if (typeof row.content!== 'string') { throw new Error(`invalid content - expected string: ${schema}.${table}/${id}`) } const embedding = await generateEmbedding(row.content) await sql` update ${sql(schema)}.${sql(table)} set ${sql(embeddingColumn)} = ${JSON.stringify(embedding)} where id = ${id} ` await sql` select pgmq.delete(${QUEUE_NAME}, ${jobId}::bigint) `}/** * 返回一个Promise,如果工作进程正在终止,则该Promise会被拒绝。 */function catchUnload() { return new Promise((reject) => { addEventListener('beforeunload', (ev: any) => { reject(new Error(ev.detail?.reason)) }) })}
该边缘函数监听来自 pg_net
的传入HTTP请求,并处理每个嵌入作业。它是一个通用的工作进程,可以处理任何表和列的嵌入作业。它使用OpenAI的API生成嵌入向量,并更新数据库中的相应行。处理完成后,它还会从队列中删除该作业。
该函数旨在独立处理多个作业。如果一个作业失败,不会影响其他作业的处理。该函数返回一个 200 OK
响应,其中包含已完成和失败作业的列表。我们可以使用此信息诊断失败的作业。有关更多详细信息,请参阅故障排除。
你需要设置 OPENAI_API_KEY
环境变量,以便向OpenAI进行身份验证。在本地运行边缘函数时,可以将其添加到 .env
文件中:
.env:
1OPENAI_API_KEY=your-api-key
当你准备部署边缘函数时,可以使用Supabase CLI设置环境变量:
1supabase secrets set --env-file .env
或者
1supabase secrets set OPENAI_API_KEY=<your-api-key>
或者,你可以用自己的嵌入向量生成逻辑替换 generateEmbedding
函数。
有关如何部署边缘函数的更多信息,请参阅部署到生产环境。
用法
既然基础设施已经就绪,让我们通过一个示例,了解如何使用该系统为文档表自动生成嵌入向量。你可以将此方法应用于多个表,并根据需要为每次嵌入向量生成自定义输入。
1. 创建用于存储带嵌入向量文档的表
我们将创建一个新的 documents
表,用于存储内容和嵌入向量:
1234567891011-- 用于存储带嵌入向量文档的表create table documents ( id integer primary key generated always as identity, title text not null, content text not null, embedding halfvec(1536), created_at timestamp with time zone default now());-- 用于对文档嵌入向量进行向量搜索的索引create index on documents using hnsw (embedding halfvec_cosine_ops);
我们的 documents
表存储每个文档的标题和内容及其向量嵌入。我们使用 halfvec(1536)
列来存储嵌入向量。
halfvec
是一种 pgvector
数据类型,它以半精度(16 位)存储浮点值以节省空间。我们的边缘函数使用 OpenAI 的 text-embedding-3-small
模型,该模型生成 1536 维的嵌入向量,因此我们在此处使用相同的维度。请根据您的嵌入模型生成的维度数量进行调整。
我们在向量列上使用 HNSW 索引。请注意,我们选择 halfvec_cosine_ops
作为索引方法,这意味着我们未来的查询将需要使用余弦距离(<=>
)来查找相似的嵌入向量。还要注意,HNSW 索引对 halfvec
向量最多支持 4000 维,因此在选择嵌入模型时要牢记这一点。如果您的模型生成的嵌入向量维度超过 4000 维,则需要在对其进行索引之前降低维度。有关缩短维度的潜在解决方案,请参阅 Matryoshka 嵌入向量。
另外请注意,该表必须有一个名为 id
的主键列,这样我们的触发器才能与 util.queue_embeddings
函数正常配合工作,并且我们的边缘函数才能更新正确的行。
2. 创建触发器以将嵌入任务加入队列
现在我们将设置触发器,以便在插入或更新内容时将嵌入任务加入队列:
12345678910111213141516171819202122232425-- 自定义嵌入生成的输入-- 例如,使用 Markdown 标题连接标题和内容create or replace function embedding_input(doc documents)returns textlanguage plpgsqlimmutableas $$begin return '# ' || doc.title || E'\n\n' || doc.content;end;$$;-- 插入事件的触发器create trigger embed_documents_on_insert after insert on documents for each row execute function util.queue_embeddings('embedding_input', 'embedding');-- 更新事件的触发器create trigger embed_documents_on_update after update of title, content -- 必须与 embedding_input() 中的列匹配 on documents for each row execute function util.queue_embeddings('embedding_input', 'embedding');
我们创建了两个触发器:
-
embed_documents_on_insert
:每当有新行插入到documents
表时,将嵌入任务加入队列。 -
embed_documents_on_update
:每当documents
表中的title
或content
列更新时,将嵌入任务加入队列。
这两个触发器都使用相同的 util.queue_embeddings
函数,该函数会将嵌入任务加入队列以便处理。它们接受两个参数:
-
embedding_input
:生成嵌入生成输入的函数名称。此函数允许您自定义传递给嵌入模型的文本输入(例如,连接标题和内容)。该函数应接受单行作为输入并返回文本。 -
embedding
:将存储嵌入的目标列的名称。
请注意,更新触发器仅在 title
或 content
列更新时触发。这是为了避免在其他列更新时对嵌入列进行不必要的更新。请确保这些列与 embedding_input
函数中使用的列相匹配。
(可选)更新时清除嵌入向量
请注意,我们的触发器会在内容更新时将新的嵌入向量任务加入队列,但不会清除任何现有的嵌入向量。这意味着在生成并更新新的嵌入向量之前,现有嵌入向量可能会暂时与内容不同步。
如果拥有“准确”的嵌入向量比拥有“任何”嵌入向量更为重要,那么您可以添加另一个触发器,在生成新的嵌入向量之前清除现有的嵌入向量:
123456-- 更新时清除嵌入向量列的触发器create trigger clear_document_embedding_on_update before update of title, content -- 必须与 embedding_input() 中的列匹配 on documents for each row execute function util.clear_column('embedding');
util.clear_column
是我们之前创建的一个通用触发器函数,可用于清除表中的任何列。
- 它接受列名作为参数。此列必须可为空。
- 它需要一个带有
for each row
子句的before
触发器。 - 它需要我们之前创建的
hstore
扩展。
此示例会在 title
或 content
列更新时清除 embedding
列(请注意 of title, content
子句)。这确保了嵌入向量始终与标题和内容保持同步,但在生成新的嵌入向量之前,搜索结果中会出现暂时的空白。
我们特意使用 before
触发器,因为它允许我们在将记录写入磁盘之前对其进行修改,避免了使用 after
触发器时所需的额外 update
语句。
3. 插入和更新文档
我们插入一个新文档并更新其内容,来看看嵌入向量生成的实际运行情况:
123456789-- 插入一个新文档insert into documents (title, content)values ('理解向量数据库', '向量数据库是专门用于……');-- 立即检查嵌入向量列select id, embeddingfrom documentswhere title = '理解向量数据库';
你会注意到,插入文档后,embedding
列最初为 null
。这是因为嵌入向量的生成是异步的,将在下一个计划任务中由边缘函数处理。
等待最多 10 秒,让下一个任务运行,然后再次检查 embedding
列:
123select id, embeddingfrom documentswhere title = '理解向量数据库';
你应该能看到为该文档生成的嵌入向量。
接下来,我们更新文档的内容:
123456789-- 更新文档的内容update documentsset content = '向量数据库允许你查询……'where title = '理解向量数据库';-- 立即检查嵌入向量列select id, embeddingfrom documentswhere title = '理解向量数据库';
你会注意到,更新内容后,embedding
列被重置为 null
。这是因为我们添加了一个触发器,每当内容更新时,就会清除现有的嵌入向量。嵌入向量将在下一个计划任务中由边缘函数重新生成。
等待最多 10 秒,让下一个任务运行,然后再次检查 embedding
列:
123select id, embeddingfrom documentswhere title = '理解向量数据库';
你应该能看到该文档更新后的嵌入向量。
最后,我们更新文档的标题:
1234-- 更新文档的标题update documentsset title = '使用 Supabase 理解向量数据库'where title = '理解向量数据库';
你会注意到,更新标题后,embedding
列再次被重置为 null
。这是因为我们添加的清除现有嵌入向量的触发器,在 content
或 title
列更新时都会触发。嵌入向量将在下一个计划任务中由边缘函数重新生成。
等待最多 10 秒,让下一个任务运行,然后再次检查 embedding
列:
123select id, embeddingfrom documentswhere title = '使用 Supabase 理解向量数据库';
你应该能看到该文档更新后的嵌入向量。
故障排除
embed
边缘函数会处理一批嵌入作业,并返回 200 OK
响应,响应体中包含已完成和失败作业的列表。例如:
1234567891011121314151617181920212223{ "completedJobs": [ { "jobId": "1", "id": "1", "schema": "public", "table": "documents", "contentFunction": "embedding_input", "embeddingColumn": "embedding" } ], "failedJobs": [ { "jobId": "2", "id": "2", "schema": "public", "table": "documents", "contentFunction": "embedding_input", "embeddingColumn": "embedding", "error": "error connecting to openai api" } ]}
它还会在响应头中返回已完成和失败作业的数量。例如:
12x-completed-jobs: 1x-failed-jobs: 1
你还可以使用 x-deno-execution-id
响应头,在 仪表板 日志中追踪边缘函数的执行情况。
每个失败的作业都包含一个 error
字段,用于描述失败原因。作业失败的原因可能包括:
- 通过外部 API 生成嵌入时出错
- 连接数据库时出错
- 边缘函数被终止(例如由于挂钟时间限制)
- 处理过程中抛出的任何其他错误
pg_net
将 HTTP 响应存储在 net._http_response
表中,可以查询该表来诊断嵌入生成过程中的问题。
123456select *from net._http_responsewhere (headers->>'x-failed-jobs')::int > 0;
结论
在Postgres中实现嵌入生成和更新自动化,使您能够构建强大的语义搜索功能,而无需手动管理嵌入的复杂性。
通过将Postgres的触发器、队列和其他扩展等功能与Supabase边缘函数相结合,我们可以创建一个强大的系统,异步处理嵌入生成并自动重试失败的任务。
该系统可以定制为与任何内容和嵌入生成服务配合使用,为Postgres中的语义搜索提供灵活且可扩展的解决方案。