使用 Kysely 实现类型安全 SQL
Supabase 边缘函数能够直接连接至您的 Postgres 数据库执行 SQL 查询。Kysely 是一个类型安全且支持自动补全的 TypeScript SQL 查询构建器。
将 Kysely 与 Deno Postgres 结合使用,可为您提供直接操作 Postgres 数据库的便捷开发体验。
代码实现
在 GitHub 上查看完整示例
从 Supabase 控制面板获取数据库连接凭证,并存储在 .env 文件中:
DB_HOSTNAME=
DB_PASSWORD=
DB_SSL_CERT="-----BEGIN CERTIFICATE-----
从项目仪表板获取您的证书
-----END CERTIFICATE-----"
创建 DenoPostgresDriver.ts 文件,通过 deno-postgres 管理 Postgres 连接:
import {
  CompiledQuery,
  DatabaseConnection,
  Driver,
  PostgresCursorConstructor,
  QueryResult,
  TransactionSettings,
} from 'https://esm.sh/kysely@0.23.4'
import { freeze, isFunction } from 'https://esm.sh/kysely@0.23.4/dist/esm/util/object-utils.js'
import { extendStackTrace } from 'https://esm.sh/kysely@0.23.4/dist/esm/util/stack-trace-utils.js'
import { Pool, PoolClient } from 'https://deno.land/x/postgres@v0.17.0/mod.ts'

/**
 * Configuration accepted by {@link PostgresDriver}.
 */
export interface PostgresDialectConfig {
  /** A deno-postgres pool, or an async factory that produces one (resolved in `init`). */
  pool: Pool | (() => Promise<Pool>)
  /** Cursor constructor; `streamQuery` rejects when it is absent. */
  cursor?: PostgresCursorConstructor
  /** Optional hook awaited once for every newly wrapped connection. */
  onCreateConnection?: (connection: DatabaseConnection) => Promise<void>
}

// Symbol-keyed method name so that only this module can release a connection.
const PRIVATE_RELEASE_METHOD = Symbol()

/**
 * Kysely `Driver` implementation backed by a deno-postgres `Pool`.
 */
export class PostgresDriver implements Driver {
  readonly #config: PostgresDialectConfig
  // Caches the wrapper created for each pool client so it is built only once per client.
  readonly #connections = new WeakMap<PoolClient, DatabaseConnection>()
  #pool?: Pool

  constructor(config: PostgresDialectConfig) {
    this.#config = freeze({ ...config })
  }

  /** Resolves the configured pool, calling the factory when one was supplied. */
  async init(): Promise<void> {
    this.#pool = isFunction(this.#config.pool) ? await this.#config.pool() : this.#config.pool
  }

  /**
   * Checks a client out of the pool and returns its connection wrapper.
   *
   * @returns The cached wrapper for the client, or a freshly created one.
   * @throws Whatever `onCreateConnection` rejects with; in that case the
   *   client is released back to the pool before the error propagates.
   */
  async acquireConnection(): Promise<DatabaseConnection> {
    const client = await this.#pool!.connect()
    let connection = this.#connections.get(client)

    if (!connection) {
      connection = new PostgresConnection(client, {
        cursor: this.#config.cursor ?? null,
      })
      this.#connections.set(client, connection)

      // The driver must take care of calling `onCreateConnection` when a new
      // connection is created. The `pg` module doesn't provide an async hook
      // for connection creation, so the method is called explicitly here.
      if (this.#config.onCreateConnection) {
        try {
          await this.#config.onCreateConnection(connection)
        } catch (err) {
          // Without this the client would stay checked out forever, because the
          // caller never receives a connection it could release.
          this.#connections.delete(client)
          client.release()
          throw err
        }
      }
    }

    return connection
  }

  /**
   * Starts a transaction, using the requested isolation level when one is set.
   */
  async beginTransaction(
    connection: DatabaseConnection,
    settings: TransactionSettings
  ): Promise<void> {
    if (settings.isolationLevel) {
      await connection.executeQuery(
        CompiledQuery.raw(`start transaction isolation level ${settings.isolationLevel}`)
      )
    } else {
      await connection.executeQuery(CompiledQuery.raw('begin'))
    }
  }

  /** Issues `commit` on the given connection. */
  async commitTransaction(connection: DatabaseConnection): Promise<void> {
    await connection.executeQuery(CompiledQuery.raw('commit'))
  }

  /** Issues `rollback` on the given connection. */
  async rollbackTransaction(connection: DatabaseConnection): Promise<void> {
    await connection.executeQuery(CompiledQuery.raw('rollback'))
  }

  /** Releases the connection's underlying client. */
  async releaseConnection(connection: PostgresConnection): Promise<void> {
    connection[PRIVATE_RELEASE_METHOD]()
  }

  /** Ends the pool, if one was initialised; calling it again is a no-op. */
  async destroy(): Promise<void> {
    if (this.#pool) {
      const pool = this.#pool
      // Clear the field first so a repeated call does not end the pool twice.
      this.#pool = undefined
      await pool.end()
    }
  }
}

interface PostgresConnectionOptions {
  cursor: PostgresCursorConstructor | null
}

/**
 * Wraps a single deno-postgres `PoolClient` as a Kysely `DatabaseConnection`.
 */
class PostgresConnection implements DatabaseConnection {
  #client: PoolClient
  #options: PostgresConnectionOptions

  constructor(client: PoolClient, options: PostgresConnectionOptions) {
    this.#client = client
    this.#options = options
  }

  /**
   * Runs a compiled query through `queryObject` and maps the result.
   *
   * @returns The result rows; for INSERT/UPDATE/DELETE commands the affected
   *   row count is included as a `bigint`.
   * @throws The driver error, passed through `extendStackTrace`.
   */
  async executeQuery<O>(compiledQuery: CompiledQuery): Promise<QueryResult<O>> {
    try {
      const result = await this.#client.queryObject<O>(compiledQuery.sql, [
        ...compiledQuery.parameters,
      ])

      if (
        result.command === 'INSERT' ||
        result.command === 'UPDATE' ||
        result.command === 'DELETE'
      ) {
        const numAffectedRows = BigInt(result.rowCount ?? 0)

        return {
          numUpdatedOrDeletedRows: numAffectedRows,
          numAffectedRows,
          rows: result.rows ?? [],
          // deno-lint-ignore no-explicit-any
        } as any
      }

      return {
        rows: result.rows ?? [],
      }
    } catch (err) {
      throw extendStackTrace(err, new Error())
    }
  }

  /**
   * Streaming is not available in this driver.
   *
   * @throws Always: first validating the `cursor` option and `chunkSize`,
   *   then reporting that streaming is unavailable instead of silently
   *   yielding no rows.
   */
  async *streamQuery<O>(
    _compiledQuery: CompiledQuery,
    chunkSize: number
  ): AsyncIterableIterator<QueryResult<O>> {
    if (!this.#options.cursor) {
      throw new Error(
        "Postgres 方言配置中缺少 'cursor' 参数,该参数是实现 Postgres 流式查询的必要条件"
      )
    }

    if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
      throw new Error('chunkSize 必须是正整数')
    }

    // Stream not available: fail loudly rather than ending the iterator with
    // zero rows, which callers could mistake for an empty result set.
    throw new Error('流式查询不可用')
  }

  [PRIVATE_RELEASE_METHOD](): void {
    this.#client.release()
  }
}
创建 index.ts 文件来处理传入请求并执行查询:
import { serve } from 'https://deno.land/std@0.175.0/http/server.ts'
import { Pool } from 'https://deno.land/x/postgres@v0.17.0/mod.ts'
import {
  Kysely,
  Generated,
  PostgresAdapter,
  PostgresIntrospector,
  PostgresQueryCompiler,
} from 'https://esm.sh/kysely@0.23.4'
import { PostgresDriver } from './DenoPostgresDriver.ts'

console.log(`"kysely-postgres" 函数已启动并运行!`)

/** Column types of the `animals` table. */
interface AnimalTable {
  id: Generated<bigint>
  animal: string
  created_at: Date
}

// Keys of this interface are table names.
interface Database {
  animals: AnimalTable
}

// Create a database pool with one connection.
const pool = new Pool(
  {
    tls: { caCertificates: [Deno.env.get('DB_SSL_CERT')!] },
    database: 'postgres',
    hostname: Deno.env.get('DB_HOSTNAME'),
    user: 'postgres',
    port: 5432,
    password: Deno.env.get('DB_PASSWORD'),
  },
  1
)

// Create the Kysely instance once, at application startup.
const db = new Kysely<Database>({
  dialect: {
    createAdapter() {
      return new PostgresAdapter()
    },
    createDriver() {
      return new PostgresDriver({ pool })
    },
    createIntrospector(db: Kysely<unknown>) {
      return new PostgresIntrospector(db)
    },
    createQueryCompiler() {
      return new PostgresQueryCompiler()
    },
  },
})

// Request handler: returns every row of `animals` as pretty-printed JSON,
// or a 500 response carrying the error message when the query fails.
serve(async (_req) => {
  try {
    // Run a query.
    const animals = await db.selectFrom('animals').select(['id', 'animal', 'created_at']).execute()

    // Neat, it's properly typed \o/
    // Optional chaining keeps an empty table from throwing (and becoming a 500).
    console.log(animals[0]?.created_at.getFullYear())

    // Encode the result as pretty-printed JSON. `bigint` values are converted
    // to strings because JSON.stringify cannot serialise them.
    const body = JSON.stringify(
      animals,
      (_key, value) => (typeof value === 'bigint' ? value.toString() : value),
      2
    )

    // Return the response with the correct content type header.
    return new Response(body, {
      status: 200,
      headers: {
        'Content-Type': 'application/json; charset=utf-8',
      },
    })
  } catch (err) {
    console.error(err)
    // `err` is `unknown` under strict settings, so narrow before reading `.message`.
    const message = err instanceof Error ? err.message : String(err)
    return new Response(message, { status: 500 })
  }
})