// Windmill script: sync shorturl data from PostgreSQL to ClickHouse.
// Author: AI Assistant
// Created: 2023-10-30
// Description: reads every resource of type "shorturl" from PostgreSQL together
// with its related data (projects, teams, tags, QR codes, channels, favorites)
// and writes one denormalized row per resource into ClickHouse.

import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
import { getResource, getVariable, setVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";

/** Generic row shape passed between the fetch / enrich / format stages. */
type Row = Record<string, unknown>;

/** Attributes JSON of a shorturl resource; both snake_case and camelCase spellings are accepted. */
interface ResourceAttributes {
  slug?: string;
  original_url?: string;
  originalUrl?: string;
  title?: string;
  description?: string;
  expires_at?: string;
  expiresAt?: string;
  [key: string]: unknown;
}

/** ClickHouse connection settings (read from the `f/shorturl_analytics/clickhouse` variable). */
interface ChConfig {
  clickhouse_host: string;
  clickhouse_port: number;
  clickhouse_user: string;
  clickhouse_password: string;
  clickhouse_url: string;
}

/** PostgreSQL connection settings (read from the `f/limq/production_supabase` resource). */
interface PgConfig {
  host: string;
  port: number;
  user: string;
  password: string;
  dbname?: string;
  [key: string]: unknown;
}

/** Persisted incremental-sync state. */
interface SyncState {
  /** When the last state update was written (ISO string). */
  lastSyncTime: string;
  /** When the last stored run started (ISO string); the next run's lower bound. */
  lastRunTime: string;
}

/** Windmill variable path holding the JSON-encoded SyncState. */
const STATE_VARIABLE_PATH = "f/shorturl_analytics/shorturl_sync_state";

/**
 * Windmill entry point: sync shorturl resources from PostgreSQL to ClickHouse.
 *
 * The lower bound of the sync window is, in order of precedence:
 * `params.startTime`; unbounded when `forceFullSync` is set; otherwise one
 * minute before the stored `lastRunTime` (unbounded if no usable state exists).
 *
 * @returns a summary object. When the fetch hit `limit` (`truncated: true`) the
 *   stored sync state is intentionally NOT advanced, so a re-run with a larger
 *   limit can still pick up the rows that did not fit.
 * @throws on PostgreSQL/ClickHouse failures or unparseable date parameters.
 */
export async function main(
  /** PostgreSQL -> ClickHouse sync parameters */
  params: {
    /** Maximum number of resources to sync per run (default 500) */
    limit?: number;
    /** Whether to include soft-deleted resources */
    includeDeleted?: boolean;
    /** When true, nothing is written to ClickHouse and the sync state is left untouched */
    dryRun?: boolean;
    /** Force a full sync (ignore the stored incremental state) */
    forceFullSync?: boolean;
    /** Manual start time (ISO format) - overrides the automatic incremental start */
    startTime?: string;
    /** Manual end time (ISO format) */
    endTime?: string;
  },
) {
  // `||` on limit is deliberate: 0/undefined both fall back to 500.
  const limit = params.limit || 500;
  const includeDeleted = params.includeDeleted || false;
  const dryRun = params.dryRun || false;
  const forceFullSync = params.forceFullSync || false;

  // Captured before any work; stored as the next run's lower bound.
  const currentRunTime = new Date().toISOString();

  const endTime = params.endTime ? parseDateParam(params.endTime, "endTime") : new Date();
  const startTime = await resolveStartTime(params.startTime, forceFullSync);

  console.log(`开始同步PostgreSQL shorturl数据到ClickHouse`);
  console.log(`参数: limit=${limit}, includeDeleted=${includeDeleted}, dryRun=${dryRun}`);
  if (startTime) console.log(`开始时间: ${startTime.toISOString()}`);
  console.log(`结束时间: ${endTime.toISOString()}`);

  console.log("获取PostgreSQL数据库配置...");
  const pgConfig = await getResource('f/limq/production_supabase') as PgConfig;
  const database = pgConfig.dbname || 'postgres';
  console.log(`数据库连接配置: host=${pgConfig.host}, port=${pgConfig.port}, database=${database}, user=${pgConfig.user}`);

  let pgPool: Pool | null = null;
  try {
    console.log("创建PostgreSQL连接池...");
    pgPool = new Pool({
      hostname: pgConfig.host,
      port: pgConfig.port,
      user: pgConfig.user,
      password: pgConfig.password,
      database,
    }, 3);
    console.log("PostgreSQL连接池创建完成,尝试连接...");

    // Fail fast on bad credentials/network before doing any real work.
    const client = await pgPool.connect();
    try {
      console.log("连接成功,执行测试查询...");
      const testResult = await client.queryObject(`SELECT 1 AS test`);
      console.log(`测试查询结果: ${JSON.stringify(testResult.rows)}`);
    } finally {
      client.release();
    }

    // Fetch one row more than we will sync: the query orders newest-updated
    // first and stops at the limit, so an extra row means older changes inside
    // the window did not fit. Advancing the state past them would skip them
    // permanently.
    const fetched = await fetchShorturlResources(pgPool, {
      limit: limit + 1,
      includeDeleted,
      startTime,
      endTime,
    });
    const truncated = fetched.length > limit;
    const shorturls = truncated ? fetched.slice(0, limit) : fetched;
    console.log(`获取到 ${shorturls.length} 个shorturl资源`);

    if (shorturls.length === 0) {
      // Nothing changed inside the window; still advance the state.
      await updateSyncState(currentRunTime);
      return { synced: 0, message: "没有找到需要同步的shorturl资源" };
    }

    const enrichedShorturls = await enrichShorturlData(pgPool, shorturls);
    console.log(`已丰富 ${enrichedShorturls.length} 个shorturl资源的关联数据`);

    const clickhouseData = formatForClickhouse(enrichedShorturls);

    if (!dryRun) {
      const inserted = await insertToClickhouse(clickhouseData);
      console.log(`成功写入 ${inserted} 条记录到ClickHouse`);

      if (truncated) {
        console.warn(`获取的记录数超过limit(${limit}),结果被截断;本次不更新同步状态,请增大limit或执行全量同步`);
      } else {
        await updateSyncState(currentRunTime);
      }
      return { synced: inserted, message: "同步完成", lastSyncTime: currentRunTime, truncated };
    }

    console.log("Dry run模式 - 不执行实际写入");
    console.log(`将写入 ${clickhouseData.length} 条记录到ClickHouse`);
    if (clickhouseData.length > 0) {
      console.log("示例数据:");
      console.log(JSON.stringify(clickhouseData[0], null, 2));
    }
    return { synced: 0, dryRun: true, sampleData: clickhouseData.slice(0, 1) };
  } catch (error: unknown) {
    // Narrow before reading fields: a non-Error throw (e.g. null) must not crash the handler itself.
    const message = error instanceof Error ? error.message : String(error);
    const name = error instanceof Error ? error.name : typeof error;
    console.error(`同步过程中发生错误: ${message}`);
    console.error(`错误类型: ${name}`);
    if (error instanceof Error && error.stack) {
      console.error(`错误堆栈: ${error.stack}`);
    }
    throw error;
  } finally {
    if (pgPool) {
      await pgPool.end();
      console.log("PostgreSQL连接池已关闭");
    }
  }
}

/**
 * Parses an ISO date parameter.
 * @throws Error with the parameter name when the value is not a valid date
 *   (instead of a later, unexplained RangeError from toISOString()).
 */
function parseDateParam(value: string, name: string): Date {
  const date = new Date(value);
  if (Number.isNaN(date.getTime())) {
    throw new Error(`参数${name}不是有效的日期: ${value}`);
  }
  return date;
}

/**
 * Determines the lower bound of the sync window.
 *
 * A manual start time or forced full sync wins. Otherwise the stored state is
 * read. Any failure (missing variable, bad JSON, invalid date) degrades to a
 * full sync (`undefined`).
 */
async function resolveStartTime(
  manualStartTime: string | undefined,
  forceFullSync: boolean,
): Promise<Date | undefined> {
  if (forceFullSync || manualStartTime) {
    const startTime = manualStartTime ? parseDateParam(manualStartTime, "startTime") : undefined;
    console.log(`使用${manualStartTime ? '手动指定' : '全量同步'} - 开始时间: ${startTime ? startTime.toISOString() : '无限制'}`);
    return startTime;
  }

  try {
    const stateStr = await getVariable(STATE_VARIABLE_PATH);
    if (!stateStr) {
      console.log("未找到上次同步状态,将执行全量同步");
      return undefined;
    }
    const syncState: SyncState = JSON.parse(stateStr);
    console.log(`获取到上次同步状态: 同步时间=${syncState.lastSyncTime}, 运行时间=${syncState.lastRunTime}`);

    const lastRunTime = new Date(syncState.lastRunTime);
    if (Number.isNaN(lastRunTime.getTime())) {
      throw new Error(`无效的lastRunTime: ${syncState.lastRunTime}`);
    }
    // Step back one minute to avoid missing rows at the boundary. Re-syncing a
    // row is harmless because insertToClickhouse deletes existing ids first.
    lastRunTime.setMinutes(lastRunTime.getMinutes() - 1);
    return lastRunTime;
  } catch (error: unknown) {
    console.log(`获取同步状态出错: ${error instanceof Error ? error.message : String(error)},将执行全量同步`);
    return undefined;
  }
}

/**
 * Persists the sync state (best effort).
 *
 * Errors are logged and swallowed on purpose so a state-write failure does not
 * fail an otherwise successful sync.
 */
async function updateSyncState(currentRunTime: string): Promise<void> {
  try {
    const syncState: SyncState = {
      lastSyncTime: new Date().toISOString(), // when this state update is written
      lastRunTime: currentRunTime, // when this run started
    };
    console.log(`更新同步状态: ${JSON.stringify(syncState)}`);
    await setVariable(STATE_VARIABLE_PATH, JSON.stringify(syncState));
  } catch (error: unknown) {
    console.error(`更新同步状态失败: ${error instanceof Error ? error.message : String(error)}`);
  }
}

/**
 * Fetches shorturl resources (with creator info) inside the sync window.
 *
 * A row matches when (created_at >= startTime OR updated_at >= startTime) and
 * created_at <= endTime. Rows are ordered newest-updated first and capped at
 * `options.limit`.
 */
async function fetchShorturlResources(
  pgPool: Pool,
  options: {
    limit: number;
    includeDeleted: boolean;
    startTime?: Date;
    endTime?: Date;
  },
): Promise<Row[]> {
  let query = `
    SELECT
      r.id, r.external_id, r.type, r.attributes, r.schema_version,
      r.creator_id, r.created_at, r.updated_at, r.deleted_at,
      u.email AS creator_email,
      u.first_name AS creator_first_name,
      u.last_name AS creator_last_name
    FROM limq.resources r
    LEFT JOIN limq.users u ON r.creator_id = u.id
    WHERE r.type = 'shorturl'
  `;
  const params: unknown[] = [];

  if (!options.includeDeleted) {
    query += ` AND r.deleted_at IS NULL`;
  }
  // Match rows created OR updated since the start, so edits to old rows are picked up.
  if (options.startTime) {
    params.push(options.startTime);
    query += ` AND (r.created_at >= $${params.length} OR r.updated_at >= $${params.length})`;
  }
  if (options.endTime) {
    params.push(options.endTime);
    query += ` AND r.created_at <= $${params.length}`;
  }
  params.push(options.limit);
  query += ` ORDER BY r.updated_at DESC, r.created_at DESC LIMIT $${params.length}`;

  const client = await pgPool.connect();
  try {
    const result = await client.queryObject(query, params);
    const rows: Row[] = result.rows;

    const first = rows[0];
    if (first) {
      const rawAttributes = first.attributes;
      // Strings are shown as-is; objects are serialized (String(obj) would print "[object Object]").
      const preview = typeof rawAttributes === 'string' ? rawAttributes : String(JSON.stringify(rawAttributes));
      console.log(`获取到 ${rows.length} 条shorturl记录`);
      console.log(`第一条记录ID: ${first.id}`);
      console.log(`attributes类型: ${typeof rawAttributes}`);
      console.log(`attributes内容示例: ${preview.substring(0, 100)}...`);
    }
    return rows;
  } finally {
    client.release();
  }
}

/**
 * Normalizes a resource's `attributes` column into an object.
 *
 * Accepts an already-decoded object or a JSON string. Anything else is logged
 * and replaced by `{}`, including JSON that decodes to null or a primitive.
 */
function parseAttributes(resourceId: unknown, raw: unknown): ResourceAttributes {
  if (typeof raw === 'string') {
    try {
      console.log(`尝试解析attributes字符串,长度: ${raw.length}`);
      const parsed: unknown = JSON.parse(raw);
      if (parsed !== null && typeof parsed === 'object') {
        return parsed as ResourceAttributes;
      }
      console.warn(`资源 ${resourceId} 的attributes不是JSON对象,已忽略`);
      return {};
    } catch (err: unknown) {
      console.warn(`无法解析资源 ${resourceId} 的attributes JSON:`, err instanceof Error ? err.message : String(err));
      console.log(`原始字符串前100字符: ${raw.substring(0, 100)}`);
      return {};
    }
  }
  if (typeof raw === 'object' && raw !== null) {
    console.log('attributes已经是对象类型');
    return raw as ResourceAttributes;
  }
  console.log(`无效的attributes类型: ${typeof raw}`);
  return {};
}

/**
 * Attaches related rows to each shorturl and extracts denormalized fields
 * (slug, originalUrl, title, description, expiresAt).
 *
 * Runs up to six queries per resource, sequentially, on one pooled connection.
 */
async function enrichShorturlData(pgPool: Pool, shorturls: Row[]): Promise<Row[]> {
  const client = await pgPool.connect();
  const enriched: Row[] = [];
  try {
    for (const shorturl of shorturls) {
      // 1. Project links
      const projectsResult = await client.queryObject(`
        SELECT pr.resource_id, pr.project_id, p.name AS project_name,
               p.description AS project_description, pr.assigned_at
        FROM limq.project_resources pr
        JOIN limq.projects p ON pr.project_id = p.id
        WHERE pr.resource_id = $1
      `, [shorturl.id]);
      const projects: Row[] = projectsResult.rows;

      // 2. Teams, reached through the resource's projects
      const projectIds = projects.map((p) => p.project_id);
      const teams: Row[] = projectIds.length > 0
        ? (await client.queryObject(`
            SELECT tp.team_id, tp.project_id, t.name AS team_name, t.description AS team_description
            FROM limq.team_projects tp
            JOIN limq.teams t ON tp.team_id = t.id
            WHERE tp.project_id = ANY($1::uuid[])
          `, [projectIds])).rows
        : [];

      // 3. Tags
      const tagsResult = await client.queryObject(`
        SELECT rt.resource_id, rt.tag_id, rt.created_at, t.name AS tag_name, t.type AS tag_type
        FROM limq.resource_tags rt
        JOIN limq.tags t ON rt.tag_id = t.id
        WHERE rt.resource_id = $1
      `, [shorturl.id]);

      // 4. QR codes
      const qrCodesResult = await client.queryObject(`
        SELECT id AS qr_id, scan_count, url, template_name, created_at
        FROM limq.qr_code
        WHERE resource_id = $1
      `, [shorturl.id]);
      const qrCodes: Row[] = qrCodesResult.rows;

      // 5. Channels
      const channelsResult = await client.queryObject(`
        SELECT id AS channel_id, name AS channel_name, path AS channel_path, "isUserCreated" AS is_user_created
        FROM limq.channel
        WHERE "shortUrlId" = $1
      `, [shorturl.id]);

      // 6. Favorites
      const favoritesResult = await client.queryObject(`
        SELECT f.id AS favorite_id, f.user_id, f.created_at, u.first_name, u.last_name
        FROM limq.favorite f
        JOIN limq.users u ON f.user_id = u.id
        WHERE f.favoritable_id = $1 AND f.favoritable_type = 'resource'
      `, [shorturl.id]);

      console.log(`\n处理资源ID: ${shorturl.id}`);
      console.log(`attributes类型: ${typeof shorturl.attributes}`);
      const attributes = parseAttributes(shorturl.id, shorturl.attributes);

      // Fallback slug: last path segment of the first QR code's URL
      // (assumes the QR encodes the short link - confirm).
      let slugFromQr = "";
      const firstQr = qrCodes[0];
      if (firstQr && firstQr.url) {
        const qrUrl = firstQr.url as string;
        console.log(`找到QR码URL: ${qrUrl}`);
        try {
          const urlParts = qrUrl.split('/');
          slugFromQr = urlParts[urlParts.length - 1] ?? "";
          console.log(`从QR码URL提取的slug: ${slugFromQr}`);
        } catch (err: unknown) {
          console.log('无法从QR码URL提取slug:', err instanceof Error ? err.message : String(err));
        }
      }

      console.log(`提取字段 - name: ${attributes.name || 'N/A'}, slug: ${attributes.slug || 'N/A'}`);
      console.log(`提取字段 - originalUrl: ${attributes.originalUrl || 'N/A'}, original_url: ${attributes.original_url || 'N/A'}`);

      const slug = attributes.slug || attributes.name || slugFromQr || "";
      const originalUrl = attributes.originalUrl || attributes.original_url || "";
      console.log(`最终使用的slug: ${slug}`);
      console.log(`最终使用的originalUrl: ${originalUrl}`);

      enriched.push({
        ...shorturl,
        attributes,
        projects,
        teams,
        tags: tagsResult.rows,
        qrCodes,
        channels: channelsResult.rows,
        favorites: favoritesResult.rows,
        slug,
        originalUrl,
        title: attributes.title || "",
        description: attributes.description || "",
        expiresAt: attributes.expires_at || attributes.expiresAt || null,
      });
    }
  } finally {
    client.release();
  }
  return enriched;
}

/**
 * Converts enriched shorturl rows into flat ClickHouse rows.
 *
 * Related collections are serialized to JSON strings. Timestamps are rendered
 * as `YYYY-MM-DD HH:MM:SS.sss` (UTC, from toISOString()).
 */
function formatForClickhouse(shorturls: Row[]): Row[] {
  const formatDateTime = (value: unknown): string | null => {
    if (!value) return null;
    const dateObj = value instanceof Date ? value : new Date(value as string | number);
    // An unparseable value (e.g. a malformed expires_at in the free-form
    // attributes JSON) would make toISOString() throw RangeError and abort the batch.
    if (Number.isNaN(dateObj.getTime())) return null;
    return dateObj.toISOString().replace('T', ' ').replace('Z', '');
  };

  console.log(`\n准备格式化 ${shorturls.length} 条记录为ClickHouse格式`);

  return shorturls.map((shorturl) => {
    console.log(`处理资源: ${shorturl.id}`);
    console.log(`slug: ${shorturl.slug || 'EMPTY'}`);
    console.log(`originalUrl: ${shorturl.originalUrl || 'EMPTY'}`);

    const attributesStr = JSON.stringify(shorturl.attributes || {});
    const attributesPrev = attributesStr.length > 100 ? attributesStr.substring(0, 100) + '...' : attributesStr;
    console.log(`attributes: ${attributesPrev}`);

    const creatorName = [shorturl.creator_first_name, shorturl.creator_last_name]
      .filter(Boolean)
      .join(" ");

    const projects = JSON.stringify((shorturl.projects as Row[]).map((p) => ({
      project_id: p.project_id,
      project_name: p.project_name,
      project_description: p.project_description,
      assigned_at: p.assigned_at,
    })));

    const teams = JSON.stringify((shorturl.teams as Row[]).map((t) => ({
      team_id: t.team_id,
      team_name: t.team_name,
      team_description: t.team_description,
      via_project_id: t.project_id,
    })));

    const tags = JSON.stringify((shorturl.tags as Row[]).map((t) => ({
      tag_id: t.tag_id,
      tag_name: t.tag_name,
      tag_type: t.tag_type,
      created_at: t.created_at,
    })));

    const qrCodes = JSON.stringify((shorturl.qrCodes as Row[]).map((q) => ({
      qr_id: q.qr_id,
      scan_count: q.scan_count,
      url: q.url,
      template_name: q.template_name,
      created_at: q.created_at,
    })));

    const channels = JSON.stringify((shorturl.channels as Row[]).map((c) => ({
      channel_id: c.channel_id,
      channel_name: c.channel_name,
      channel_path: c.channel_path,
      is_user_created: c.is_user_created,
    })));

    const favorites = JSON.stringify((shorturl.favorites as Row[]).map((f) => ({
      favorite_id: f.favorite_id,
      user_id: f.user_id,
      user_name: `${f.first_name || ""} ${f.last_name || ""}`.trim(),
      created_at: f.created_at,
    })));

    // Aggregate stats: click_count is taken from attributes when present; unique visitors are not computed here.
    const clickCount = (shorturl.attributes as ResourceAttributes).click_count as number || 0;
    const uniqueVisitors = 0;

    return {
      id: shorturl.id,
      external_id: shorturl.external_id || "",
      type: shorturl.type,
      slug: shorturl.slug || "",
      original_url: shorturl.originalUrl || "",
      title: shorturl.title || "",
      description: shorturl.description || "",
      attributes: JSON.stringify(shorturl.attributes || {}),
      schema_version: shorturl.schema_version || 1,
      creator_id: shorturl.creator_id || "",
      creator_email: shorturl.creator_email || "",
      creator_name: creatorName,
      created_at: formatDateTime(shorturl.created_at),
      updated_at: formatDateTime(shorturl.updated_at),
      deleted_at: formatDateTime(shorturl.deleted_at),
      projects,
      teams,
      tags,
      qr_codes: qrCodes,
      channels,
      favorites,
      expires_at: formatDateTime(shorturl.expiresAt),
      click_count: clickCount,
      unique_visitors: uniqueVisitors,
    };
  });
}

/**
 * Loads the ClickHouse connection settings from a Windmill variable.
 *
 * The variable may hold a JSON string or an object. When `clickhouse_url` is
 * missing it is built from host and port.
 * @throws when the variable is missing, is invalid JSON, or yields no URL.
 */
async function getClickHouseConfig(): Promise<ChConfig> {
  try {
    const chConfigJson: unknown = await getVariable("f/shorturl_analytics/clickhouse");
    console.log("原始ClickHouse配置:", typeof chConfigJson);

    if (!chConfigJson) {
      throw new Error("未找到ClickHouse配置");
    }

    let chConfig: ChConfig;
    if (typeof chConfigJson === 'string') {
      try {
        chConfig = JSON.parse(chConfigJson);
      } catch (parseError) {
        console.error("解析JSON失败:", parseError);
        throw new Error("ClickHouse配置不是有效的JSON");
      }
      // JSON.parse("null") or a primitive would otherwise fail later with an opaque TypeError.
      if (chConfig === null || typeof chConfig !== 'object') {
        throw new Error("ClickHouse配置不是有效的JSON");
      }
    } else {
      chConfig = chConfigJson as ChConfig;
    }

    if (!chConfig.clickhouse_url && chConfig.clickhouse_host && chConfig.clickhouse_port) {
      chConfig.clickhouse_url = `http://${chConfig.clickhouse_host}:${chConfig.clickhouse_port}`;
      console.log(`已构建ClickHouse URL: ${chConfig.clickhouse_url}`);
    }
    if (!chConfig.clickhouse_url) {
      throw new Error("ClickHouse配置缺少URL");
    }
    return chConfig;
  } catch (error) {
    console.error("获取ClickHouse配置失败:", error);
    throw error;
  }
}

/** Escapes a value for use inside a single-quoted ClickHouse string literal. */
function escapeClickhouseString(value: string): string {
  return value.replace(/\\/g, "\\\\").replace(/'/g, "\\'");
}

/**
 * Writes rows to `shorturl_analytics.shorturl` over the ClickHouse HTTP interface.
 *
 * First it issues a best-effort `ALTER TABLE ... DELETE` for the incoming ids,
 * so re-synced resources replace their old rows. Then it inserts in batches of
 * 100 using JSONEachRow.
 * @returns number of rows inserted.
 * @throws on the first failed insert batch (delete failures are only logged).
 */
async function insertToClickhouse(data: Row[]): Promise<number> {
  const firstRow = data[0];
  if (data.length === 0 || firstRow === undefined) return 0;

  const chConfig = await getClickHouseConfig();
  if (!chConfig.clickhouse_url) {
    throw new Error("无效的ClickHouse URL: 未定义");
  }
  console.log(`准备写入数据到ClickHouse: ${chConfig.clickhouse_url}`);

  const headers = {
    "Content-Type": "application/x-www-form-urlencoded",
    "Authorization": `Basic ${btoa(`${chConfig.clickhouse_user}:${chConfig.clickhouse_password}`)}`,
  };

  // Column order follows the keys of the first row (all rows share one shape).
  const columns = Object.keys(firstRow).join(", ");

  const recordIds = data.map((record) => String(record.id));
  console.log(`需要处理的记录数: ${recordIds.length}`);

  // Best-effort removal of existing rows for these ids. Each batch is
  // independent: a failure is logged and the remaining batches still run.
  console.log(`删除可能存在的重复记录...`);
  const deleteBatchSize = 100;
  const deleteBatchCount = Math.ceil(recordIds.length / deleteBatchSize);
  for (let i = 0; i < recordIds.length; i += deleteBatchSize) {
    const batchNo = i / deleteBatchSize + 1;
    const formattedIds = recordIds
      .slice(i, i + deleteBatchSize)
      .map((id) => `'${escapeClickhouseString(id)}'`)
      .join(', ');
    const deleteQuery = `ALTER TABLE shorturl_analytics.shorturl DELETE WHERE id IN (${formattedIds})`;

    try {
      const response = await fetch(chConfig.clickhouse_url, { method: "POST", headers, body: deleteQuery });
      if (!response.ok) {
        const errorText = await response.text();
        console.warn(`删除记录时出错 (批次 ${batchNo}): ${errorText}`);
      } else {
        // Release the unread body so the connection resource is not leaked.
        await response.body?.cancel();
        console.log(`成功删除批次 ${batchNo}/${deleteBatchCount}的潜在重复记录`);
      }
    } catch (error: unknown) {
      console.warn(`删除重复记录时出错: ${error instanceof Error ? error.message : String(error)}`);
    }
  }

  const query = `INSERT INTO shorturl_analytics.shorturl (${columns}) FORMAT JSONEachRow`;

  let inserted = 0;
  const batchSize = 100;
  for (let i = 0; i < data.length; i += batchSize) {
    const batch = data.slice(i, i + batchSize);
    const rows = batch.map((row) => JSON.stringify(row)).join('\n');

    try {
      console.log(`正在发送请求到: ${chConfig.clickhouse_url}`);
      console.log(`认证信息: ${chConfig.clickhouse_user}:***`);
      const response = await fetch(chConfig.clickhouse_url, {
        method: "POST",
        headers,
        body: `${query}\n${rows}`,
      });

      if (!response.ok) {
        const errorText = await response.text();
        throw new Error(`ClickHouse插入失败: ${errorText}`);
      }
      await response.body?.cancel();

      inserted += batch.length;
      console.log(`已插入 ${inserted}/${data.length} 条记录`);
    } catch (error) {
      console.error(`请求ClickHouse时出错:`, error);
      throw error;
    }
  }

  return inserted;
}