// File: sync_shorturl_schema_to_clickhouse.ts
// Description: syncs rows of the PostgreSQL table short_url.shorturl into ClickHouse.
// Created: 2023-11-21

import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
import { getResource, getVariable, setVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";

/** Persisted sync state (stored as a Windmill variable). */
interface SyncState {
  last_sync_time: string; // end of the last fully synced time window
  records_synced: number; // cumulative number of synced records
  last_run: string; // time of the last run
}

/** Windmill variable path under which the sync state is stored. */
const SYNC_STATE_KEY = "f/shorturl_analytics/shorturl_to_clickhouse_state";

/** PostgreSQL connection settings as delivered by the Windmill resource. */
interface PgConfig {
  host: string;
  port: number;
  user: string;
  password: string;
  dbname?: string;
  [key: string]: unknown;
}

/** ClickHouse connection settings. */
interface ChConfig {
  clickhouse_host: string;
  clickhouse_port: number;
  clickhouse_user: string;
  clickhouse_password: string;
  clickhouse_url?: string;
}

/** One row of short_url.shorturl as selected by getShortUrlData. */
interface ShortUrlData {
  id: string;
  slug: string;
  origin: string; // written to ClickHouse as original_url
  title?: string;
  description?: string;
  created_at?: string;
  updated_at?: string;
  deleted_at?: string;
  expires_at?: string; // selected as `expired_at AS expires_at`
  domain?: string;
}

/** Logger used across the script; verbose messages are shown only in verbose mode. */
type Logger = (message: string, isVerbose?: boolean) => void;

/** Fully qualified ClickHouse target table. */
const SHORTURL_TABLE = 'shorturl_analytics.shorturl';

/**
 * Target columns, in the order the values are emitted.
 * The primary INSERT ... SELECT has no column list, so this order is positional there.
 */
const SHORTURL_COLUMNS = [
  'id', 'external_id', 'type', 'slug', 'original_url',
  'title', 'description', 'attributes', 'schema_version',
  'creator_id', 'creator_email', 'creator_name',
  'created_at', 'updated_at', 'deleted_at',
  'projects', 'teams', 'tags', 'qr_codes', 'channels', 'favorites',
  'expires_at', 'click_count', 'unique_visitors', 'domain',
] as const;

/**
 * Syncs rows of PostgreSQL short_url.shorturl that were created or updated in
 * the window [last sync time, now] into ClickHouse.
 *
 * The sync cursor is advanced to the end of the window only when every fetched
 * row was handled. If some rows could not be written, the cursor stays at the
 * window start so the next run retries them (inserts are guarded by NOT EXISTS,
 * so a retry does not duplicate rows).
 *
 * @param dry_run            Test mode: fetch data but perform no writes.
 * @param verbose            Show verbose log messages.
 * @param reset_sync_state   Ignore the stored sync state for this run.
 * @param default_hours_back Without a sync state, how many hours to look back.
 * @returns A result object; `success: false` with a `message` when the run threw.
 */
export async function main(
  dry_run = false,
  verbose = false,
  reset_sync_state = false,
  default_hours_back = 1
) {
  const log: Logger = (message, isVerbose = false) => {
    if (!isVerbose || verbose) {
      console.log(message);
    }
  };

  // Load the previous sync state, unless a reset was requested.
  let syncState: SyncState | null = null;
  if (!reset_sync_state) {
    try {
      log("获取同步状态...", true);
      const rawState = await getVariable(SYNC_STATE_KEY);
      if (rawState) {
        // The variable may arrive as a JSON string or as an already-decoded object.
        const loaded: SyncState =
          typeof rawState === "string" ? JSON.parse(rawState) : (rawState as SyncState);
        syncState = loaded;
        log(`找到上次同步状态: 最后同步时间 ${loaded.last_sync_time}, 已同步记录数 ${loaded.records_synced}`, true);
      }
    } catch (error) {
      log(`获取同步状态失败: ${error}, 将使用默认设置`, true);
    }
  } else {
    log("重置同步状态,从头开始同步", true);
  }

  // Time window: from the stored cursor (or the default look-back) until now.
  const oneHourAgo = new Date(Date.now() - default_hours_back * 60 * 60 * 1000).toISOString();
  const start_time = syncState ? syncState.last_sync_time : oneHourAgo;
  const end_time = new Date().toISOString();
  const previouslySynced = syncState ? syncState.records_synced : 0;

  log(`开始同步shorturl表数据: ${start_time} 至 ${end_time}`);

  let pgPool: Pool | null = null;

  try {
    // 1. PostgreSQL configuration
    log("获取PostgreSQL数据库配置...", true);
    const pgConfig = await getResource('f/limq/production_supabase') as PgConfig;

    // 2. PostgreSQL connection pool
    pgPool = new Pool({
      hostname: pgConfig.host,
      port: pgConfig.port,
      user: pgConfig.user,
      password: pgConfig.password,
      database: pgConfig.dbname || 'postgres'
    }, 3);

    // 3. Rows that changed inside the window
    const shorturlData = await getShortUrlData(pgPool, start_time, end_time, log);
    log(`成功获取 ${shorturlData.length} 条shorturl数据`);

    if (shorturlData.length === 0) {
      // Nothing to write: still move the cursor forward.
      if (!dry_run) {
        await updateSyncState(end_time, previouslySynced, log);
      }
      return { success: true, message: "没有找到需要更新的数据", updated: 0 };
    }

    // 4. ClickHouse configuration
    const chConfig = await getClickHouseConfig();

    // 5. Write (or only report, in dry-run mode)
    if (dry_run) {
      log("测试模式: 不执行实际更新");
      return {
        success: true,
        dry_run: true,
        shorturl_count: shorturlData.length,
        shorturl_sample: shorturlData.slice(0, 1)
      };
    }

    const shorturlUpdated = await updateClickHouseShortUrl(shorturlData, chConfig, log);

    // Commit the window only if every row was handled. Otherwise keep the
    // cursor (and the counter, to avoid double counting on retry) where they were.
    const unsyncedCount = shorturlData.length - shorturlUpdated;
    const windowComplete = unsyncedCount <= 0;
    const nextSyncTime = windowComplete ? end_time : start_time;
    const totalSynced = windowComplete ? previouslySynced + shorturlUpdated : previouslySynced;
    if (!windowComplete) {
      log(`有 ${unsyncedCount} 条记录未成功同步,同步时间点保持在 ${start_time},下次运行将重试`);
    }
    await updateSyncState(nextSyncTime, totalSynced, log);

    return {
      success: true,
      message: "shorturl表数据同步完成",
      shorturl_updated: shorturlUpdated,
      shorturl_failed: Math.max(0, unsyncedCount),
      total_synced: totalSynced,
      sync_state: {
        last_sync_time: nextSyncTime,
        records_synced: totalSynced
      }
    };
  } catch (error) {
    const errorMessage = `同步过程中发生错误: ${(error as Error).message}`;
    log(errorMessage);
    if ((error as Error).stack) {
      log(`错误堆栈: ${(error as Error).stack}`, true);
    }
    return { success: false, message: errorMessage };
  } finally {
    if (pgPool) {
      await pgPool.end();
      log("PostgreSQL连接池已关闭", true);
    }
  }
}

/**
 * Persists the sync state. Failures are logged but never thrown, so a state
 * write problem does not abort the sync itself.
 *
 * @param lastSyncTime  Cursor to store (start of the next window).
 * @param recordsSynced Cumulative record counter to store.
 * @param log           Logger.
 */
async function updateSyncState(
  lastSyncTime: string,
  recordsSynced: number,
  log: Logger
): Promise<void> {
  try {
    const newState: SyncState = {
      last_sync_time: lastSyncTime,
      records_synced: recordsSynced,
      last_run: new Date().toISOString()
    };

    // Stored as a JSON string; the reader in main() parses string values.
    await setVariable(SYNC_STATE_KEY, JSON.stringify(newState));
    log(`同步状态已更新: 最后同步时间 ${lastSyncTime}, 累计同步记录数 ${recordsSynced}`, true);
  } catch (error) {
    // Always visible: a lost state write means the next run re-reads this window.
    log(`更新同步状态失败: ${error}`);
  }
}

/**
 * Reads the short_url.shorturl rows whose created_at or updated_at lies in
 * [startTime, endTime] (both bounds inclusive).
 *
 * @param pgPool    PostgreSQL connection pool.
 * @param startTime Window start (ISO timestamp).
 * @param endTime   Window end (ISO timestamp).
 * @param log       Logger.
 * @returns The matching rows.
 */
async function getShortUrlData(
  pgPool: Pool,
  startTime: string,
  endTime: string,
  log: Logger
): Promise<ShortUrlData[]> {
  const client = await pgPool.connect();
  try {
    log(`获取shorturl数据 (${startTime} 至 ${endTime})`, true);

    const query = `
      SELECT
        id,
        slug,
        origin,
        title,
        description,
        domain,
        created_at,
        updated_at,
        deleted_at,
        expired_at as expires_at
      FROM
        short_url.shorturl
      WHERE
        (created_at >= $1 AND created_at <= $2) OR
        (updated_at >= $1 AND updated_at <= $2)
    `;

    const result = await client.queryObject<ShortUrlData>(query, [startTime, endTime]);
    return result.rows;
  } finally {
    // Always hand the connection back to the pool.
    client.release();
  }
}

/**
 * Renders a timestamp as a ClickHouse SQL expression.
 *
 * @param dateStr Timestamp; a Date is accepted as well because `new Date()`
 *                handles both (NOTE(review): the PostgreSQL driver may return
 *                Date objects for timestamp columns — confirm).
 * @returns `parseDateTimeBestEffort('<ISO string>')`, or the SQL literal `NULL`
 *          for missing or unparseable input.
 */
function formatDateTime(dateStr: string | Date | null | undefined): string {
  if (!dateStr) return 'NULL';

  try {
    const date = new Date(dateStr);
    if (isNaN(date.getTime())) {
      return 'NULL';
    }
    return `parseDateTimeBestEffort('${date.toISOString()}')`;
  } catch (error) {
    console.error(`日期格式化错误: ${error}`);
    return 'NULL';
  }
}

/**
 * Renders a 20-character text progress bar, e.g. `[=====               ] 25% (5/20)`.
 * The percentage is clamped to 0..100 and an empty total counts as 0%, so the
 * bar never receives an invalid repeat count.
 *
 * @param current Items processed so far.
 * @param total   Total number of items.
 */
function formatProgress(current: number, total: number): string {
  const barWidth = 20;
  const rawRatio = total > 0 ? current / total : 0;
  const ratio = Number.isFinite(rawRatio) ? rawRatio : 0;
  const percent = Math.min(100, Math.max(0, Math.round(ratio * 100)));
  const filled = Math.floor(percent / 5);
  const progressBar = '[' + '='.repeat(filled) + ' '.repeat(barWidth - filled) + ']';
  return `${progressBar} ${percent}% (${current}/${total})`;
}

/** Quotes a value as a ClickHouse string literal. */
function sqlString(value: string): string {
  return `'${escapeString(value)}'`;
}

/** Quotes a value as a string literal, or yields NULL when it is missing or empty. */
function sqlNullableString(value: string | null | undefined): string {
  return value ? sqlString(value) : 'NULL';
}

/** Builds the SQL value expressions for one row, ordered like SHORTURL_COLUMNS. */
function buildShortUrlSqlValues(shorturl: ShortUrlData): string[] {
  const emptyJsonArray = "'[]'";
  return [
    sqlString(shorturl.id),                  // id
    sqlString(shorturl.id),                  // external_id
    "'shorturl'",                            // type
    sqlString(shorturl.slug),                // slug
    sqlString(shorturl.origin),              // original_url
    sqlNullableString(shorturl.title),       // title
    sqlNullableString(shorturl.description), // description
    "'{}'",                                  // attributes
    '1',                                     // schema_version
    "''",                                    // creator_id
    "''",                                    // creator_email
    "''",                                    // creator_name
    formatDateTime(shorturl.created_at),     // created_at
    formatDateTime(shorturl.updated_at),     // updated_at
    formatDateTime(shorturl.deleted_at),     // deleted_at
    emptyJsonArray,                          // projects
    emptyJsonArray,                          // teams
    emptyJsonArray,                          // tags
    emptyJsonArray,                          // qr_codes
    emptyJsonArray,                          // channels
    emptyJsonArray,                          // favorites
    formatDateTime(shorturl.expires_at),     // expires_at
    '0',                                     // click_count
    '0',                                     // unique_visitors
    sqlNullableString(shorturl.domain),      // domain
  ];
}

/** Builds the primary statement: INSERT ... SELECT ... WHERE NOT EXISTS (same id). */
function buildConditionalInsert(shorturl: ShortUrlData): string {
  const values = buildShortUrlSqlValues(shorturl);
  const selectList = SHORTURL_COLUMNS
    .map((column, idx) => `${values[idx]} AS ${column}`)
    .join(',\n          ');
  return `
        INSERT INTO ${SHORTURL_TABLE}
        SELECT
          ${selectList}
        WHERE NOT EXISTS (
          SELECT 1 FROM ${SHORTURL_TABLE} WHERE id = ${sqlString(shorturl.id)}
        )
      `;
}

/** Builds the fallback statement: plain INSERT with an explicit column list. */
function buildPlainInsert(shorturl: ShortUrlData): string {
  const values = buildShortUrlSqlValues(shorturl);
  return `
        INSERT INTO ${SHORTURL_TABLE} (
          ${SHORTURL_COLUMNS.join(', ')}
        ) VALUES (
          ${values.join(',\n          ')}
        )
      `;
}

/**
 * Writes one row: tries the conditional insert first and, if that query fails,
 * checks for the row and does a plain insert when it is absent.
 *
 * @returns true when a statement for this row completed successfully; false
 *          when both attempts failed or the fallback found the row already present.
 */
async function insertShortUrl(
  shorturl: ShortUrlData,
  chConfig: ChConfig,
  log: Logger
): Promise<boolean> {
  try {
    await executeClickHouseQuery(chConfig, buildConditionalInsert(shorturl));
    log(`成功处理shorturl: ${shorturl.id}`, true);
    return true;
  } catch (error) {
    log(`处理shorturl ${shorturl.id} 失败: ${(error as Error).message}`);
  }

  try {
    log(`尝试替代方法更新: ${shorturl.id}`, true);

    const checkQuery = `SELECT count() FROM ${SHORTURL_TABLE} WHERE id = ${sqlString(shorturl.id)}`;
    const existsResult = await executeClickHouseQuery(chConfig, checkQuery);
    if (parseInt(existsResult.trim(), 10) > 0) {
      log(`记录已存在,跳过: ${shorturl.id}`, true);
      return false;
    }

    await executeClickHouseQuery(chConfig, buildPlainInsert(shorturl));
    log(`备选方式插入成功: ${shorturl.id}`, true);
    return true;
  } catch (fallbackError) {
    log(`备选方式失败 ${shorturl.id}: ${(fallbackError as Error).message}`);
    return false;
  }
}

/**
 * Writes the given rows to the ClickHouse shorturl table, one statement per
 * row, processed in batches of 50 for progress reporting.
 *
 * @param shorturls Rows to write.
 * @param chConfig  ClickHouse configuration.
 * @param log       Logger.
 * @returns Number of rows handled successfully; 0 when the list is empty or the
 *          target table does not exist.
 */
async function updateClickHouseShortUrl(
  shorturls: ShortUrlData[],
  chConfig: ChConfig,
  log: Logger
): Promise<number> {
  if (shorturls.length === 0) {
    log('没有找到shorturl数据,跳过shorturl表更新');
    return 0;
  }

  log(`准备更新 ${shorturls.length} 条shorturl数据`);

  const tableExists = await checkClickHouseTable(chConfig, SHORTURL_TABLE);
  if (!tableExists) {
    log('ClickHouse中未找到shorturl表,请先创建表');
    return 0;
  }

  let updatedCount = 0;
  const startTime = Date.now();
  const batchSize = 50;
  const totalBatches = Math.ceil(shorturls.length / batchSize);

  for (let i = 0; i < shorturls.length; i += batchSize) {
    const batch = shorturls.slice(i, i + batchSize);
    const batchNumber = Math.floor(i / batchSize) + 1;
    let successCount = 0;

    log(`处理批次 ${batchNumber}/${totalBatches}: ${formatProgress(i, shorturls.length)}`);

    for (const [j, shorturl] of batch.entries()) {
      // Report overall progress every 10 records and on the last record.
      const overallProgress = i + j + 1;
      if (overallProgress % 10 === 0 || overallProgress === shorturls.length) {
        const elapsedSeconds = (Date.now() - startTime) / 1000;
        // Guard the divisions so a zero elapsed time cannot print NaN/Infinity.
        const recordsPerSecond = elapsedSeconds > 0 ? overallProgress / elapsedSeconds : 0;
        const remainingRecords = shorturls.length - overallProgress;
        const estimatedSecondsRemaining = recordsPerSecond > 0 ? remainingRecords / recordsPerSecond : 0;

        log(`总进度: ${formatProgress(overallProgress, shorturls.length)} - 速率: ${recordsPerSecond.toFixed(1)}条/秒 - 预计剩余时间: ${formatTime(estimatedSecondsRemaining)}`);
      }

      if (await insertShortUrl(shorturl, chConfig, log)) {
        successCount++;
      }
    }

    updatedCount += successCount;
    log(`批次 ${batchNumber}/${totalBatches} 完成: ${successCount}/${batch.length} 条成功 (总计: ${updatedCount}/${shorturls.length})`);
  }

  const totalTime = (Date.now() - startTime) / 1000;
  const averageRate = totalTime > 0 ? updatedCount / totalTime : 0;
  log(`同步完成! 总计处理: ${updatedCount}/${shorturls.length} 条记录, 耗时: ${formatTime(totalTime)}, 平均速率: ${averageRate.toFixed(1)}条/秒`);

  return updatedCount;
}

/**
 * Loads the ClickHouse configuration from the Windmill variable
 * `f/shorturl_analytics/clickhouse` and derives `clickhouse_url` from host and
 * port when it is not set.
 *
 * @returns The configuration, with `clickhouse_url` populated.
 * @throws Error when the variable is missing, is not valid JSON, or no URL can be determined.
 */
async function getClickHouseConfig(): Promise<ChConfig> {
  try {
    const chConfigJson = await getVariable("f/shorturl_analytics/clickhouse");

    if (!chConfigJson) {
      throw new Error("未找到ClickHouse配置");
    }

    // The variable may arrive as a JSON string or as an already-decoded object.
    let chConfig: ChConfig;
    if (typeof chConfigJson === 'string') {
      try {
        chConfig = JSON.parse(chConfigJson);
      } catch {
        throw new Error("ClickHouse配置不是有效的JSON");
      }
    } else {
      chConfig = chConfigJson as ChConfig;
    }

    // Derive the URL from host/port when it was not configured explicitly.
    if (!chConfig.clickhouse_url && chConfig.clickhouse_host && chConfig.clickhouse_port) {
      chConfig.clickhouse_url = `http://${chConfig.clickhouse_host}:${chConfig.clickhouse_port}`;
    }

    if (!chConfig.clickhouse_url) {
      throw new Error("ClickHouse配置缺少URL");
    }

    return chConfig;
  } catch (error) {
    throw new Error(`获取ClickHouse配置失败: ${(error as Error).message}`);
  }
}

/**
 * Checks whether a table exists in ClickHouse by running `EXISTS TABLE`.
 *
 * @param chConfig  ClickHouse configuration.
 * @param tableName Table name, interpolated into the query as-is.
 * @returns true when ClickHouse answers `1`; false otherwise, including when the query fails.
 */
async function checkClickHouseTable(chConfig: ChConfig, tableName: string): Promise<boolean> {
  try {
    const query = `EXISTS TABLE ${tableName}`;
    const result = await executeClickHouseQuery(chConfig, query);
    return result.trim() === '1';
  } catch (error) {
    console.error(`检查表 ${tableName} 失败:`, error);
    return false;
  }
}

/**
 * Sends a query to ClickHouse as an HTTP POST with basic authentication.
 *
 * @param chConfig ClickHouse configuration; `clickhouse_url` must be set.
 * @param query    SQL text sent as the request body.
 * @returns The raw response body.
 * @throws Error when the URL is missing, the request fails, or the response status is not OK.
 */
async function executeClickHouseQuery(chConfig: ChConfig, query: string): Promise<string> {
  if (!chConfig.clickhouse_url) {
    throw new Error("无效的ClickHouse URL: 未定义");
  }

  try {
    const response = await fetch(chConfig.clickhouse_url, {
      method: "POST",
      headers: {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": `Basic ${btoa(`${chConfig.clickhouse_user}:${chConfig.clickhouse_password}`)}`
      },
      body: query,
    });

    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(`ClickHouse查询失败 (${response.status}): ${errorText}`);
    }

    return await response.text();
  } catch (error) {
    throw new Error(`执行ClickHouse查询失败: ${(error as Error).message}`);
  }
}

/**
 * Escapes a value for use inside a single-quoted ClickHouse string literal.
 * Backslashes are doubled first, because ClickHouse treats `\` as an escape
 * character inside string literals; single quotes are then doubled.
 *
 * @param str Raw value; empty or missing input yields an empty string.
 * @returns The escaped text, without surrounding quotes.
 */
function escapeString(str: string): string {
  if (!str) return '';
  // String() is a no-op for strings and keeps a non-string value from throwing.
  return String(str).replace(/\\/g, '\\\\').replace(/'/g, "''");
}

/**
 * Formats a duration in seconds as `N秒` or `M分N秒`.
 * Non-finite or negative input is treated as 0.
 *
 * @param seconds Duration in seconds.
 */
function formatTime(seconds: number): string {
  const safeSeconds = Number.isFinite(seconds) && seconds > 0 ? seconds : 0;
  const mins = Math.floor(safeSeconds / 60);
  const secs = Math.floor(safeSeconds % 60);

  if (mins === 0) {
    return `${secs}秒`;
  } else {
    return `${mins}分${secs}秒`;
  }
}