Edge Functions

Type-Safe SQL with Kysely


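Supabase Edge Functions can connect directly to your Postgres database to execute SQL queries. Kysely is a type-safe TypeScript SQL query builder with autocompletion support.

Combining Kysely with Deno Postgres gives you a convenient developer experience for working directly with your Postgres database.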
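To make "type-safe" concrete, the following toy sketch shows how TypeScript generics can tie column names to a table definition so that a misspelled column fails at compile time. It is an illustration only and is not Kysely's API or implementation; `selectSql` and the table shape are hypothetical names chosen for this example.

```typescript
// Toy illustration only: NOT Kysely's API. All names here are hypothetical.
interface AnimalTable {
  id: number
  animal: string
  created_at: Date
}

// Keys of this interface are table names.
interface Database {
  animals: AnimalTable
}

// The column list is constrained to the keys of the chosen table,
// so the compiler rejects columns that do not exist.
function selectSql<T extends keyof Database & string, C extends keyof Database[T] & string>(
  table: T,
  columns: readonly C[],
): string {
  const quoted = columns.map((column) => `"${column}"`).join(', ')
  return `select ${quoted} from "${table}"`
}

const sql = selectSql('animals', ['id', 'animal'])
console.log(sql)

// selectSql('animals', ['name']) would not compile: 'name' is not a column of "animals".
```

A real query builder goes further and also derives the result row type from the selected columns, which is what gives you autocompletion on query results.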

Code

See the full example on GitHub.

Get your database connection credentials from the Supabase Dashboard and store them in an .env file:

DB_HOSTNAME=
DB_PASSWORD=
DB_SSL_CERT="-----BEGIN CERTIFICATE-----
GET YOUR CERT FROM YOUR PROJECT DASHBOARD
-----END CERTIFICATE-----"
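A missing or empty variable otherwise tends to show up only as a connection error at request time. One way to fail earlier is sketched below. `requireEnv`, `loadDbConfig`, and `EnvLookup` are hypothetical helpers, not part of Supabase or Deno; the lookup function is passed in as a parameter so the sketch does not depend on a particular runtime.

```typescript
// Hypothetical helpers for illustration; not part of Supabase, Deno, or Kysely.
type EnvLookup = (name: string) => string | undefined

// Return the value of a required variable, or throw a descriptive error.
function requireEnv(name: string, lookup: EnvLookup): string {
  const value = lookup(name)
  if (value === undefined || value.trim() === '') {
    throw new Error(`Missing required environment variable: ${name}`)
  }
  return value
}

interface DbConfig {
  hostname: string
  password: string
  sslCert: string
}

// Collect the three variables from the .env file into one typed object.
function loadDbConfig(lookup: EnvLookup): DbConfig {
  return {
    hostname: requireEnv('DB_HOSTNAME', lookup),
    password: requireEnv('DB_PASSWORD', lookup),
    sslCert: requireEnv('DB_SSL_CERT', lookup),
  }
}
```

Inside an Edge Function the lookup would presumably be `(name) => Deno.env.get(name)`, called once at startup so that a misconfigured deployment fails immediately.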

Create a DenoPostgresDriver.ts file to manage the Postgres connection via deno-postgres:

import {
  CompiledQuery,
  DatabaseConnection,
  Driver,
  PostgresCursorConstructor,
  QueryResult,
  TransactionSettings,
} from 'https://esm.sh/kysely@0.23.4'
import { freeze, isFunction } from 'https://esm.sh/kysely@0.23.4/dist/esm/util/object-utils.js'
import { extendStackTrace } from 'https://esm.sh/kysely@0.23.4/dist/esm/util/stack-trace-utils.js'
import { Pool, PoolClient } from 'https://deno.land/x/postgres@v0.17.0/mod.ts'

export interface PostgresDialectConfig {
  pool: Pool | (() => Promise<Pool>)
  cursor?: PostgresCursorConstructor
  onCreateConnection?: (connection: DatabaseConnection) => Promise<void>
}

const PRIVATE_RELEASE_METHOD = Symbol()

export class PostgresDriver implements Driver {
  readonly #config: PostgresDialectConfig
  readonly #connections = new WeakMap<PoolClient, DatabaseConnection>()
  #pool?: Pool

  constructor(config: PostgresDialectConfig) {
    this.#config = freeze({ ...config })
  }

  async init(): Promise<void> {
    // The pool can be passed directly or as an async factory function.
    this.#pool = isFunction(this.#config.pool) ? await this.#config.pool() : this.#config.pool
  }

  async acquireConnection(): Promise<DatabaseConnection> {
    const client = await this.#pool!.connect()
    let connection = this.#connections.get(client)

    if (!connection) {
      connection = new PostgresConnection(client, {
        cursor: this.#config.cursor ?? null,
      })
      this.#connections.set(client, connection)

      // The driver must take care of calling `onCreateConnection` when a new
      // connection is created. The `pg` module doesn't provide an async hook
      // for connection creation, so we need to call the method explicitly.
      if (this.#config?.onCreateConnection) {
        await this.#config.onCreateConnection(connection)
      }
    }

    return connection
  }

  async beginTransaction(
    connection: DatabaseConnection,
    settings: TransactionSettings
  ): Promise<void> {
    if (settings.isolationLevel) {
      await connection.executeQuery(
        CompiledQuery.raw(`start transaction isolation level ${settings.isolationLevel}`)
      )
    } else {
      await connection.executeQuery(CompiledQuery.raw('begin'))
    }
  }

  async commitTransaction(connection: DatabaseConnection): Promise<void> {
    await connection.executeQuery(CompiledQuery.raw('commit'))
  }

  async rollbackTransaction(connection: DatabaseConnection): Promise<void> {
    await connection.executeQuery(CompiledQuery.raw('rollback'))
  }

  async releaseConnection(connection: PostgresConnection): Promise<void> {
    connection[PRIVATE_RELEASE_METHOD]()
  }

  async destroy(): Promise<void> {
    if (this.#pool) {
      const pool = this.#pool
      this.#pool = undefined
      await pool.end()
    }
  }
}

interface PostgresConnectionOptions {
  cursor: PostgresCursorConstructor | null
}

class PostgresConnection implements DatabaseConnection {
  #client: PoolClient
  #options: PostgresConnectionOptions

  constructor(client: PoolClient, options: PostgresConnectionOptions) {
    this.#client = client
    this.#options = options
  }

  async executeQuery<O>(compiledQuery: CompiledQuery): Promise<QueryResult<O>> {
    try {
      const result = await this.#client.queryObject<O>(compiledQuery.sql, [
        ...compiledQuery.parameters,
      ])

      // Only write commands report the number of affected rows.
      if (
        result.command === 'INSERT' ||
        result.command === 'UPDATE' ||
        result.command === 'DELETE'
      ) {
        const numAffectedRows = BigInt(result.rowCount || 0)

        return {
          numUpdatedOrDeletedRows: numAffectedRows,
          numAffectedRows,
          rows: result.rows ?? [],
        } as any
      }

      return {
        rows: result.rows ?? [],
      }
    } catch (err) {
      throw extendStackTrace(err, new Error())
    }
  }

  async *streamQuery<O>(
    _compiledQuery: CompiledQuery,
    chunkSize: number
  ): AsyncIterableIterator<QueryResult<O>> {
    if (!this.#options.cursor) {
      throw new Error(
        "'cursor' is not present in your postgres dialect config. It's required to make streaming work in postgres."
      )
    }

    if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
      throw new Error('chunkSize must be a positive integer')
    }

    // Streaming is not available in this driver.
    return null
  }

  [PRIVATE_RELEASE_METHOD](): void {
    this.#client.release()
  }
}
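The `executeQuery` method in the driver reports an affected-row count only for INSERT, UPDATE, and DELETE, and falls back to an empty row list when the client returns none. The sketch below isolates that mapping as a pure function so it can be read and tested without a database. `DriverResult`, `MappedResult`, and `toQueryResult` are hypothetical names that loosely mirror, but are not, the deno-postgres and Kysely types.

```typescript
// Hypothetical, simplified stand-ins for the deno-postgres result and the Kysely QueryResult.
interface DriverResult<Row> {
  command: string
  rowCount?: number
  rows?: Row[]
}

interface MappedResult<Row> {
  rows: Row[]
  numAffectedRows?: bigint
}

// Commands for which an affected-row count is meaningful.
const WRITE_COMMANDS = ['INSERT', 'UPDATE', 'DELETE']

function toQueryResult<Row>(result: DriverResult<Row>): MappedResult<Row> {
  const rows = result.rows ?? []
  if (WRITE_COMMANDS.includes(result.command)) {
    // The count is converted to bigint, as in the driver's executeQuery.
    return { rows, numAffectedRows: BigInt(result.rowCount ?? 0) }
  }
  return { rows }
}

const updated = toQueryResult<{ id: number }>({ command: 'UPDATE', rowCount: 3 })
const selected = toQueryResult({ command: 'SELECT', rows: [{ id: 1 }] })
console.log(updated.numAffectedRows, selected.rows.length)
```

Keeping this logic separate from the connection class is a design choice of the sketch, not of the original driver, which performs the mapping inline.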

Create an index.ts file to handle incoming requests and execute a query:

import { serve } from 'https://deno.land/std@0.175.0/http/server.ts'
import { Pool } from 'https://deno.land/x/postgres@v0.17.0/mod.ts'
import {
  Kysely,
  Generated,
  PostgresAdapter,
  PostgresIntrospector,
  PostgresQueryCompiler,
} from 'https://esm.sh/kysely@0.23.4'
import { PostgresDriver } from './DenoPostgresDriver.ts'

console.log(`"kysely-postgres" function up and running!`)

interface AnimalTable {
  id: Generated<bigint>
  animal: string
  created_at: Date
}

// Keys of this interface are table names.
interface Database {
  animals: AnimalTable
}

// Create a database pool with one connection.
const pool = new Pool(
  {
    tls: { caCertificates: [Deno.env.get('DB_SSL_CERT')!] },
    database: 'postgres',
    hostname: Deno.env.get('DB_HOSTNAME'),
    user: 'postgres',
    port: 5432,
    password: Deno.env.get('DB_PASSWORD'),
  },
  1
)

// Create the Kysely instance once, at application startup.
const db = new Kysely<Database>({
  dialect: {
    createAdapter() {
      return new PostgresAdapter()
    },
    createDriver() {
      return new PostgresDriver({ pool })
    },
    createIntrospector(db: Kysely<unknown>) {
      return new PostgresIntrospector(db)
    },
    createQueryCompiler() {
      return new PostgresQueryCompiler()
    },
  },
})

serve(async (_req) => {
  try {
    // Run a query.
    const animals = await db.selectFrom('animals').select(['id', 'animal', 'created_at']).execute()

    // Neat, the type is inferred correctly \o/
    // Optional chaining keeps an empty table from raising a runtime error.
    console.log(animals[0]?.created_at.getFullYear())

    // Encode the result as pretty-printed JSON; bigint values are converted
    // to strings because JSON.stringify cannot serialize them.
    const body = JSON.stringify(
      animals,
      (_key, value) => (typeof value === 'bigint' ? value.toString() : value),
      2
    )

    // Return the response with the correct content type header.
    return new Response(body, {
      status: 200,
      headers: {
        'Content-Type': 'application/json; charset=utf-8',
      },
    })
  } catch (err) {
    console.error(err)
    // `err` is of type unknown, so narrow it before reading `message`.
    const message = err instanceof Error ? err.message : String(err)
    return new Response(message, { status: 500 })
  }
})
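The request handler passes a replacer to `JSON.stringify` because the `id` column is typed as `bigint`, and `JSON.stringify` throws a `TypeError` when it meets a `bigint` value. A minimal standalone sketch of that step follows; `bigintReplacer` and `toJson` are hypothetical names, and the sample row is made up for illustration.

```typescript
// Hypothetical helper names; the replacer logic matches the one used in the handler.
function bigintReplacer(_key: string, value: unknown): unknown {
  // Convert bigint to a decimal string; leave every other value untouched.
  return typeof value === 'bigint' ? value.toString() : value
}

function toJson(data: unknown): string {
  return JSON.stringify(data, bigintReplacer, 2)
}

// Made-up sample row shaped like the `animals` table.
const sample = [{ id: BigInt(1), animal: 'cat' }]
console.log(toJson(sample))
```

Serializing ids as strings also avoids precision loss on clients that would otherwise parse large integers as floating-point numbers.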