From 7e6356cf179d2a8af6d8a5fe97fa0c0056e90c13 Mon Sep 17 00:00:00 2001 From: William Tso Date: Fri, 21 Mar 2025 22:40:03 +0800 Subject: [PATCH] sync script & env --- windmill/sync_shorturl_event_from_mongo.ts | 403 ++++++++++--- .../sync_shorturl_from_mongo_to_clickhouse.ts | 531 ++++++++++++++++++ 2 files changed, 849 insertions(+), 85 deletions(-) create mode 100644 windmill/sync_shorturl_from_mongo_to_clickhouse.ts diff --git a/windmill/sync_shorturl_event_from_mongo.ts b/windmill/sync_shorturl_event_from_mongo.ts index 1cdec8d..6e635a7 100644 --- a/windmill/sync_shorturl_event_from_mongo.ts +++ b/windmill/sync_shorturl_event_from_mongo.ts @@ -1,5 +1,5 @@ // 从MongoDB的trace表同步数据到ClickHouse的link_events表 -import { getResource, getVariable, setVariable } from "https://deno.land/x/windmill@v1.50.0/mod.ts"; +import { getVariable, setVariable } from "npm:windmill-client@1"; import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts"; interface MongoConfig { @@ -40,14 +40,81 @@ interface SyncState { } export async function main( - batch_size = 10, // 默认一次只同步10条记录 + batch_size = 1000, // 减小批处理大小为5 initial_sync = false, + max_records = 9999999, // 只同步10条记录用于测试 + timeout_minutes = 60, // 减少超时时间为5分钟 + skip_clickhouse_check = false, // 是否跳过ClickHouse重复检查 + force_insert = false // 强制插入所有记录,不检查是否已存在 ) { - console.log("开始执行MongoDB到ClickHouse的同步任务..."); + const logWithTimestamp = (message: string) => { + const now = new Date(); + console.log(`[${now.toISOString()}] ${message}`); + }; + + logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务"); + logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`); + if (skip_clickhouse_check) { + logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式,不会检查记录是否已存在"); + } + if (force_insert) { + logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录"); + } + + // 设置超时 + const startTime = Date.now(); + const timeoutMs = timeout_minutes * 60 * 1000; + + // 检查是否超时 + const checkTimeout = () => { + if (Date.now() - startTime > timeoutMs) { + console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`); + return true; + } + return false; + }; // 获取MongoDB和ClickHouse的连接信息 - const mongoConfig = await getResource("u/vitalitymailg/mongodb"); - const clickhouseConfig = await getResource("u/vitalitymailg/clickhouse"); + let mongoConfig: MongoConfig; + let clickhouseConfig: ClickHouseConfig; + + try { + const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb"); + console.log("原始MongoDB配置:", JSON.stringify(rawMongoConfig)); + + // 尝试解析配置,如果是字符串形式 + if (typeof rawMongoConfig === "string") { + try { + mongoConfig = JSON.parse(rawMongoConfig); + } catch (e) { + console.error("MongoDB配置解析失败:", e); + throw e; + } + } else { + mongoConfig = rawMongoConfig as MongoConfig; + } + + const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse"); + console.log("原始ClickHouse配置:", JSON.stringify(rawClickhouseConfig)); + + // 尝试解析配置,如果是字符串形式 + if (typeof rawClickhouseConfig === "string") { + try { + clickhouseConfig = JSON.parse(rawClickhouseConfig); + } catch (e) { + console.error("ClickHouse配置解析失败:", e); + throw e; + } + } else { + clickhouseConfig = rawClickhouseConfig as ClickHouseConfig; + } + + console.log("MongoDB配置解析为:", JSON.stringify(mongoConfig)); + console.log("ClickHouse配置解析为:", JSON.stringify(clickhouseConfig)); + } catch (error) { + console.error("获取配置失败:", error); + throw error; + } // 构建MongoDB连接URL let mongoUrl = "mongodb://"; @@ -61,9 +128,15 @@ export async function main( // 获取上次同步的状态 let syncState: SyncState; try { - syncState = await getVariable("shorturl_sync_state"); - console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`); - } catch (_err) { + const rawSyncState = await getVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state"); + try { + syncState = JSON.parse(rawSyncState); + console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`); + } catch (parseError) { + console.error("解析同步状态失败:", parseError); + throw parseError; + } + } catch (_unused_error) { console.log("未找到同步状态,创建初始同步状态"); syncState = { last_sync_time: 0, @@ -106,6 +179,10 @@ export async function main( const totalRecords = await traceCollection.countDocuments(query); console.log(`找到 ${totalRecords} 条新记录需要同步`); + // 限制此次处理的记录数量 + const recordsToProcess = Math.min(totalRecords, max_records); + console.log(`本次将处理 ${recordsToProcess} 条记录`); + if (totalRecords === 0) { console.log("没有新记录需要同步,任务完成"); return { @@ -122,57 +199,165 @@ export async function main( let lastCreateTime = syncState.last_sync_time; let totalBatchRecords = 0; + // 检查ClickHouse连接状态 + const checkClickHouseConnection = async (): Promise => { + if (skip_clickhouse_check) { + logWithTimestamp("已启用跳过ClickHouse检查,不测试连接"); + return true; + } + + try { + logWithTimestamp("测试ClickHouse连接..."); + const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; + const response = await fetch(clickhouseUrl, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`, + }, + body: "SELECT 1", + // 设置5秒超时 + signal: AbortSignal.timeout(5000) + }); + + if (response.ok) { + logWithTimestamp("ClickHouse连接测试成功"); + return true; + } else { + const errorText = await response.text(); + logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`); + return false; + } + } catch (err) { + const error = err as Error; + logWithTimestamp(`ClickHouse连接测试失败: ${error.message}`); + return false; + } + }; + // 检查记录是否已经存在于ClickHouse中 const checkExistingRecords = async (records: TraceRecord[]): Promise => { if (records.length === 0) return []; - // 提取所有记录的ID - const recordIds = records.map(record => record._id.toString()); - - // 构建查询SQL,检查记录是否已存在 - const query = ` - SELECT id - FROM ${clickhouseConfig.clickhouse_database}.link_events - WHERE id IN ('${recordIds.join("','")}') - `; - - // 发送请求到ClickHouse - const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; - const response = await fetch(clickhouseUrl, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` - }, - body: query - }); - - if (!response.ok) { - const errorText = await response.text(); - throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`); + // 如果跳过ClickHouse检查或强制插入,则直接返回所有记录 + if (skip_clickhouse_check || force_insert) { + logWithTimestamp(`已跳过ClickHouse重复检查,准备处理所有 ${records.length} 条记录`); + return records; } - // 解析结果 - const result = await response.json(); + logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于ClickHouse中...`); - // 提取已存在的记录ID - const existingIds = new Set(result.map((row: { id: string }) => row.id)); - - console.log(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`); - - // 过滤出不存在的记录 - return records.filter(record => !existingIds.has(record._id.toString())); + try { + // 提取所有记录的ID + const recordIds = records.map(record => record.slugId.toString()); // 使用slugId作为link_id查询 + logWithTimestamp(`待检查的记录ID: ${recordIds.join(', ')}`); + + // 构建查询SQL,检查记录是否已存在,确保添加FORMAT JSON来获取正确的JSON格式响应 + const query = ` + SELECT link_id + FROM ${clickhouseConfig.clickhouse_database}.link_events + WHERE link_id IN ('${recordIds.join("','")}') + FORMAT JSON + `; + + logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`); + + // 发送请求到ClickHouse,添加10秒超时 + const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; + const response = await fetch(clickhouseUrl, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` + }, + body: query, + signal: AbortSignal.timeout(10000) + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`); + } + + // 获取响应文本以便记录 + const responseText = await response.text(); + logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`); + + if (!responseText.trim()) { + logWithTimestamp("ClickHouse返回空响应,假定没有记录存在"); + return records; // 如果响应为空,假设没有记录 + } + + // 解析结果 + let result; + try { + result = JSON.parse(responseText); + } catch (err) { + logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`); + throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`); + } + + // 确保result有正确的结构 + if (!result.data) { + logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`); + return records; // 如果没有data字段,假设没有记录 + } + + // 提取已存在的记录ID + const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id)); + + logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`); + if (existingIds.size > 0) { + logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`); + } + + // 过滤出不存在的记录 + const newRecords = records.filter(record => !existingIds.has(record.slugId.toString())); // 使用slugId匹配link_id + logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`); + + return newRecords; + } catch (err) { + const error = err as Error; + logWithTimestamp(`ClickHouse查询出错: ${error.message}`); + if (skip_clickhouse_check) { + logWithTimestamp("已启用跳过ClickHouse检查,将继续处理所有记录"); + return records; + } else { + throw error; // 如果没有启用跳过检查,则抛出错误 + } + } }; + // 在处理记录前先检查ClickHouse连接 + const clickhouseConnected = await checkClickHouseConnection(); + if (!clickhouseConnected && !skip_clickhouse_check) { + logWithTimestamp("⚠️ ClickHouse连接测试失败,请启用skip_clickhouse_check=true参数来跳过连接检查"); + throw new Error("ClickHouse连接失败,无法继续同步"); + } + // 处理记录的函数 const processRecords = async (records: TraceRecord[]) => { if (records.length === 0) return 0; + logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`); + // 检查记录是否已存在 - const newRecords = await checkExistingRecords(records); + let newRecords; + try { + newRecords = await checkExistingRecords(records); + } catch (err) { + const error = err as Error; + logWithTimestamp(`检查记录是否存在时出错: ${error.message}`); + if (!skip_clickhouse_check && !force_insert) { + throw error; + } + // 如果跳过检查或强制插入,则使用所有记录 + logWithTimestamp("将使用所有记录进行处理"); + newRecords = records; + } if (newRecords.length === 0) { - console.log("所有记录都已存在,跳过处理"); + logWithTimestamp("所有记录都已存在,跳过处理"); // 更新同步状态,即使没有新增记录 const lastRecord = records[records.length - 1]; lastId = lastRecord._id.toString(); @@ -180,24 +365,36 @@ export async function main( return 0; } - console.log(`处理 ${newRecords.length} 条新记录`); + logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`); // 准备ClickHouse插入数据 const clickhouseData = newRecords.map(record => { - // 转换MongoDB记录为ClickHouse格式 + // 转换MongoDB记录为ClickHouse格式,匹配ClickHouse表结构 return { - id: record._id.toString(), - slug_id: record.slugId.toString(), - label: record.label || "", - ip: record.ip, - type: record.type, - platform: record.platform || "", - platform_os: record.platformOS || "", + // UUID将由ClickHouse自动生成 (event_id) + link_id: record.slugId.toString(), + channel_id: record.label || "", + visitor_id: record._id.toString(), // 使用MongoDB ID作为访客ID + session_id: record._id.toString() + "-" + record.createTime, // 创建一个唯一会话ID + event_type: record.type <= 4 ? record.type : 1, // 确保event_type在枚举范围内 + ip_address: record.ip, + country: "", // 这些字段在MongoDB中不存在,使用默认值 + city: "", + referrer: record.url || "", + utm_source: "", + utm_medium: "", + utm_campaign: "", + user_agent: record.browser + " " + record.browserVersion, + device_type: record.platform === "mobile" ? 1 : (record.platform === "tablet" ? 2 : 3), browser: record.browser || "", - browser_version: record.browserVersion || "", - url: record.url || "", - created_at: new Date(record.createTime).toISOString(), - created_time: record.createTime + os: record.platformOS || "", + time_spent_sec: 0, + is_bounce: true, + is_qr_scan: false, + qr_code_id: "", + conversion_type: 1, // 默认为'visit' + conversion_value: 0, + custom_data: `{"mongo_id":"${record._id.toString()}"}` }; }); @@ -205,64 +402,100 @@ export async function main( const lastRecord = records[records.length - 1]; lastId = lastRecord._id.toString(); lastCreateTime = lastRecord.createTime; + logWithTimestamp(`更新同步位置到: ID=${lastId}, 时间=${new Date(lastCreateTime).toISOString()}`); // 生成ClickHouse插入SQL - const values = clickhouseData.map(record => - `('${record.id}', '${record.slug_id}', '${record.label.replace(/'/g, "''")}', '${record.ip}', ${record.type}, '${record.platform.replace(/'/g, "''")}', '${record.platform_os.replace(/'/g, "''")}', '${record.browser.replace(/'/g, "''")}', '${record.browser_version.replace(/'/g, "''")}', '${record.url.replace(/'/g, "''")}', '${record.created_at}')` - ).join(", "); + const insertSQL = ` + INSERT INTO ${clickhouseConfig.clickhouse_database}.link_events + (link_id, channel_id, visitor_id, session_id, event_type, ip_address, country, city, + referrer, utm_source, utm_medium, utm_campaign, user_agent, device_type, browser, os, + time_spent_sec, is_bounce, is_qr_scan, qr_code_id, conversion_type, conversion_value, custom_data) + VALUES ${clickhouseData.map(record => + `('${record.link_id}', '${record.channel_id.replace(/'/g, "''")}', '${record.visitor_id}', '${record.session_id}', + ${record.event_type}, '${record.ip_address}', '', '', + '${record.referrer.replace(/'/g, "''")}', '', '', '', '${record.user_agent.replace(/'/g, "''")}', ${record.device_type}, + '${record.browser.replace(/'/g, "''")}', '${record.os.replace(/'/g, "''")}', + 0, true, false, '', 1, 0, '${record.custom_data}')` + ).join(", ")} + `; - if (values.length === 0) { + if (insertSQL.length === 0) { console.log("没有新记录需要插入"); return 0; } - const insertSQL = ` - INSERT INTO ${clickhouseConfig.clickhouse_database}.link_events - (id, slug_id, label, ip, type, platform, platform_os, browser, browser_version, url, created_at) - VALUES ${values} - `; - - // 发送请求到ClickHouse + // 发送请求到ClickHouse,添加20秒超时 const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; - const response = await fetch(clickhouseUrl, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` - }, - body: insertSQL - }); + try { + logWithTimestamp("发送插入请求到ClickHouse..."); + const response = await fetch(clickhouseUrl, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` + }, + body: insertSQL, + signal: AbortSignal.timeout(20000) + }); - if (!response.ok) { - const errorText = await response.text(); - throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`); + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`); + } + + logWithTimestamp(`成功插入 ${newRecords.length} 条记录到ClickHouse`); + return newRecords.length; + } catch (err) { + const error = err as Error; + logWithTimestamp(`向ClickHouse插入数据失败: ${error.message}`); + throw error; } - - console.log(`成功插入 ${newRecords.length} 条记录到ClickHouse`); - return newRecords.length; }; // 批量处理记录 - for (let page = 0; processedRecords < totalRecords; page++) { + for (let page = 0; processedRecords < recordsToProcess; page++) { + // 检查超时 + if (checkTimeout()) { + logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`); + break; + } + + // 每批次都输出进度 + logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`); + + logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`); const records = await traceCollection.find(query) .sort({ createTime: 1, _id: 1 }) .skip(page * batch_size) .limit(batch_size) .toArray(); - if (records.length === 0) break; + if (records.length === 0) { + logWithTimestamp(`第 ${page+1} 批次没有找到数据,结束处理`); + break; + } + + logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`); + // 输出当前批次的部分数据信息 + if (records.length > 0) { + logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, 时间=${new Date(records[0].createTime).toISOString()}`); + if (records.length > 1) { + logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`); + } + } const batchSize = await processRecords(records); processedRecords += records.length; // 总是增加处理的记录数,即使有些记录已存在 totalBatchRecords += batchSize; // 只增加实际插入的记录数 - console.log(`已处理 ${processedRecords}/${totalRecords} 条记录,实际插入 ${totalBatchRecords} 条`); + logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); // 更新查询条件,以便下一批次查询 query.createTime = { $gt: lastCreateTime }; if (lastId) { query._id = { $gt: new ObjectId(lastId) }; } + logWithTimestamp(`更新查询条件: 创建时间 > ${new Date(lastCreateTime).toISOString()}, ID > ${lastId || 'none'}`); } // 更新同步状态 @@ -272,7 +505,7 @@ export async function main( last_sync_id: lastId }; - await setVariable("shorturl_sync_state", newSyncState); + await setVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state", JSON.stringify(newSyncState)); console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`); return { diff --git a/windmill/sync_shorturl_from_mongo_to_clickhouse.ts b/windmill/sync_shorturl_from_mongo_to_clickhouse.ts new file mode 100644 index 0000000..18e1e94 --- /dev/null +++ b/windmill/sync_shorturl_from_mongo_to_clickhouse.ts @@ -0,0 +1,531 @@ +// 从MongoDB的short表同步数据到ClickHouse的links表 +import { getVariable, setVariable } from "npm:windmill-client@1"; +import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts"; + +interface MongoConfig { + host: string; + port: string; + db: string; + username: string; + password: string; +} + +interface ClickHouseConfig { + clickhouse_host: string; + clickhouse_port: number; + clickhouse_user: string; + clickhouse_password: string; + clickhouse_database: string; + clickhouse_url: string; +} + +interface ShortRecord { + _id: ObjectId; + slug: string; // 短链接的slug部分 + url: string; // 原始URL + createTime: number; // 创建时间戳 + user: string; // 创建用户 + title?: string; // 标题 + description?: string; // 描述 + tags?: string[]; // 标签 + active?: boolean; // 是否活跃 + expiresAt?: number; // 过期时间戳 + teamId?: string; // 团队ID + projectId?: string; // 项目ID +} + +interface SyncState { + last_sync_time: number; + records_synced: number; + last_sync_id?: string; +} + +export async function main( + batch_size = 50, + initial_sync = false, + max_records = 1000, + timeout_minutes = 30, + skip_clickhouse_check = false, + force_insert = false +) { + const logWithTimestamp = (message: string) => { + const now = new Date(); + console.log(`[${now.toISOString()}] ${message}`); + }; + + logWithTimestamp("开始执行MongoDB到ClickHouse的短链接同步任务..."); + logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`); + if (skip_clickhouse_check) { + logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式,不会检查记录是否已存在"); + } + if (force_insert) { + logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录"); + } + + // 设置超时 + const startTime = Date.now(); + const timeoutMs = timeout_minutes * 60 * 1000; + + // 检查是否超时 + const checkTimeout = () => { + if (Date.now() - startTime > timeoutMs) { + console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`); + return true; + } + return false; + }; + + // 获取MongoDB和ClickHouse的连接信息 + let mongoConfig: MongoConfig; + let clickhouseConfig: ClickHouseConfig; + + try { + const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb"); + console.log("原始MongoDB配置:", typeof rawMongoConfig === "string" ? rawMongoConfig : JSON.stringify(rawMongoConfig)); + + // 尝试解析配置,如果是字符串形式 + if (typeof rawMongoConfig === "string") { + try { + mongoConfig = JSON.parse(rawMongoConfig); + } catch (e) { + console.error("MongoDB配置解析失败:", e); + throw e; + } + } else { + mongoConfig = rawMongoConfig as MongoConfig; + } + + const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse"); + console.log("原始ClickHouse配置:", typeof rawClickhouseConfig === "string" ? rawClickhouseConfig : JSON.stringify(rawClickhouseConfig)); + + // 尝试解析配置,如果是字符串形式 + if (typeof rawClickhouseConfig === "string") { + try { + clickhouseConfig = JSON.parse(rawClickhouseConfig); + } catch (e) { + console.error("ClickHouse配置解析失败:", e); + throw e; + } + } else { + clickhouseConfig = rawClickhouseConfig as ClickHouseConfig; + } + + console.log("MongoDB配置解析为:", JSON.stringify(mongoConfig)); + console.log("ClickHouse配置解析为:", JSON.stringify(clickhouseConfig)); + } catch (error) { + console.error("获取配置失败:", error); + throw error; + } + + // 构建MongoDB连接URL + let mongoUrl = "mongodb://"; + if (mongoConfig.username && mongoConfig.password) { + mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`; + } + mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`; + + console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`); + + // 获取上次同步的状态 + let syncState: SyncState; + try { + const rawSyncState = await getVariable("f/shorturl_analytics/clickhouse/shorturl_links_sync_state"); + try { + syncState = JSON.parse(rawSyncState); + console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`); + } catch (parseError) { + console.error("解析同步状态失败:", parseError); + throw parseError; + } + } catch (_unused_error) { + console.log("未找到同步状态,创建初始同步状态"); + syncState = { + last_sync_time: 0, + records_synced: 0, + }; + } + + // 如果强制从头开始同步 + if (initial_sync) { + console.log("强制从头开始同步"); + syncState = { + last_sync_time: 0, + records_synced: 0, + }; + } + + // 连接MongoDB + const client = new MongoClient(); + try { + await client.connect(mongoUrl); + console.log("MongoDB连接成功"); + + const db = client.database(mongoConfig.db); + const shortCollection = db.collection("short"); + + // 构建查询条件,只查询新的记录 + const query: Record = {}; + + if (syncState.last_sync_time > 0) { + query.createTime = { $gt: syncState.last_sync_time }; + } + + if (syncState.last_sync_id) { + // 如果有上次同步的ID,则从该ID之后开始查询 + query._id = { $gt: new ObjectId(syncState.last_sync_id) }; + } + + // 计算总记录数 + const totalRecords = await shortCollection.countDocuments(query); + console.log(`找到 ${totalRecords} 条新短链接记录需要同步`); + + // 限制此次处理的记录数量 + const recordsToProcess = Math.min(totalRecords, max_records); + console.log(`本次将处理 ${recordsToProcess} 条记录`); + + if (totalRecords === 0) { + console.log("没有新记录需要同步,任务完成"); + return { + success: true, + records_synced: 0, + total_synced: syncState.records_synced, + message: "没有新记录需要同步" + }; + } + + // 分批处理记录 + let processedRecords = 0; + let lastId: string | undefined; + let lastCreateTime = syncState.last_sync_time; + let totalBatchRecords = 0; + + // 检查ClickHouse连接状态 + const checkClickHouseConnection = async (): Promise => { + if (skip_clickhouse_check) { + logWithTimestamp("已启用跳过ClickHouse检查,不测试连接"); + return true; + } + + try { + logWithTimestamp("测试ClickHouse连接..."); + const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; + const response = await fetch(clickhouseUrl, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`, + }, + body: "SELECT 1 FORMAT JSON", + signal: AbortSignal.timeout(5000) + }); + + if (response.ok) { + logWithTimestamp("ClickHouse连接测试成功"); + return true; + } else { + const errorText = await response.text(); + logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`); + return false; + } + } catch (err) { + const error = err as Error; + logWithTimestamp(`ClickHouse连接测试失败: ${error.message}`); + return false; + } + }; + + // 检查记录是否已经存在于ClickHouse中 + const checkExistingRecords = async (records: ShortRecord[]): Promise => { + if (records.length === 0) return []; + + // 如果跳过ClickHouse检查或强制插入,则直接返回所有记录 + if (skip_clickhouse_check || force_insert) { + logWithTimestamp(`已跳过ClickHouse重复检查,准备处理所有 ${records.length} 条记录`); + return records; + } + + logWithTimestamp(`正在检查 ${records.length} 条短链接记录是否已存在于ClickHouse中...`); + + try { + // 提取所有记录的ID + const recordIds = records.map(record => record.slug); + logWithTimestamp(`待检查的短链接ID: ${recordIds.join(', ')}`); + + // 构建查询SQL,检查记录是否已存在 + const query = ` + SELECT link_id + FROM ${clickhouseConfig.clickhouse_database}.links + WHERE link_id IN ('${recordIds.join("','")}') + FORMAT JSON + `; + + logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`); + + // 发送请求到ClickHouse + const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; + const response = await fetch(clickhouseUrl, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` + }, + body: query, + signal: AbortSignal.timeout(10000) + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`); + } + + // 获取响应文本 + const responseText = await response.text(); + logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`); + + if (!responseText.trim()) { + logWithTimestamp("ClickHouse返回空响应,假定没有记录存在"); + return records; + } + + // 解析结果 + let result; + try { + result = JSON.parse(responseText); + } catch (err) { + logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`); + throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`); + } + + // 确保result有正确的结构 + if (!result.data) { + logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`); + return records; + } + + // 提取已存在的记录ID + const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id)); + + logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`); + if (existingIds.size > 0) { + logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`); + } + + // 过滤出不存在的记录 + const newRecords = records.filter(record => !existingIds.has(record.slug)); + logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`); + + return newRecords; + } catch (err) { + const error = err as Error; + logWithTimestamp(`ClickHouse查询出错: ${error.message}`); + if (skip_clickhouse_check) { + logWithTimestamp("已启用跳过ClickHouse检查,将继续处理所有记录"); + return records; + } else { + throw error; + } + } + }; + + // 在处理记录前先检查ClickHouse连接 + const clickhouseConnected = await checkClickHouseConnection(); + if (!clickhouseConnected && !skip_clickhouse_check) { + logWithTimestamp("⚠️ ClickHouse连接测试失败,请启用skip_clickhouse_check=true参数来跳过连接检查"); + throw new Error("ClickHouse连接失败,无法继续同步"); + } + + // 处理记录的函数 + const processRecords = async (records: ShortRecord[]) => { + if (records.length === 0) return 0; + + logWithTimestamp(`开始处理批次数据,共 ${records.length} 条短链接记录...`); + + // 检查记录是否已存在 + let newRecords; + try { + newRecords = await checkExistingRecords(records); + } catch (err) { + const error = err as Error; + logWithTimestamp(`检查记录是否存在时出错: ${error.message}`); + if (!skip_clickhouse_check && !force_insert) { + throw error; + } + // 如果跳过检查或强制插入,则使用所有记录 + logWithTimestamp("将使用所有记录进行处理"); + newRecords = records; + } + + if (newRecords.length === 0) { + logWithTimestamp("所有记录都已存在,跳过处理"); + // 更新同步状态,即使没有新增记录 + const lastRecord = records[records.length - 1]; + lastId = lastRecord._id.toString(); + lastCreateTime = lastRecord.createTime; + return 0; + } + + logWithTimestamp(`准备处理 ${newRecords.length} 条新短链接记录...`); + + // 准备ClickHouse插入数据 + const clickhouseData = newRecords.map(record => { + // 转换MongoDB记录为ClickHouse格式,匹配ClickHouse表结构 + // 处理日期时间,移除ISO格式中的Z以使ClickHouse正确解析 + const createdAtStr = new Date(record.createTime).toISOString().replace('Z', ''); + const expiresAtStr = record.expiresAt ? new Date(record.expiresAt).toISOString().replace('Z', '') : null; + + return { + link_id: record.slug, + original_url: record.url || "", + created_at: createdAtStr, + created_by: record.user || "unknown", + title: record.title || (record.url ? record.url.substring(0, 50) : "无标题"), + description: record.description || "", + tags: record.tags || [], + is_active: record.active !== undefined ? record.active : true, + expires_at: expiresAtStr, + team_id: record.teamId || "", + project_id: record.projectId || "" + }; + }); + + // 更新同步状态(使用原始records的最后一条,以确保进度正确) + const lastRecord = records[records.length - 1]; + lastId = lastRecord._id.toString(); + lastCreateTime = lastRecord.createTime; + logWithTimestamp(`更新同步位置到: ID=${lastId}, 时间=${new Date(lastCreateTime).toISOString()}`); + + // 生成ClickHouse插入SQL + // 注意:Array类型需要特殊处理,这里将tags作为JSON字符串处理 + const insertSQL = ` + INSERT INTO ${clickhouseConfig.clickhouse_database}.links + (link_id, original_url, created_at, created_by, title, description, tags, is_active, expires_at, team_id, project_id) + VALUES + ${clickhouseData.map(record => { + // 处理tags数组 + const tagsStr = JSON.stringify(record.tags || []); + + // 处理expires_at可能为null的情况 + const expiresAt = record.expires_at ? `'${record.expires_at}'` : "NULL"; + + // 确保所有字段在使用replace前都有默认值 + const safeOriginalUrl = (record.original_url || "").replace(/'/g, "''"); + const safeCreatedBy = (record.created_by || "unknown").replace(/'/g, "''"); + const safeTitle = (record.title || "无标题").replace(/'/g, "''"); + const safeDescription = (record.description || "").replace(/'/g, "''"); + const safeTeamId = record.team_id || ""; + const safeProjectId = record.project_id || ""; + + return `('${record.link_id}', '${safeOriginalUrl}', '${record.created_at}', '${safeCreatedBy}', '${safeTitle}', '${safeDescription}', ${tagsStr}, ${record.is_active}, ${expiresAt}, '${safeTeamId}', '${safeProjectId}')`; + }).join(", ")} + `; + + if (clickhouseData.length === 0) { + console.log("没有新记录需要插入"); + return 0; + } + + // 发送请求到ClickHouse + const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; + try { + logWithTimestamp("发送插入请求到ClickHouse..."); + const response = await fetch(clickhouseUrl, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` + }, + body: insertSQL, + signal: AbortSignal.timeout(20000) + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`); + } + + logWithTimestamp(`成功插入 ${newRecords.length} 条短链接记录到ClickHouse`); + return newRecords.length; + } catch (err) { + const error = err as Error; + logWithTimestamp(`向ClickHouse插入数据失败: ${error.message}`); + throw error; + } + }; + + // 批量处理记录 + for (let page = 0; processedRecords < recordsToProcess; page++) { + // 检查超时 + if (checkTimeout()) { + logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`); + break; + } + + // 每批次都输出进度 + logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`); + + logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`); + const records = await shortCollection.find(query) + .sort({ createTime: 1, _id: 1 }) + .skip(page * batch_size) + .limit(batch_size) + .toArray(); + + if (records.length === 0) { + logWithTimestamp(`第 ${page+1} 批次没有找到数据,结束处理`); + break; + } + + logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`); + // 输出当前批次的部分数据信息 + if (records.length > 0) { + logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, Slug=${records[0].slug}, 时间=${new Date(records[0].createTime).toISOString()}`); + if (records.length > 1) { + logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, Slug=${records[records.length-1].slug}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`); + } + } + + const batchSize = await processRecords(records); + processedRecords += records.length; // 总是增加处理的记录数,即使有些记录已存在 + totalBatchRecords += batchSize; // 只增加实际插入的记录数 + + logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); + + // 更新查询条件,以便下一批次查询 + query.createTime = { $gt: lastCreateTime }; + if (lastId) { + query._id = { $gt: new ObjectId(lastId) }; + } + logWithTimestamp(`更新查询条件: 创建时间 > ${new Date(lastCreateTime).toISOString()}, ID > ${lastId || 'none'}`); + } + + // 更新同步状态 + const newSyncState: SyncState = { + last_sync_time: lastCreateTime, + records_synced: syncState.records_synced + totalBatchRecords, + last_sync_id: lastId + }; + + await setVariable("f/shorturl_analytics/clickhouse/shorturl_links_sync_state", JSON.stringify(newSyncState)); + console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`); + + return { + success: true, + records_processed: processedRecords, + records_synced: totalBatchRecords, + total_synced: newSyncState.records_synced, + last_sync_time: new Date(newSyncState.last_sync_time).toISOString(), + message: "短链接数据同步完成" + }; + } catch (err) { + console.error("同步过程中发生错误:", err); + return { + success: false, + error: err instanceof Error ? err.message : String(err), + stack: err instanceof Error ? err.stack : undefined + }; + } finally { + // 关闭MongoDB连接 + await client.close(); + console.log("MongoDB连接已关闭"); + } +}