diff --git a/sync_shorturl_event_from_mongo.ts b/sync_shorturl_event_from_mongo.ts deleted file mode 100644 index 2de05d4..0000000 --- a/sync_shorturl_event_from_mongo.ts +++ /dev/null @@ -1,501 +0,0 @@ -// 从MongoDB的trace表同步数据到ClickHouse的link_events表 -import { MongoClient, ObjectId } from 'mongodb'; -import fs from 'fs'; -import path from 'path'; - -// 硬编码配置信息 -const mongoConfig = { - host: "10.0.1.10", - port: "27017", - db: "main", - username: "", - password: "" -}; - -const clickhouseConfig = { - clickhouse_host: "10.0.1.60", - clickhouse_port: 8123, - clickhouse_user: "admin", - clickhouse_password: "your_secure_password", - clickhouse_database: "limq", - clickhouse_url: "http://10.0.1.60:8123" -}; - -// 状态文件存储路径 -const STATE_FILE_PATH = path.join(__dirname, 'shorturl_event_sync_state.json'); - -interface TraceRecord { - _id: ObjectId; - slugId: ObjectId; - label: string | null; - ip: string; - type: number; - platform: string; - platformOS: string; - browser: string; - browserVersion: string; - url: string; - createTime: number; -} - -interface SyncState { - last_sync_time: number; - records_synced: number; - last_sync_id?: string; - sync_filter?: string; -} - -// 替代 Windmill 的变量存储函数 -async function getVariable(key: string): Promise { - if (key === "f/shorturl_analytics/clickhouse/shorturl_sync_state") { - try { - if (fs.existsSync(STATE_FILE_PATH)) { - return fs.readFileSync(STATE_FILE_PATH, 'utf8'); - } - } catch (error) { - console.error("读取状态文件失败:", error); - } - return JSON.stringify({ - last_sync_time: 0, - records_synced: 0 - }); - } - - throw new Error(`未知的变量键: ${key}`); -} - -// 替代 Windmill 的变量设置函数 -async function setVariable(key: string, value: string): Promise { - if (key === "f/shorturl_analytics/clickhouse/shorturl_sync_state") { - try { - fs.writeFileSync(STATE_FILE_PATH, value, 'utf8'); - } catch (error) { - console.error("保存状态文件失败:", error); - throw error; - } - } else { - throw new Error(`未知的变量键: ${key}`); - } -} - -export async function main( - batch_size = 1000, - initial_sync = false, - max_records = 9999999, - timeout_minutes = 60, - skip_clickhouse_check = false, - force_insert = false -) { - const logWithTimestamp = (message: string) => { - const now = new Date(); - console.log(`[${now.toISOString()}] ${message}`); - }; - - logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务"); - logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`); - if (skip_clickhouse_check) { - logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式,不会检查记录是否已存在"); - } - if (force_insert) { - logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录"); - } - - // 设置超时 - const startTime = Date.now(); - const timeoutMs = timeout_minutes * 60 * 1000; - - // 检查是否超时 - const checkTimeout = () => { - if (Date.now() - startTime > timeoutMs) { - console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`); - return true; - } - return false; - }; - - console.log("MongoDB配置:", JSON.stringify(mongoConfig)); - console.log("ClickHouse配置:", JSON.stringify(clickhouseConfig)); - - // 构建MongoDB连接URL - let mongoUrl = "mongodb://"; - if (mongoConfig.username && mongoConfig.password) { - mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`; - } - mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`; - - console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`); - - // 获取上次同步的状态 - let syncState: SyncState; - try { - const rawSyncState = await getVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state"); - try { - syncState = JSON.parse(rawSyncState); - console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`); - } catch (parseError) { - console.error("解析同步状态失败:", parseError); - throw parseError; - } - } catch (error) { - console.log("未找到同步状态,创建初始同步状态"); - syncState = { - last_sync_time: 0, - records_synced: 0, - }; - } - - // 如果强制从头开始同步 - if (initial_sync) { - console.log("强制从头开始同步"); - syncState = { - last_sync_time: 0, - records_synced: 0, - }; - } - - // 连接MongoDB - const client = new MongoClient(mongoUrl); - try { - await client.connect(); - console.log("MongoDB连接成功"); - - const db = client.db(mongoConfig.db); - const traceCollection = db.collection("trace"); - - // 构建查询条件,只查询新的记录 - const query: Record = { - // 添加type为1的过滤条件 - type: 1 - }; - - if (syncState.last_sync_time > 0) { - query.createTime = { $gt: syncState.last_sync_time }; - } - - if (syncState.last_sync_id) { - // 如果有上次同步的ID,则从该ID之后开始查询 - query._id = { $gt: new ObjectId(syncState.last_sync_id) }; - } - - // 计算总记录数 - const totalRecords = await traceCollection.countDocuments(query); - console.log(`找到 ${totalRecords} 条新记录需要同步`); - - // 限制此次处理的记录数量 - const recordsToProcess = Math.min(totalRecords, max_records); - console.log(`本次将处理 ${recordsToProcess} 条记录`); - - if (totalRecords === 0) { - console.log("没有新记录需要同步,任务完成"); - return { - success: true, - records_synced: 0, - total_synced: syncState.records_synced, - message: "没有新记录需要同步" - }; - } - - // 分批处理记录 - let processedRecords = 0; - let lastId: string | undefined; - let lastCreateTime = syncState.last_sync_time; - const totalBatchRecords = 0; - - // 检查ClickHouse连接状态 - const checkClickHouseConnection = async (): Promise => { - if (skip_clickhouse_check) { - logWithTimestamp("已启用跳过ClickHouse检查,不测试连接"); - return true; - } - - try { - logWithTimestamp("测试ClickHouse连接..."); - const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; - const response = await fetch(clickhouseUrl, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${Buffer.from(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`).toString('base64')}`, - }, - body: "SELECT 1", - // 设置5秒超时 - signal: AbortSignal.timeout(5000) - }); - - if (response.ok) { - logWithTimestamp("ClickHouse连接测试成功"); - return true; - } else { - const errorText = await response.text(); - logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`); - return false; - } - } catch (err) { - const error = err as Error; - logWithTimestamp(`ClickHouse连接测试失败: ${error.message}`); - return false; - } - }; - - // 检查记录是否已经存在于ClickHouse中 - const checkExistingRecords = async (records: TraceRecord[]): Promise => { - if (records.length === 0) return []; - - // 如果跳过ClickHouse检查或强制插入,则直接返回所有记录 - if (skip_clickhouse_check || force_insert) { - logWithTimestamp(`已跳过ClickHouse重复检查,准备处理所有 ${records.length} 条记录`); - return records; - } - - logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于ClickHouse中...`); - - try { - // 提取所有记录的ID - const recordIds = records.map(record => record.slugId.toString()); // 使用slugId作为link_id查询 - logWithTimestamp(`待检查的记录ID: ${recordIds.join(', ')}`); - - // 构建查询SQL,检查记录是否已存在,确保添加FORMAT JSON来获取正确的JSON格式响应 - const query = ` - SELECT link_id - FROM ${clickhouseConfig.clickhouse_database}.link_events - WHERE link_id IN ('${recordIds.join("','")}') - FORMAT JSON - `; - - logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`); - - // 发送请求到ClickHouse,添加10秒超时 - const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; - const response = await fetch(clickhouseUrl, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${Buffer.from(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`).toString('base64')}` - }, - body: query, - signal: AbortSignal.timeout(10000) - }); - - if (!response.ok) { - const errorText = await response.text(); - throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`); - } - - // 获取响应文本以便记录 - const responseText = await response.text(); - logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`); - - if (!responseText.trim()) { - logWithTimestamp("ClickHouse返回空响应,假定没有记录存在"); - return records; // 如果响应为空,假设没有记录 - } - - // 解析结果 - let result; - try { - result = JSON.parse(responseText); - } catch (err) { - logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`); - throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`); - } - - // 确保result有正确的结构 - if (!result.data) { - logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`); - return records; // 如果没有data字段,假设没有记录 - } - - // 提取已存在的记录ID - const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id)); - - logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`); - if (existingIds.size > 0) { - logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`); - } - - // 过滤出不存在的记录 - const newRecords = records.filter(record => !existingIds.has(record.slugId.toString())); // 使用slugId匹配link_id - logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`); - - return newRecords; - } catch (err) { - const error = err as Error; - logWithTimestamp(`ClickHouse查询出错: ${error.message}`); - if (skip_clickhouse_check) { - logWithTimestamp("已启用跳过ClickHouse检查,将继续处理所有记录"); - return records; - } else { - throw error; // 如果没有启用跳过检查,则抛出错误 - } - } - }; - - // 在处理记录前先检查ClickHouse连接 - const clickhouseConnected = await checkClickHouseConnection(); - if (!clickhouseConnected && !skip_clickhouse_check) { - logWithTimestamp("⚠️ ClickHouse连接测试失败,请启用skip_clickhouse_check=true参数来跳过连接检查"); - throw new Error("ClickHouse连接失败,无法继续同步"); - } - - // 处理记录的函数 - const processRecords = async (records: TraceRecord[]) => { - if (records.length === 0) return 0; - - logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`); - - // 检查记录是否已存在 - let newRecords; - try { - newRecords = await checkExistingRecords(records); - } catch (err) { - const error = err as Error; - logWithTimestamp(`检查记录是否存在时出错: ${error.message}`); - if (!skip_clickhouse_check && !force_insert) { - throw error; - } - // 如果跳过检查或强制插入,则使用所有记录 - logWithTimestamp("将使用所有记录进行处理"); - newRecords = records; - } - - if (newRecords.length === 0) { - logWithTimestamp("所有记录都已存在,跳过处理"); - // 更新同步状态,即使没有新增记录 - const lastRecord = records[records.length - 1]; - lastId = lastRecord._id.toString(); - lastCreateTime = lastRecord.createTime; - return 0; - } - - logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`); - - // 准备ClickHouse插入数据 - const clickhouseData = newRecords.map(record => { - // 转换MongoDB记录为ClickHouse格式,匹配ClickHouse表结构 - return { - // UUID将由ClickHouse自动生成 (event_id) - link_id: record.slugId.toString(), - channel_id: record.label || "", - visitor_id: record._id.toString(), // 使用MongoDB ID作为访客ID - session_id: record._id.toString() + "-" + record.createTime, // 创建一个唯一会话ID - event_type: record.type <= 4 ? record.type : 1, // 确保event_type在枚举范围内 - ip_address: record.ip, - country: "", // 这些字段在MongoDB中不存在,使用默认值 - city: "", - referrer: record.url || "", - utm_source: "", - utm_medium: "", - utm_campaign: "", - user_agent: record.browser + " " + record.browserVersion, - device_type: record.platform === "mobile" ? 1 : (record.platform === "tablet" ? 2 : 3), - browser: record.browser || "", - os: record.platformOS || "", - time_spent_sec: 0, - is_bounce: true, - is_qr_scan: false, - qr_code_id: "", - conversion_type: 1, // 默认为'visit' - conversion_value: 0, - custom_data: `{"mongo_id":"${record._id.toString()}"}` - }; - }); - - // 更新同步状态(使用原始records的最后一条,以确保进度正确) - const lastRecord = records[records.length - 1]; - lastId = lastRecord._id.toString(); - lastCreateTime = lastRecord.createTime; - logWithTimestamp(`更新同步位置到: ID=${lastId}, 时间=${new Date(lastCreateTime).toISOString()}`); - - // 生成ClickHouse插入SQL - const insertSQL = ` - INSERT INTO ${clickhouseConfig.clickhouse_database}.link_events - (link_id, channel_id, visitor_id, session_id, event_type, ip_address, country, city, - referrer, utm_source, utm_medium, utm_campaign, user_agent, device_type, browser, os, - time_spent_sec, is_bounce, is_qr_scan, qr_code_id, conversion_type, conversion_value, custom_data) - VALUES ${clickhouseData.map(record => - `('${record.link_id}', '${record.channel_id.replace(/'/g, "''")}', '${record.visitor_id}', '${record.session_id}', - ${record.event_type}, '${record.ip_address}', '', '', - '${record.referrer.replace(/'/g, "''")}', '', '', '', '${record.user_agent.replace(/'/g, "''")}', ${record.device_type}, - '${record.browser.replace(/'/g, "''")}', '${record.os.replace(/'/g, "''")}', - 0, true, false, '', 1, 0, '${record.custom_data}')` - ).join(", ")} - `; - - if (insertSQL.length === 0) { - console.log("没有新记录需要插入"); - return 0; - } - - // 发送请求到ClickHouse,添加20秒超时 - const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; - try { - logWithTimestamp("发送插入请求到ClickHouse..."); - const response = await fetch(clickhouseUrl, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${Buffer.from(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`).toString('base64')}` - }, - body: insertSQL, - signal: AbortSignal.timeout(20000) - }); - - if (response.ok) { - logWithTimestamp("ClickHouse插入响应: 成功"); - return newRecords.length; - } else { - const errorText = await response.text(); - throw new Error(`ClickHouse插入失败: ${response.status} ${errorText}`); - } - } catch (err) { - const error = err as Error; - logWithTimestamp(`ClickHouse插入请求出错: ${error.message}`); - throw error; - } - }; - - // 处理所有记录 - let totalSyncedRecords = 0; - while (processedRecords < recordsToProcess) { - const batch = await traceCollection.find(query).skip(processedRecords).limit(batch_size).toArray(); - const batchRecords = await processRecords(batch); - processedRecords += batch.length; - totalSyncedRecords += batchRecords; - logWithTimestamp(`已处理 ${processedRecords} 条记录,共 ${totalSyncedRecords} 条记录已同步`); - - // 更新同步状态 - const newSyncState: SyncState = { - last_sync_time: lastCreateTime, - records_synced: syncState.records_synced + totalBatchRecords, - last_sync_id: lastId, - // 可以添加一个标记,表明这是type=1的过滤同步 - sync_filter: "type_1_only" - }; - - // 保存同步状态 - await setVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state", JSON.stringify(newSyncState)); - - // 检查是否超时 - if (checkTimeout()) break; - } - - // 返回同步结果 - return { - success: true, - records_synced: totalSyncedRecords, - total_synced: syncState.records_synced + totalSyncedRecords, - message: "同步完成" - }; - } catch (err) { - const error = err as Error; - logWithTimestamp(`同步任务出错: ${error.message}`); - return { - success: false, - records_synced: 0, - total_synced: syncState.records_synced, - message: error.message - }; - } finally { - await client.close(); - } -} \ No newline at end of file diff --git a/windmill/sync_shorturl_event_from_mongo.ts b/windmill/sync_shorturl_event_from_mongo.ts index 98fca68..3618310 100644 --- a/windmill/sync_shorturl_event_from_mongo.ts +++ b/windmill/sync_shorturl_event_from_mongo.ts @@ -410,13 +410,21 @@ export async function main( (link_id, channel_id, visitor_id, session_id, event_type, ip_address, country, city, referrer, utm_source, utm_medium, utm_campaign, user_agent, device_type, browser, os, time_spent_sec, is_bounce, is_qr_scan, qr_code_id, conversion_type, conversion_value, custom_data) - VALUES ${clickhouseData.map(record => - `('${record.link_id}', '${record.channel_id.replace(/'/g, "''")}', '${record.visitor_id}', '${record.session_id}', - ${record.event_type}, '${record.ip_address}', '', '', - '${record.referrer.replace(/'/g, "''")}', '', '', '', '${record.user_agent.replace(/'/g, "''")}', ${record.device_type}, - '${record.browser.replace(/'/g, "''")}', '${record.os.replace(/'/g, "''")}', - 0, true, false, '', 1, 0, '${record.custom_data}')` - ).join(", ")} + VALUES ${clickhouseData.map(record => { + // 确保所有字符串值都是字符串类型,并安全处理替换 + const safeReplace = (val: any): string => { + // 确保值是字符串,如果是null或undefined则使用空字符串 + const str = val === null || val === undefined ? "" : String(val); + // 安全替换单引号 + return str.replace(/'/g, "''"); + }; + + return `('${record.link_id}', '${safeReplace(record.channel_id)}', '${record.visitor_id}', '${record.session_id}', + ${record.event_type}, '${safeReplace(record.ip_address)}', '', '', + '${safeReplace(record.referrer)}', '', '', '', '${safeReplace(record.user_agent)}', ${record.device_type}, + '${safeReplace(record.browser)}', '${safeReplace(record.os)}', + 0, true, false, '', 1, 0, '${safeReplace(record.custom_data)}')`; + }).join(", ")} `; if (insertSQL.length === 0) {