From 90e2000842d37f8e693ff617776651af701293c4 Mon Sep 17 00:00:00 2001
From: William Tso
Date: Mon, 24 Mar 2025 15:22:23 +0800
Subject: [PATCH] add windmill instructions

---
 sync_shorturl_event_from_mongo.ts | 553 ++++++++++++++++++++++++++++++
 windmill/README.md                |  73 ++++
 2 files changed, 626 insertions(+)
 create mode 100644 sync_shorturl_event_from_mongo.ts
 create mode 100644 windmill/README.md

diff --git a/sync_shorturl_event_from_mongo.ts b/sync_shorturl_event_from_mongo.ts
new file mode 100644
index 0000000..ecfbe58
--- /dev/null
+++ b/sync_shorturl_event_from_mongo.ts
@@ -0,0 +1,553 @@
+// Syncs click events from MongoDB's `trace` collection into ClickHouse's `link_events` table.
+import { MongoClient, ObjectId } from 'mongodb';
+import fs from 'fs';
+import path from 'path';
+
+// Hard-coded connection settings.
+// NOTE(review): credentials belong in the environment or a secret store, not in source control.
+const mongoConfig = {
+  host: "10.0.1.10",
+  port: "27017",
+  db: "main",
+  username: "",
+  password: ""
+};
+
+const clickhouseConfig = {
+  clickhouse_host: "10.0.1.60",
+  clickhouse_port: 8123,
+  clickhouse_user: "admin",
+  clickhouse_password: "your_secure_password",
+  clickhouse_database: "limq",
+  clickhouse_url: "http://10.0.1.60:8123"
+};
+
+// Key of the sync-state variable, kept from the Windmill version of this script.
+const SYNC_STATE_KEY = "f/shorturl_analytics/clickhouse/shorturl_sync_state";
+
+// Local JSON file that stands in for Windmill's variable store.
+const STATE_FILE_PATH = path.join(__dirname, 'shorturl_event_sync_state.json');
+
+interface TraceRecord {
+  _id: ObjectId;
+  slugId: ObjectId;
+  label: string | null;
+  ip: string;
+  type: number;
+  platform: string;
+  platformOS: string;
+  browser: string;
+  browserVersion: string;
+  url: string;
+  createTime: number;
+}
+
+interface SyncState {
+  last_sync_time: number;
+  records_synced: number;
+  last_sync_id?: string;
+}
+
+/**
+ * Local replacement for Windmill's `getVariable`.
+ *
+ * @param key Variable key; only the sync-state key is supported.
+ * @returns The state file's contents, or a zeroed state as JSON when the file is missing or unreadable.
+ * @throws Error when `key` is not the sync-state key.
+ */
+async function getVariable(key: string): Promise<string> {
+  if (key !== SYNC_STATE_KEY) {
+    throw new Error(`未知的变量键: ${key}`);
+  }
+  try {
+    if (fs.existsSync(STATE_FILE_PATH)) {
+      return fs.readFileSync(STATE_FILE_PATH, 'utf8');
+    }
+  } catch (error) {
+    // Best effort: an unreadable state file is treated like a missing one.
+    console.error("读取状态文件失败:", error);
+  }
+  return JSON.stringify({ last_sync_time: 0, records_synced: 0 });
+}
+
+/**
+ * Local replacement for Windmill's `setVariable`.
+ *
+ * @param key Variable key; only the sync-state key is supported.
+ * @param value Serialized state written to the state file.
+ * @throws Error when `key` is unknown; rethrows the underlying error when the write fails.
+ */
+async function setVariable(key: string, value: string): Promise<void> {
+  if (key !== SYNC_STATE_KEY) {
+    throw new Error(`未知的变量键: ${key}`);
+  }
+  try {
+    fs.writeFileSync(STATE_FILE_PATH, value, 'utf8');
+  } catch (error) {
+    console.error("保存状态文件失败:", error);
+    throw error;
+  }
+}
+
+/** Prints `message` prefixed with the current ISO-8601 timestamp. */
+function logWithTimestamp(message: string): void {
+  console.log(`[${new Date().toISOString()}] ${message}`);
+}
+
+/**
+ * Parses and validates the persisted sync state.
+ *
+ * @throws Error when `raw` is not valid JSON or lacks the numeric counters.
+ */
+function parseSyncState(raw: string): SyncState {
+  const parsed: unknown = JSON.parse(raw);
+  if (typeof parsed !== 'object' || parsed === null) {
+    throw new Error("sync state is not an object");
+  }
+  const { last_sync_time, records_synced, last_sync_id } = parsed as Record<string, unknown>;
+  if (typeof last_sync_time !== 'number' || typeof records_synced !== 'number') {
+    throw new Error("sync state is missing last_sync_time or records_synced");
+  }
+  return {
+    last_sync_time,
+    records_synced,
+    last_sync_id: typeof last_sync_id === 'string' ? last_sync_id : undefined
+  };
+}
+
+/** Escapes `value` for use inside a single-quoted ClickHouse string literal. */
+function escapeSqlString(value: unknown): string {
+  // Backslashes go first: ClickHouse treats them as the escape character.
+  return String(value ?? "").replace(/\\/g, "\\\\").replace(/'/g, "\\'");
+}
+
+/** Returns `value` escaped and wrapped in single quotes. */
+function sqlQuote(value: unknown): string {
+  return `'${escapeSqlString(value)}'`;
+}
+
+/**
+ * Builds the MongoDB filter for records that sort strictly after the
+ * (createTime, _id) position of the last synced record.
+ */
+// eslint-disable-next-line @typescript-eslint/no-explicit-any -- filter operators are dynamic
+function buildCursorFilter(lastCreateTime: number, lastId?: string): Record<string, any> {
+  if (!lastId) {
+    return lastCreateTime > 0 ? { createTime: { $gt: lastCreateTime } } : {};
+  }
+  // Records sharing the last createTime differ only by _id, so the two
+  // conditions are OR-ed; AND-ing them would drop such records.
+  return {
+    $or: [
+      { createTime: { $gt: lastCreateTime } },
+      { createTime: lastCreateTime, _id: { $gt: new ObjectId(lastId) } }
+    ]
+  };
+}
+
+/** POSTs one SQL statement to ClickHouse's HTTP interface, aborting after `timeoutMs`. */
+function clickhouseRequest(sql: string, timeoutMs: number): Promise<Response> {
+  const credentials = Buffer.from(
+    `${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`
+  ).toString('base64');
+  return fetch(clickhouseConfig.clickhouse_url, {
+    method: "POST",
+    headers: {
+      "Content-Type": "application/x-www-form-urlencoded",
+      "Authorization": `Basic ${credentials}`
+    },
+    body: sql,
+    signal: AbortSignal.timeout(timeoutMs)
+  });
+}
+
+/** Returns true when ClickHouse answers `SELECT 1` within five seconds. */
+async function checkClickHouseConnection(): Promise<boolean> {
+  try {
+    logWithTimestamp("测试ClickHouse连接...");
+    const response = await clickhouseRequest("SELECT 1", 5000);
+    if (response.ok) {
+      logWithTimestamp("ClickHouse连接测试成功");
+      return true;
+    }
+    logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${await response.text()}`);
+    return false;
+  } catch (err) {
+    logWithTimestamp(`ClickHouse连接测试失败: ${err instanceof Error ? err.message : String(err)}`);
+    return false;
+  }
+}
+
+/**
+ * Returns the records that are not yet present in ClickHouse.
+ *
+ * Rows are matched on `visitor_id`, which holds the MongoDB `_id` of the source
+ * record. Matching on `link_id` would discard every later event of a link that
+ * already has one synced event.
+ *
+ * @throws Error when the lookup fails or the response is not valid JSON.
+ */
+async function filterNewRecords(records: TraceRecord[]): Promise<TraceRecord[]> {
+  if (records.length === 0) return [];
+
+  logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于ClickHouse中...`);
+  const idList = records.map(record => sqlQuote(record._id.toString())).join(",");
+  const query = `SELECT visitor_id FROM ${clickhouseConfig.clickhouse_database}.link_events WHERE visitor_id IN (${idList}) FORMAT JSON`;
+
+  const response = await clickhouseRequest(query, 10000);
+  if (!response.ok) {
+    throw new Error(`ClickHouse查询错误: ${response.status} ${await response.text()}`);
+  }
+
+  const responseText = await response.text();
+  logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`);
+  if (!responseText.trim()) {
+    logWithTimestamp("ClickHouse返回空响应,假定没有记录存在");
+    return records;
+  }
+
+  let result: unknown;
+  try {
+    result = JSON.parse(responseText);
+  } catch (err) {
+    logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`);
+    throw new Error(`解析ClickHouse响应失败: ${err instanceof Error ? err.message : String(err)}`);
+  }
+
+  const rows = typeof result === 'object' && result !== null ? (result as { data?: unknown }).data : undefined;
+  if (!Array.isArray(rows)) {
+    logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`);
+    return records;
+  }
+
+  const existingIds = new Set<string>(rows.map((row: { visitor_id: string }) => row.visitor_id));
+  logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`);
+
+  const newRecords = records.filter(record => !existingIds.has(record._id.toString()));
+  logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
+  return newRecords;
+}
+
+/** Renders one trace record as a `VALUES` tuple in the column order used by `insertRecords`. */
+function toInsertTuple(record: TraceRecord): string {
+  const mongoId = record._id.toString();
+  const rawType = Number(record.type);
+  const eventType = rawType <= 4 ? rawType : 1; // keep event_type inside the enum range
+  const deviceType = record.platform === "mobile" ? 1 : (record.platform === "tablet" ? 2 : 3);
+  // Missing parts are dropped instead of being rendered as the text "undefined".
+  const userAgent = [record.browser, record.browserVersion].filter(Boolean).join(" ");
+  return `(${[
+    sqlQuote(record.slugId.toString()),              // link_id
+    sqlQuote(record.label),                          // channel_id
+    sqlQuote(mongoId),                               // visitor_id: the MongoDB id
+    sqlQuote(`${mongoId}-${record.createTime}`),     // session_id
+    String(eventType),                               // event_type
+    sqlQuote(record.ip),                             // ip_address
+    "''",                                            // country (not available in MongoDB)
+    "''",                                            // city
+    sqlQuote(record.url),                            // referrer
+    "''",                                            // utm_source
+    "''",                                            // utm_medium
+    "''",                                            // utm_campaign
+    sqlQuote(userAgent),                             // user_agent
+    String(deviceType),                              // device_type
+    sqlQuote(record.browser),                        // browser
+    sqlQuote(record.platformOS),                     // os
+    "0",                                             // time_spent_sec
+    "true",                                          // is_bounce
+    "false",                                         // is_qr_scan
+    "''",                                            // qr_code_id
+    "1",                                             // conversion_type ('visit')
+    "0",                                             // conversion_value
+    sqlQuote(JSON.stringify({ mongo_id: mongoId }))  // custom_data
+  ].join(", ")})`;
+}
+
+/**
+ * Inserts `records` into `link_events` with a single INSERT statement.
+ *
+ * @returns Number of rows sent, i.e. `records.length`.
+ * @throws Error when ClickHouse rejects the statement or the request fails.
+ */
+async function insertRecords(records: TraceRecord[]): Promise<number> {
+  const insertSQL =
+    `INSERT INTO ${clickhouseConfig.clickhouse_database}.link_events ` +
+    "(link_id, channel_id, visitor_id, session_id, event_type, ip_address, country, city, " +
+    "referrer, utm_source, utm_medium, utm_campaign, user_agent, device_type, browser, os, " +
+    "time_spent_sec, is_bounce, is_qr_scan, qr_code_id, conversion_type, conversion_value, custom_data) " +
+    `VALUES ${records.map(toInsertTuple).join(", ")}`;
+
+  try {
+    logWithTimestamp("发送插入请求到ClickHouse...");
+    const response = await clickhouseRequest(insertSQL, 20000);
+    if (!response.ok) {
+      throw new Error(`ClickHouse插入错误: ${response.status} ${await response.text()}`);
+    }
+    logWithTimestamp(`成功插入 ${records.length} 条记录到ClickHouse`);
+    return records.length;
+  } catch (err) {
+    logWithTimestamp(`向ClickHouse插入数据失败: ${err instanceof Error ? err.message : String(err)}`);
+    throw err;
+  }
+}
+
+/**
+ * Copies new `trace` records from MongoDB into ClickHouse in batches and
+ * persists the sync position after every batch.
+ *
+ * @param batch_size Maximum number of records fetched per batch; must be a positive integer.
+ * @param initial_sync Ignore the stored position and start from the beginning.
+ * @param max_records Upper bound on the number of records handled in this run.
+ * @param timeout_minutes No new batch is started once this many minutes have elapsed.
+ * @param skip_clickhouse_check Skip the connectivity test and the duplicate lookup.
+ * @param force_insert Skip the duplicate lookup and insert every fetched record.
+ * @returns A result object; `success` is false when the run failed, with the reason in `error`.
+ */
+export async function main(
+  batch_size = 1000,
+  initial_sync = false,
+  max_records = 9999999,
+  timeout_minutes = 60,
+  skip_clickhouse_check = false,
+  force_insert = false
+) {
+  logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务");
+  logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`);
+  if (skip_clickhouse_check) {
+    logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式,不会检查记录是否已存在");
+  }
+  if (force_insert) {
+    logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
+  }
+
+  // MongoDB treats limit(0) as "no limit", so a non-positive batch size is rejected.
+  if (!Number.isInteger(batch_size) || batch_size <= 0) {
+    return {
+      success: false,
+      error: `batch_size must be a positive integer, got ${batch_size}`,
+      stack: undefined
+    };
+  }
+
+  const startTime = Date.now();
+  const timeoutMs = timeout_minutes * 60 * 1000;
+  const checkTimeout = (): boolean => {
+    if (Date.now() - startTime > timeoutMs) {
+      console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`);
+      return true;
+    }
+    return false;
+  };
+
+  // Passwords are masked so they never reach the logs.
+  console.log("MongoDB配置:", JSON.stringify({ ...mongoConfig, password: "****" }));
+  console.log("ClickHouse配置:", JSON.stringify({ ...clickhouseConfig, clickhouse_password: "****" }));
+
+  // Credentials are percent-encoded so reserved characters cannot break the URL.
+  let mongoUrl = "mongodb://";
+  if (mongoConfig.username && mongoConfig.password) {
+    mongoUrl += `${encodeURIComponent(mongoConfig.username)}:${encodeURIComponent(mongoConfig.password)}@`;
+  }
+  mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
+  console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);
+
+  // Load the previous position; any failure falls back to syncing from the start.
+  let syncState: SyncState = { last_sync_time: 0, records_synced: 0 };
+  try {
+    syncState = parseSyncState(await getVariable(SYNC_STATE_KEY));
+    console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`);
+  } catch (error) {
+    console.error("解析同步状态失败:", error);
+    console.log("未找到同步状态,创建初始同步状态");
+    syncState = { last_sync_time: 0, records_synced: 0 };
+  }
+
+  if (initial_sync) {
+    console.log("强制从头开始同步");
+    syncState = { last_sync_time: 0, records_synced: 0 };
+  }
+
+  const client = new MongoClient(mongoUrl);
+  try {
+    await client.connect();
+    console.log("MongoDB连接成功");
+
+    const traceCollection = client.db(mongoConfig.db).collection<TraceRecord>("trace");
+
+    // Position of the last handled record. It starts at the stored position so
+    // that a run which stops before its first batch does not erase it.
+    let lastCreateTime = syncState.last_sync_time;
+    let lastId = syncState.last_sync_id;
+    let query = buildCursorFilter(lastCreateTime, lastId);
+
+    const totalRecords = await traceCollection.countDocuments(query);
+    console.log(`找到 ${totalRecords} 条新记录需要同步`);
+
+    const recordsToProcess = Math.min(totalRecords, max_records);
+    console.log(`本次将处理 ${recordsToProcess} 条记录`);
+
+    if (totalRecords === 0) {
+      console.log("没有新记录需要同步,任务完成");
+      return {
+        success: true,
+        records_synced: 0,
+        total_synced: syncState.records_synced,
+        message: "没有新记录需要同步"
+      };
+    }
+
+    if (skip_clickhouse_check) {
+      logWithTimestamp("已启用跳过ClickHouse检查,不测试连接");
+    } else if (!(await checkClickHouseConnection())) {
+      logWithTimestamp("⚠️ ClickHouse连接测试失败,请启用skip_clickhouse_check=true参数来跳过连接检查");
+      throw new Error("ClickHouse连接失败,无法继续同步");
+    }
+
+    let processedRecords = 0;
+    let totalBatchRecords = 0;
+
+    // Writes the current position to the state store and returns what was written.
+    const persistState = async (): Promise<SyncState> => {
+      const state: SyncState = {
+        last_sync_time: lastCreateTime,
+        records_synced: syncState.records_synced + totalBatchRecords,
+        last_sync_id: lastId
+      };
+      await setVariable(SYNC_STATE_KEY, JSON.stringify(state));
+      return state;
+    };
+
+    for (let batchNo = 1; processedRecords < recordsToProcess; batchNo++) {
+      if (checkTimeout()) {
+        logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`);
+        break;
+      }
+
+      logWithTimestamp(`开始处理第 ${batchNo} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
+      logWithTimestamp(`正在从MongoDB获取第 ${batchNo} 批次数据...`);
+
+      // The filter already starts after the previous batch, so skip() must not be
+      // applied on top of it; the limit is capped so max_records is honoured.
+      const records = await traceCollection.find(query)
+        .sort({ createTime: 1, _id: 1 })
+        .limit(Math.min(batch_size, recordsToProcess - processedRecords))
+        .toArray();
+
+      if (records.length === 0) {
+        logWithTimestamp(`第 ${batchNo} 批次没有找到数据,结束处理`);
+        break;
+      }
+
+      const firstRecord = records[0];
+      const lastRecord = records[records.length - 1];
+      logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);
+      logWithTimestamp(`批次 ${batchNo} 第一条记录: ID=${firstRecord._id}, 时间=${new Date(firstRecord.createTime).toISOString()}`);
+      if (records.length > 1) {
+        logWithTimestamp(`批次 ${batchNo} 最后一条记录: ID=${lastRecord._id}, 时间=${new Date(lastRecord.createTime).toISOString()}`);
+      }
+
+      let newRecords: TraceRecord[];
+      if (skip_clickhouse_check || force_insert) {
+        logWithTimestamp(`已跳过ClickHouse重复检查,准备处理所有 ${records.length} 条记录`);
+        newRecords = records;
+      } else {
+        newRecords = await filterNewRecords(records);
+      }
+
+      if (newRecords.length === 0) {
+        logWithTimestamp("所有记录都已存在,跳过处理");
+      } else {
+        logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`);
+        const inserted = await insertRecords(newRecords);
+        totalBatchRecords += inserted;
+      }
+      processedRecords += records.length;
+
+      // Advance only after the batch succeeded, then persist at once so a later
+      // failure cannot lose the progress made so far.
+      lastId = lastRecord._id.toString();
+      lastCreateTime = lastRecord.createTime;
+      query = buildCursorFilter(lastCreateTime, lastId);
+      await persistState();
+      logWithTimestamp(`更新同步位置到: ID=${lastId}, 时间=${new Date(lastCreateTime).toISOString()}`);
+
+      logWithTimestamp(`第 ${batchNo} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
+    }
+
+    const newSyncState = await persistState();
+    console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`);
+
+    return {
+      success: true,
+      records_processed: processedRecords,
+      records_synced: totalBatchRecords,
+      total_synced: newSyncState.records_synced,
+      last_sync_time: new Date(newSyncState.last_sync_time).toISOString(),
+      message: "数据同步完成"
+    };
+  } catch (err) {
+    console.error("同步过程中发生错误:", err);
+    return {
+      success: false,
+      error: err instanceof Error ? err.message : String(err),
+      stack: err instanceof Error ? err.stack : undefined
+    };
+  } finally {
+    await client.close();
+    console.log("MongoDB连接已关闭");
+  }
+}
+
+// Run the sync when this file is executed directly.
+if (require.main === module) {
+  // Collect `--key value` pairs; a `--key` with no value counts as `true`.
+  const options = new Map<string, string>();
+  const args = process.argv.slice(2);
+  for (let i = 0; i < args.length; i++) {
+    const arg = args[i];
+    if (arg === undefined || !arg.startsWith('--')) continue;
+    const next = args[i + 1];
+    if (next !== undefined && !next.startsWith('--')) {
+      options.set(arg.substring(2), next);
+      i++;
+    } else {
+      options.set(arg.substring(2), 'true');
+    }
+  }
+
+  // Absent or malformed options fall back to their defaults.
+  const numberOption = (key: string, fallback: number): number => {
+    const raw = options.get(key);
+    const parsed = raw === undefined || raw.trim() === '' ? NaN : Number(raw);
+    return Number.isFinite(parsed) ? parsed : fallback;
+  };
+  const booleanOption = (key: string, fallback: boolean): boolean => {
+    const raw = options.get(key);
+    return raw === 'true' ? true : raw === 'false' ? false : fallback;
+  };
+
+  const params = {
+    batch_size: numberOption('batch_size', 1000),
+    initial_sync: booleanOption('initial_sync', false),
+    max_records: numberOption('max_records', 9999999),
+    timeout_minutes: numberOption('timeout_minutes', 60),
+    skip_clickhouse_check: booleanOption('skip_clickhouse_check', false),
+    force_insert: booleanOption('force_insert', false)
+  };
+
+  console.log('启动同步任务,参数:', params);
+  main(
+    params.batch_size,
+    params.initial_sync,
+    params.max_records,
+    params.timeout_minutes,
+    params.skip_clickhouse_check,
+    params.force_insert
+  ).then(result => {
+    console.log('同步任务完成:', result);
+    process.exit(result.success ? 0 : 1);
+  }).catch(err => {
+    console.error('同步任务失败:', err);
+    process.exit(1);
+  });
+}
\ No newline at end of file
diff --git a/windmill/README.md b/windmill/README.md
new file mode 100644
index 0000000..7091b0a
--- /dev/null
+++ b/windmill/README.md
@@ -0,0 +1,73 @@
+
+
+Read file: /Users/liam/code/shorturl-analytics/windmill/sync_shorturl_from_mongo_to_clickhouse.ts
+
+Read file: /Users/liam/code/shorturl-analytics/windmill/sync_shorturl_from_mongo_to_clickhouse.ts
+
+Read file: /Users/liam/code/shorturl-analytics/windmill/sync_shorturl_event_from_mongo.ts
+
+Read file: /Users/liam/code/shorturl-analytics/windmill/sync_shorturl_event_from_mongo.ts
+
+Read file: /Users/liam/code/shorturl-analytics/windmill/sync_shorturl_from_mongo_to_clickhouse.ts
+
+Read file: /Users/liam/code/shorturl-analytics/windmill/sync_shorturl_event_from_mongo.ts
+这两个脚本是使用 Windmill 平台开发的数据同步工具,用于将短链接相关数据从 MongoDB 数据库同步到 ClickHouse 数据库。
+
+## 1. sync_shorturl_from_mongo_to_clickhouse.ts
+
+**功能**: 将 MongoDB 中的短链接数据(short 表)同步到 ClickHouse 的 links 表
+
+**主要特点**:
+- 增量同步: 记录上次同步位置,只处理新增数据
+- 批量处理: 默认每批次处理 100 条记录,可配置
+- 超时控制: 设置最大运行时间(默认 30 分钟)
+- 数据重复检查: 检查 ClickHouse 中是否已存在相同记录
+- 错误处理: 完善的错误处理和日志记录
+
+**数据转换**:
+- 将 MongoDB 中的短链接记录(包含 slug、origin、创建时间等)转换为 ClickHouse 表结构
+- 处理特殊字段如日期时间、标签数组等
+- 转换字段包括: link_id、original_url、created_at、created_by、title、description、tags、is_active、expires_at、team_id、project_id
+
+**执行流程**:
+1. 从 Windmill 变量获取 MongoDB 和 ClickHouse 连接配置
+2. 获取上次同步状态(时间戳和记录ID)
+3. 连接 MongoDB,批量查询符合条件的新记录
+4. 检查这些记录是否已存在于 ClickHouse
+5. 转换数据格式并生成 SQL 插入语句
+6. 执行插入操作并记录结果
+7. 更新同步状态,为下次同步做准备
+
+## 2. 
sync_shorturl_event_from_mongo.ts
+
+**功能**: 将 MongoDB 中的短链接点击事件数据(trace 表)同步到 ClickHouse 的 link_events 表
+
+**主要特点**:
+- 与第一个脚本类似,但处理的是访问事件数据
+- 默认批量处理规模更大(1000 条/批次)
+- 超时时间更长(60 分钟)
+- 支持完整的事件元数据保存
+
+**数据转换**:
+- 将 MongoDB 中的访问事件记录转换为 ClickHouse 事件表结构
+- 记录的字段更丰富,包括:
+  - link_id: 短链接ID
+  - visitor_id: 访客ID
+  - session_id: 会话ID
+  - event_type: 事件类型(点击、转化等)
+  - 设备信息: ip_address、user_agent、device_type、browser、os
+  - 来源信息: referrer、utm 参数
+  - 行为数据: time_spent_sec、is_bounce、conversion_type 等
+
+**执行流程**:
+与第一个脚本基本相同,但处理的是 trace 表的数据,并且将其转换为 link_events 表所需的格式。
+
+## 两者共同点:
+
+1. **增量同步机制**: 记录同步状态,每次只处理新数据
+2. **容错设计**: 超时控制、错误处理、异常恢复机制
+3. **配置灵活**: 可通过参数控制批量大小、超时时间等
+4. **数据验证**: 确保已同步数据不会重复
+5. **详细日志**: 记录同步过程中的关键事件和状态
+
+这两个脚本共同构成了短链接分析系统的数据管道,实现了从 MongoDB(可能是原始数据存储)到 ClickHouse(分析型数据库)的数据迁移,为短链接分析平台提供数据基础。