// Sync records from the MongoDB `trace` collection into the ClickHouse `link_events` table.
import { MongoClient, ObjectId } from 'mongodb';
import fs from 'fs';
import path from 'path';

// Hard-coded connection settings.
const mongoConfig = {
  host: "10.0.1.10",
  port: "27017",
  db: "main",
  username: "",
  password: ""
};

const clickhouseConfig = {
  clickhouse_host: "10.0.1.60",
  clickhouse_port: 8123,
  clickhouse_user: "admin",
  clickhouse_password: "your_secure_password",
  clickhouse_database: "limq",
  clickhouse_url: "http://10.0.1.60:8123"
};

// JSON file that persists the sync position between runs.
const STATE_FILE_PATH = path.join(__dirname, 'shorturl_event_sync_state.json');

// The only variable key getVariable/setVariable understand.
const SYNC_STATE_KEY = "f/shorturl_analytics/clickhouse/shorturl_sync_state";

interface TraceRecord {
  _id: ObjectId;
  slugId: ObjectId;
  label: string | null;
  ip: string;
  type: number;
  platform: string;
  platformOS: string;
  browser: string;
  browserVersion: string;
  url: string;
  createTime: number;
}

interface SyncState {
  last_sync_time: number;
  records_synced: number;
  last_sync_id?: string;
}

// MongoDB filter document. Kept loosely typed so it is accepted by the driver's
// filter type regardless of the installed driver version.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type MongoFilter = Record<string, any>;

/** Print a message prefixed with the current ISO-8601 timestamp. */
function logWithTimestamp(message: string): void {
  console.log(`[${new Date().toISOString()}] ${message}`);
}

/** Extract a printable message from a value caught in a `catch` clause. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Replacement for Windmill's variable getter, backed by a local JSON file.
 *
 * @param key Variable key; only the sync-state key is supported.
 * @returns The stored JSON text, or a zeroed initial state when the file is
 *          missing or cannot be read.
 * @throws Error when `key` is not the sync-state key.
 */
async function getVariable(key: string): Promise<string> {
  if (key !== SYNC_STATE_KEY) {
    throw new Error(`未知的变量键: ${key}`);
  }
  try {
    if (fs.existsSync(STATE_FILE_PATH)) {
      return fs.readFileSync(STATE_FILE_PATH, 'utf8');
    }
  } catch (error: unknown) {
    console.error("读取状态文件失败:", error);
  }
  return JSON.stringify({ last_sync_time: 0, records_synced: 0 });
}

/**
 * Replacement for Windmill's variable setter, backed by a local JSON file.
 *
 * @param key Variable key; only the sync-state key is supported.
 * @param value Text to persist.
 * @throws Error when `key` is unknown, or the underlying fs error when the write fails.
 */
async function setVariable(key: string, value: string): Promise<void> {
  if (key !== SYNC_STATE_KEY) {
    throw new Error(`未知的变量键: ${key}`);
  }
  try {
    // Write to a sibling temp file and rename it over the target so an
    // interrupted write cannot leave a truncated state file behind.
    const tempPath = `${STATE_FILE_PATH}.tmp`;
    fs.writeFileSync(tempPath, value, 'utf8');
    fs.renameSync(tempPath, STATE_FILE_PATH);
  } catch (error: unknown) {
    console.error("保存状态文件失败:", error);
    throw error;
  }
}

/**
 * Parse and validate persisted sync state.
 *
 * @throws SyntaxError on invalid JSON, Error when the shape is not a usable SyncState.
 */
function parseSyncState(raw: string): SyncState {
  const parsed: unknown = JSON.parse(raw);
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error("同步状态格式无效");
  }
  const candidate = parsed as Partial<Record<keyof SyncState, unknown>>;
  const lastSyncTime = candidate.last_sync_time;
  if (typeof lastSyncTime !== 'number' || !Number.isFinite(lastSyncTime)) {
    throw new Error("同步状态缺少有效的 last_sync_time");
  }
  const recordsSynced = candidate.records_synced;
  const state: SyncState = {
    last_sync_time: lastSyncTime,
    records_synced: typeof recordsSynced === 'number' && Number.isFinite(recordsSynced) ? recordsSynced : 0,
  };
  if (typeof candidate.last_sync_id === 'string' && candidate.last_sync_id !== '') {
    state.last_sync_id = candidate.last_sync_id;
  }
  return state;
}

/**
 * Build the filter selecting every record strictly after the given position in
 * (createTime, _id) order — the same order the batches are read in.
 *
 * A plain `createTime > t AND _id > id` filter would drop records that share
 * the last createTime, so the position is expressed as a keyset condition.
 */
function buildResumeFilter(lastSyncTime: number, lastSyncId?: string): MongoFilter {
  if (lastSyncId) {
    return {
      $or: [
        { createTime: { $gt: lastSyncTime } },
        { createTime: lastSyncTime, _id: { $gt: new ObjectId(lastSyncId) } },
      ],
    };
  }
  return lastSyncTime > 0 ? { createTime: { $gt: lastSyncTime } } : {};
}

/** Build the MongoDB connection URL, URL-encoding credentials when present. */
function buildMongoUrl(): string {
  const credentials = mongoConfig.username && mongoConfig.password
    ? `${encodeURIComponent(mongoConfig.username)}:${encodeURIComponent(mongoConfig.password)}@`
    : "";
  return `mongodb://${credentials}${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
}

/**
 * Escape a value for use inside a single-quoted ClickHouse string literal.
 * Backslashes are escaped first because ClickHouse treats `\` as an escape
 * character inside string literals; single quotes are doubled.
 */
function escapeClickHouseString(value: string): string {
  return value.replace(/\\/g, "\\\\").replace(/'/g, "''");
}

/** Wrap a value in single quotes as an escaped ClickHouse string literal. */
function quoteClickHouseString(value: string): string {
  return `'${escapeClickHouseString(value)}'`;
}

/**
 * POST a statement to the ClickHouse HTTP endpoint using basic auth.
 *
 * @param body SQL text sent as the request body.
 * @param timeoutMs Abort the request after this many milliseconds.
 */
async function sendClickHouseQuery(body: string, timeoutMs: number): Promise<Response> {
  const credentials = `${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`;
  return fetch(clickhouseConfig.clickhouse_url, {
    method: "POST",
    headers: {
      "Content-Type": "application/x-www-form-urlencoded",
      "Authorization": `Basic ${Buffer.from(credentials).toString('base64')}`,
    },
    body,
    signal: AbortSignal.timeout(timeoutMs),
  });
}

/**
 * Probe ClickHouse with `SELECT 1` (5 second timeout).
 *
 * @param skipCheck When true the probe is skipped and `true` is returned.
 * @returns Whether ClickHouse answered with a successful HTTP status.
 */
async function checkClickHouseConnection(skipCheck: boolean): Promise<boolean> {
  if (skipCheck) {
    logWithTimestamp("已启用跳过ClickHouse检查,不测试连接");
    return true;
  }
  try {
    logWithTimestamp("测试ClickHouse连接...");
    const response = await sendClickHouseQuery("SELECT 1", 5000);
    if (response.ok) {
      logWithTimestamp("ClickHouse连接测试成功");
      return true;
    }
    const errorText = await response.text();
    logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`);
    return false;
  } catch (err: unknown) {
    logWithTimestamp(`ClickHouse连接测试失败: ${errorMessage(err)}`);
    return false;
  }
}

/**
 * Return the subset of `records` that has not been written to ClickHouse yet.
 *
 * Rows are matched on `visitor_id`, which the insert populates with the
 * MongoDB `_id`. Matching on `link_id` (the slug) would treat every later
 * event of an already-seen link as a duplicate.
 *
 * @param records Batch read from MongoDB.
 * @param skipDedup When true no lookup is performed and all records are returned.
 * @throws Error when the ClickHouse lookup fails or returns unparsable JSON.
 */
async function filterUnsyncedRecords(records: TraceRecord[], skipDedup: boolean): Promise<TraceRecord[]> {
  if (records.length === 0) return [];

  if (skipDedup) {
    logWithTimestamp(`已跳过ClickHouse重复检查,准备处理所有 ${records.length} 条记录`);
    return records;
  }

  logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于ClickHouse中...`);

  const idList = records.map(record => quoteClickHouseString(record._id.toString())).join(",");
  // FORMAT JSON makes ClickHouse answer with a JSON document holding a `data` array.
  const query = `SELECT visitor_id FROM ${clickhouseConfig.clickhouse_database}.link_events WHERE visitor_id IN (${idList}) FORMAT JSON`;
  logWithTimestamp(`执行ClickHouse查询: ${query.slice(0, 200)}${query.length > 200 ? '...' : ''}`);

  const response = await sendClickHouseQuery(query, 10000);
  if (!response.ok) {
    const errorText = await response.text();
    throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`);
  }

  const responseText = await response.text();
  logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`);

  if (!responseText.trim()) {
    // An empty body is treated as "nothing exists yet".
    logWithTimestamp("ClickHouse返回空响应,假定没有记录存在");
    return records;
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(responseText);
  } catch (err: unknown) {
    logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`);
    throw new Error(`解析ClickHouse响应失败: ${errorMessage(err)}`);
  }

  const rows = typeof parsed === 'object' && parsed !== null
    ? (parsed as { data?: unknown }).data
    : undefined;
  if (!Array.isArray(rows)) {
    // Without a `data` array there is nothing to compare against.
    logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(parsed)}`);
    return records;
  }

  const existingIds = new Set<string>();
  for (const row of rows) {
    const visitorId = (row as { visitor_id?: unknown } | null)?.visitor_id;
    if (typeof visitorId === 'string') existingIds.add(visitorId);
  }
  logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`);

  const newRecords = records.filter(record => !existingIds.has(record._id.toString()));
  logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
  return newRecords;
}

/**
 * Render one MongoDB record as a `VALUES` tuple for the link_events insert.
 * Column order must match the column list in `insertRecords`.
 */
function toValuesTuple(record: TraceRecord): string {
  const mongoId = record._id.toString();
  // Anything that is not a number <= 4 falls back to 1.
  const eventType = typeof record.type === 'number' && record.type <= 4 ? record.type : 1;
  const deviceType = record.platform === "mobile" ? 1 : record.platform === "tablet" ? 2 : 3;

  const columns = [
    quoteClickHouseString(record.slugId.toString()),                 // link_id
    quoteClickHouseString(record.label || ""),                       // channel_id
    quoteClickHouseString(mongoId),                                  // visitor_id (MongoDB _id)
    quoteClickHouseString(`${mongoId}-${record.createTime}`),        // session_id
    String(eventType),                                               // event_type
    quoteClickHouseString(record.ip ?? ""),                          // ip_address
    "''",                                                            // country (not present in MongoDB)
    "''",                                                            // city (not present in MongoDB)
    quoteClickHouseString(record.url || ""),                         // referrer
    "''",                                                            // utm_source
    "''",                                                            // utm_medium
    "''",                                                            // utm_campaign
    quoteClickHouseString(`${record.browser ?? ""} ${record.browserVersion ?? ""}`), // user_agent
    String(deviceType),                                              // device_type
    quoteClickHouseString(record.browser || ""),                     // browser
    quoteClickHouseString(record.platformOS || ""),                  // os
    "0",                                                             // time_spent_sec
    "true",                                                          // is_bounce
    "false",                                                         // is_qr_scan
    "''",                                                            // qr_code_id
    "1",                                                             // conversion_type
    "0",                                                             // conversion_value
    quoteClickHouseString(JSON.stringify({ mongo_id: mongoId })),    // custom_data
  ];
  return `(${columns.join(", ")})`;
}

/**
 * Insert the given records into ClickHouse in a single statement (20 second timeout).
 *
 * @returns Number of records inserted.
 * @throws Error when ClickHouse rejects the insert or the request fails.
 */
async function insertRecords(records: TraceRecord[]): Promise<number> {
  if (records.length === 0) return 0;

  const insertSQL = `
    INSERT INTO ${clickhouseConfig.clickhouse_database}.link_events
    (link_id, channel_id, visitor_id, session_id, event_type, ip_address, country, city,
     referrer, utm_source, utm_medium, utm_campaign, user_agent, device_type, browser, os,
     time_spent_sec, is_bounce, is_qr_scan, qr_code_id, conversion_type, conversion_value, custom_data)
    VALUES ${records.map(toValuesTuple).join(", ")}
  `;

  try {
    logWithTimestamp("发送插入请求到ClickHouse...");
    const response = await sendClickHouseQuery(insertSQL, 20000);
    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`);
    }
    logWithTimestamp(`成功插入 ${records.length} 条记录到ClickHouse`);
    return records.length;
  } catch (err: unknown) {
    logWithTimestamp(`向ClickHouse插入数据失败: ${errorMessage(err)}`);
    throw err;
  }
}

/**
 * Incrementally copy MongoDB `trace` records into ClickHouse `link_events`.
 *
 * Records are read in (createTime, _id) order, starting after the position
 * stored in the state file; the position is persisted after every successful
 * batch so an interrupted run resumes where it stopped.
 *
 * @param batch_size Records read and inserted per batch; must be a positive integer.
 * @param initial_sync Ignore the stored position and start from the beginning.
 * @param max_records Upper bound on records processed in this run.
 * @param timeout_minutes Stop starting new batches after this many minutes.
 * @param skip_clickhouse_check Skip both the connection probe and the duplicate lookup.
 * @param force_insert Skip the duplicate lookup and insert every record read.
 * @returns A result object; `success` is false (with `error`/`stack`) when the run failed.
 */
export async function main(
  batch_size = 1000,
  initial_sync = false,
  max_records = 9999999,
  timeout_minutes = 60,
  skip_clickhouse_check = false,
  force_insert = false
) {
  logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务");
  logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`);

  if (skip_clickhouse_check) {
    logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式,不会检查记录是否已存在");
  }
  if (force_insert) {
    logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
  }

  // Wall-clock budget for the whole run.
  const startTime = Date.now();
  const timeoutMs = timeout_minutes * 60 * 1000;
  const checkTimeout = (): boolean => {
    if (Date.now() - startTime > timeoutMs) {
      console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`);
      return true;
    }
    return false;
  };

  // Never write passwords to the log.
  console.log("MongoDB配置:", JSON.stringify({ ...mongoConfig, password: mongoConfig.password ? "****" : "" }));
  console.log("ClickHouse配置:", JSON.stringify({ ...clickhouseConfig, clickhouse_password: "****" }));

  const mongoUrl = buildMongoUrl();
  console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);

  // Load the position reached by the previous run; fall back to a fresh state
  // when it is missing or unusable.
  let syncState: SyncState;
  try {
    const rawSyncState = await getVariable(SYNC_STATE_KEY);
    try {
      syncState = parseSyncState(rawSyncState);
      console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`);
    } catch (parseError: unknown) {
      console.error("解析同步状态失败:", parseError);
      throw parseError;
    }
  } catch {
    console.log("未找到同步状态,创建初始同步状态");
    syncState = { last_sync_time: 0, records_synced: 0 };
  }

  if (initial_sync) {
    console.log("强制从头开始同步");
    syncState = { last_sync_time: 0, records_synced: 0 };
  }

  const client = new MongoClient(mongoUrl);

  try {
    // limit(0) means "no limit" in MongoDB, so a non-positive batch size must be rejected.
    if (!Number.isInteger(batch_size) || batch_size <= 0) {
      throw new Error(`batch_size 必须为正整数: ${batch_size}`);
    }

    await client.connect();
    console.log("MongoDB连接成功");

    const db = client.db(mongoConfig.db);
    const traceCollection = db.collection<TraceRecord>("trace");

    // Filter selecting everything after the stored position.
    let resumeFilter = buildResumeFilter(syncState.last_sync_time, syncState.last_sync_id);

    const totalRecords = await traceCollection.countDocuments(resumeFilter);
    console.log(`找到 ${totalRecords} 条新记录需要同步`);

    const recordsToProcess = Math.min(totalRecords, max_records);
    console.log(`本次将处理 ${recordsToProcess} 条记录`);

    if (totalRecords === 0) {
      console.log("没有新记录需要同步,任务完成");
      return {
        success: true,
        records_synced: 0,
        total_synced: syncState.records_synced,
        message: "没有新记录需要同步"
      };
    }

    const clickhouseConnected = await checkClickHouseConnection(skip_clickhouse_check);
    if (!clickhouseConnected && !skip_clickhouse_check) {
      logWithTimestamp("⚠️ ClickHouse连接测试失败,请启用skip_clickhouse_check=true参数来跳过连接检查");
      throw new Error("ClickHouse连接失败,无法继续同步");
    }

    let processedRecords = 0;
    let insertedRecords = 0;
    // Seed the position from the loaded state so a run that processes no batch
    // (immediate timeout, max_records <= 0) does not erase it.
    let lastId = syncState.last_sync_id;
    let lastCreateTime = syncState.last_sync_time;

    const currentState = (): SyncState => ({
      last_sync_time: lastCreateTime,
      records_synced: syncState.records_synced + insertedRecords,
      last_sync_id: lastId
    });

    for (let batchNumber = 1; processedRecords < recordsToProcess; batchNumber++) {
      if (checkTimeout()) {
        logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`);
        break;
      }

      logWithTimestamp(`开始处理第 ${batchNumber} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords / recordsToProcess * 100)}%)`);
      logWithTimestamp(`正在从MongoDB获取第 ${batchNumber} 批次数据...`);

      // The filter itself advances after every batch, so no skip() is used;
      // the limit is clamped so max_records is never exceeded.
      const batchLimit = Math.min(batch_size, recordsToProcess - processedRecords);
      const records: TraceRecord[] = await traceCollection.find(resumeFilter)
        .sort({ createTime: 1, _id: 1 })
        .limit(batchLimit)
        .toArray();

      if (records.length === 0) {
        logWithTimestamp(`第 ${batchNumber} 批次没有找到数据,结束处理`);
        break;
      }

      const firstRecord = records[0];
      const lastRecord = records[records.length - 1];
      logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);
      logWithTimestamp(`批次 ${batchNumber} 第一条记录: ID=${firstRecord._id}, 时间=${new Date(firstRecord.createTime).toISOString()}`);
      if (records.length > 1) {
        logWithTimestamp(`批次 ${batchNumber} 最后一条记录: ID=${lastRecord._id}, 时间=${new Date(lastRecord.createTime).toISOString()}`);
      }

      logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`);
      let newRecords: TraceRecord[];
      try {
        newRecords = await filterUnsyncedRecords(records, skip_clickhouse_check || force_insert);
      } catch (err: unknown) {
        logWithTimestamp(`检查记录是否存在时出错: ${errorMessage(err)}`);
        throw err;
      }

      let insertedInBatch = 0;
      if (newRecords.length === 0) {
        logWithTimestamp("所有记录都已存在,跳过处理");
      } else {
        logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`);
        insertedInBatch = await insertRecords(newRecords);
      }

      // Advance the position only after the batch succeeded, based on the last
      // record read (not the last one inserted) so skipped duplicates count too.
      lastId = lastRecord._id.toString();
      lastCreateTime = lastRecord.createTime;
      processedRecords += records.length;
      insertedRecords += insertedInBatch;
      logWithTimestamp(`更新同步位置到: ID=${lastId}, 时间=${new Date(lastCreateTime).toISOString()}`);
      logWithTimestamp(`第 ${batchNumber} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${insertedRecords} 条 (${Math.round(processedRecords / recordsToProcess * 100)}%)`);

      // Persist progress per batch so a later failure does not discard it.
      await setVariable(SYNC_STATE_KEY, JSON.stringify(currentState()));

      resumeFilter = buildResumeFilter(lastCreateTime, lastId);
      logWithTimestamp(`更新查询条件: 创建时间 > ${new Date(lastCreateTime).toISOString()}, ID > ${lastId || 'none'}`);
    }

    const newSyncState = currentState();
    await setVariable(SYNC_STATE_KEY, JSON.stringify(newSyncState));
    console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`);

    return {
      success: true,
      records_processed: processedRecords,
      records_synced: insertedRecords,
      total_synced: newSyncState.records_synced,
      last_sync_time: new Date(newSyncState.last_sync_time).toISOString(),
      message: "数据同步完成"
    };
  } catch (err: unknown) {
    console.error("同步过程中发生错误:", err);
    return {
      success: false,
      error: errorMessage(err),
      stack: err instanceof Error ? err.stack : undefined
    };
  } finally {
    await client.close();
    console.log("MongoDB连接已关闭");
  }
}

// Entry point when the script is executed directly.
if (require.main === module) {
  // Collect `--key value` pairs, converting booleans and numbers.
  const args = process.argv.slice(2);
  const cliValues: Record<string, unknown> = {};
  for (let i = 0; i + 1 < args.length; i += 2) {
    const flag = args[i];
    if (!flag.startsWith('--')) continue;
    const rawValue = args[i + 1];
    let value: string | number | boolean = rawValue;
    if (rawValue === 'true') value = true;
    else if (rawValue === 'false') value = false;
    else if (rawValue.trim() !== '' && Number.isFinite(Number(rawValue))) value = Number(rawValue);
    cliValues[flag.substring(2)] = value;
  }

  // Values of the wrong type fall back to the defaults.
  const numberArg = (key: string, fallback: number): number => {
    const value = cliValues[key];
    return typeof value === 'number' ? value : fallback;
  };
  const booleanArg = (key: string, fallback: boolean): boolean => {
    const value = cliValues[key];
    return typeof value === 'boolean' ? value : fallback;
  };

  const params = {
    batch_size: numberArg('batch_size', 1000),
    initial_sync: booleanArg('initial_sync', false),
    max_records: numberArg('max_records', 9999999),
    timeout_minutes: numberArg('timeout_minutes', 60),
    skip_clickhouse_check: booleanArg('skip_clickhouse_check', false),
    force_insert: booleanArg('force_insert', false)
  };

  console.log('启动同步任务,参数:', params);

  main(
    params.batch_size,
    params.initial_sync,
    params.max_records,
    params.timeout_minutes,
    params.skip_clickhouse_check,
    params.force_insert
  ).then(result => {
    console.log('同步任务完成:', result);
    process.exit(result.success ? 0 : 1);
  }).catch((err: unknown) => {
    console.error('同步任务失败:', err);
    process.exit(1);
  });
}