// Sync data from MongoDB's `trace` collection into ClickHouse's `link_events` table.
import { getResource, getVariable, setVariable } from "https://deno.land/x/windmill@v1.50.0/mod.ts";
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";

/** Connection settings read from the Windmill MongoDB resource. */
interface MongoConfig {
  host: string;
  port: string;
  db: string;
  username: string;
  password: string;
}

/** Connection settings read from the Windmill ClickHouse resource. */
interface ClickHouseConfig {
  clickhouse_host: string;
  clickhouse_port: number;
  clickhouse_user: string;
  clickhouse_password: string;
  clickhouse_database: string;
  clickhouse_url: string;
}

/** Fields of a `trace` document that this script reads. */
interface TraceRecord {
  _id: ObjectId;
  slugId: ObjectId;
  label: string | null;
  ip: string;
  type: number;
  platform: string;
  platformOS: string;
  browser: string;
  browserVersion: string;
  url: string;
  createTime: number;
}

/** Resume cursor persisted between runs in a Windmill variable. */
interface SyncState {
  last_sync_time: number;
  records_synced: number;
  last_sync_id?: string;
}

// deno-lint-ignore no-explicit-any -- MongoDB filter documents are free-form operator trees.
type MongoFilter = Record<string, any>;

/** Path of the Windmill variable that stores the serialized SyncState. */
const SYNC_STATE_VARIABLE = "shorturl_sync_state";

/**
 * Escapes a value for use inside a single-quoted ClickHouse string literal.
 * Backslashes are escaped first because ClickHouse treats `\` as an escape
 * character inside string literals; quotes are then doubled.
 */
function escapeSqlString(value: unknown): string {
  return String(value ?? "").replace(/\\/g, "\\\\").replace(/'/g, "''");
}

/**
 * Converts a stored variable value into a SyncState.
 * Accepts either a JSON string or an already-decoded object, and returns
 * `undefined` when the value cannot be interpreted as a valid state.
 */
function parseSyncState(raw: unknown): SyncState | undefined {
  let candidate: unknown = raw;
  if (typeof raw === "string") {
    try {
      candidate = JSON.parse(raw);
    } catch (_err) {
      return undefined;
    }
  }
  if (typeof candidate !== "object" || candidate === null) return undefined;

  const state = candidate as Partial<SyncState>;
  if (typeof state.last_sync_time !== "number" || !Number.isFinite(state.last_sync_time)) {
    return undefined;
  }
  return {
    last_sync_time: state.last_sync_time,
    records_synced: typeof state.records_synced === "number" ? state.records_synced : 0,
    last_sync_id: typeof state.last_sync_id === "string" ? state.last_sync_id : undefined,
  };
}

/**
 * Builds the MongoDB filter selecting every record that sorts strictly after
 * the cursor (lastTime, lastId) under the ordering { createTime: 1, _id: 1 }.
 * A plain `createTime > t AND _id > id` filter would drop records that share
 * the cursor's createTime, so the tie is handled in a separate `$or` branch.
 */
function buildResumeFilter(lastTime: number, lastId?: string): MongoFilter {
  if (lastId) {
    return {
      $or: [
        { createTime: { $gt: lastTime } },
        { createTime: lastTime, _id: { $gt: new ObjectId(lastId) } },
      ],
    };
  }
  if (lastTime > 0) {
    return { createTime: { $gt: lastTime } };
  }
  return {};
}

/**
 * POSTs a SQL statement to the ClickHouse HTTP endpoint using basic auth.
 * Throws an Error prefixed with `errorLabel` when the response is not 2xx.
 */
async function runClickHouse(
  config: ClickHouseConfig,
  sql: string,
  errorLabel: string,
): Promise<Response> {
  const clickhouseUrl = `${config.clickhouse_url}`;
  const response = await fetch(clickhouseUrl, {
    method: "POST",
    headers: {
      "Content-Type": "application/x-www-form-urlencoded",
      "Authorization": `Basic ${btoa(`${config.clickhouse_user}:${config.clickhouse_password}`)}`,
    },
    body: sql,
  });
  if (!response.ok) {
    const errorText = await response.text();
    throw new Error(`${errorLabel}: ${response.status} ${errorText}`);
  }
  return response;
}

/**
 * Returns the subset of `records` whose ids are not yet present in
 * `link_events`, so re-running the sync does not insert duplicates.
 */
async function filterNewRecords(
  config: ClickHouseConfig,
  records: TraceRecord[],
): Promise<TraceRecord[]> {
  if (records.length === 0) return [];

  const idList = records
    .map((record) => `'${escapeSqlString(record._id.toString())}'`)
    .join(",");

  // FORMAT JSON is required: without it ClickHouse answers in TabSeparated
  // and the response cannot be decoded as JSON.
  const sql = `
      SELECT id
      FROM ${config.clickhouse_database}.link_events
      WHERE id IN (${idList})
      FORMAT JSON
    `;

  const response = await runClickHouse(config, sql, "ClickHouse查询错误");
  const result = await response.json();

  // FORMAT JSON wraps the rows in a `data` array; a bare array is tolerated too.
  const rows: Array<{ id: string }> = Array.isArray(result) ? result : (result?.data ?? []);
  const existingIds = new Set(rows.map((row) => row.id));
  console.log(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`);

  return records.filter((record) => !existingIds.has(record._id.toString()));
}

/**
 * Inserts the records that ClickHouse does not have yet.
 * @returns the number of rows actually inserted.
 */
async function insertRecords(
  config: ClickHouseConfig,
  records: TraceRecord[],
): Promise<number> {
  if (records.length === 0) return 0;

  const newRecords = await filterNewRecords(config, records);
  if (newRecords.length === 0) {
    console.log("所有记录都已存在,跳过处理");
    return 0;
  }

  console.log(`处理 ${newRecords.length} 条新记录`);

  // One VALUES tuple per record; every string column goes through escapeSqlString.
  const values = newRecords
    .map((record) => {
      const createdAt = new Date(record.createTime).toISOString();
      const columns = [
        `'${escapeSqlString(record._id.toString())}'`,
        `'${escapeSqlString(record.slugId.toString())}'`,
        `'${escapeSqlString(record.label || "")}'`,
        `'${escapeSqlString(record.ip)}'`,
        `${Number(record.type)}`,
        `'${escapeSqlString(record.platform || "")}'`,
        `'${escapeSqlString(record.platformOS || "")}'`,
        `'${escapeSqlString(record.browser || "")}'`,
        `'${escapeSqlString(record.browserVersion || "")}'`,
        `'${escapeSqlString(record.url || "")}'`,
        `'${createdAt}'`,
      ];
      return `(${columns.join(", ")})`;
    })
    .join(", ");

  const insertSQL = `
      INSERT INTO ${config.clickhouse_database}.link_events
      (id, slug_id, label, ip, type, platform, platform_os, browser, browser_version, url, created_at)
      VALUES ${values}
    `;

  const response = await runClickHouse(config, insertSQL, "ClickHouse插入错误");
  // The insert response carries no payload we need; release the body.
  await response.body?.cancel();

  console.log(`成功插入 ${newRecords.length} 条记录到ClickHouse`);
  return newRecords.length;
}

/**
 * Copies new `trace` documents from MongoDB into ClickHouse `link_events`.
 *
 * Records are read in (createTime, _id) order, starting after the cursor
 * stored in the `shorturl_sync_state` variable, and the cursor is written
 * back once the run finishes successfully.
 *
 * @param batch_size   Maximum number of documents fetched per batch.
 * @param initial_sync When true, ignores the stored cursor and starts from the beginning.
 * @returns A summary object; `success` is false (with `error`/`stack`) when the run failed.
 */
export async function main(
  batch_size = 10, // by default only 10 records are synced per batch
  initial_sync = false,
) {
  console.log("开始执行MongoDB到ClickHouse的同步任务...");

  // Fetch connection details for MongoDB and ClickHouse.
  const mongoConfig: MongoConfig = await getResource("u/vitalitymailg/mongodb");
  const clickhouseConfig: ClickHouseConfig = await getResource("u/vitalitymailg/clickhouse");

  // Build the MongoDB connection URL.
  let mongoUrl = "mongodb://";
  if (mongoConfig.username && mongoConfig.password) {
    mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
  }
  mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
  // Mask the password before logging.
  console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);

  // Load the state left by the previous run.
  let syncState: SyncState = { last_sync_time: 0, records_synced: 0 };
  try {
    const raw: unknown = await getVariable(SYNC_STATE_VARIABLE);
    const parsed = parseSyncState(raw);
    if (parsed) {
      syncState = parsed;
      console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`);
    } else {
      console.log("未找到同步状态,创建初始同步状态");
    }
  } catch (_err) {
    console.log("未找到同步状态,创建初始同步状态");
  }

  // Forced full resync: discard the stored cursor.
  if (initial_sync) {
    console.log("强制从头开始同步");
    syncState = { last_sync_time: 0, records_synced: 0 };
  }

  // Connect to MongoDB.
  const client = new MongoClient();

  try {
    await client.connect(mongoUrl);
    console.log("MongoDB连接成功");

    const db = client.database(mongoConfig.db);
    const traceCollection = db.collection<TraceRecord>("trace");

    // Only records after the stored cursor are selected.
    let filter = buildResumeFilter(syncState.last_sync_time, syncState.last_sync_id);

    // Count the pending records once; records created during the run are left for the next run.
    const totalRecords = await traceCollection.countDocuments(filter);
    console.log(`找到 ${totalRecords} 条新记录需要同步`);

    if (totalRecords === 0) {
      console.log("没有新记录需要同步,任务完成");
      return {
        success: true,
        records_synced: 0,
        total_synced: syncState.records_synced,
        message: "没有新记录需要同步",
      };
    }

    let processedRecords = 0;
    let totalBatchRecords = 0;
    let lastId: string | undefined = syncState.last_sync_id;
    let lastCreateTime = syncState.last_sync_time;

    // Keyset pagination: each batch is fetched with a filter anchored at the
    // last record already seen, so no `skip` is applied (combining both would
    // advance the cursor twice and silently drop records).
    while (processedRecords < totalRecords) {
      const records = await traceCollection.find(filter)
        .sort({ createTime: 1, _id: 1 })
        .limit(batch_size)
        .toArray();

      if (records.length === 0) break;

      const inserted = await insertRecords(clickhouseConfig, records);

      // Advance the cursor from the fetched batch (not only the inserted rows)
      // so already-existing records are not re-read.
      const lastRecord = records[records.length - 1];
      lastId = lastRecord._id.toString();
      lastCreateTime = lastRecord.createTime;

      processedRecords += records.length; // counts every record read, including existing ones
      totalBatchRecords += inserted; // counts only rows actually inserted

      console.log(`已处理 ${processedRecords}/${totalRecords} 条记录,实际插入 ${totalBatchRecords} 条`);

      filter = buildResumeFilter(lastCreateTime, lastId);
    }

    // Persist the new cursor. The state is serialized to JSON before storing.
    const newSyncState: SyncState = {
      last_sync_time: lastCreateTime,
      records_synced: syncState.records_synced + totalBatchRecords,
      last_sync_id: lastId,
    };

    await setVariable(SYNC_STATE_VARIABLE, JSON.stringify(newSyncState));
    console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`);

    return {
      success: true,
      records_processed: processedRecords,
      records_synced: totalBatchRecords,
      total_synced: newSyncState.records_synced,
      last_sync_time: new Date(newSyncState.last_sync_time).toISOString(),
      message: "数据同步完成",
    };
  } catch (err) {
    console.error("同步过程中发生错误:", err);
    return {
      success: false,
      error: err instanceof Error ? err.message : String(err),
      stack: err instanceof Error ? err.stack : undefined,
    };
  } finally {
    // Close the MongoDB connection.
    await client.close();
    console.log("MongoDB连接已关闭");
  }
}