From fe40aad835346527dd7f2f248686dd32a4f5dc24 Mon Sep 17 00:00:00 2001 From: William Tso Date: Thu, 24 Apr 2025 00:09:24 +0800 Subject: [PATCH] Enhance MongoDB to ClickHouse synchronization script by adding support for custom time range synchronization, allowing users to specify start and end dates. Update .env file to include MongoDB connection URL and add .gitignore for script dependencies. --- .env | 4 +- windmill/scripts/.gitignore | 2 + windmill/scripts/package.json | 19 + windmill/scripts/sync_mongo_to_clickhouse.js | 714 +++++++++++++++++++ windmill/sync_mongo_to_events.ts | 138 +++- 5 files changed, 858 insertions(+), 19 deletions(-) create mode 100644 windmill/scripts/.gitignore create mode 100644 windmill/scripts/package.json create mode 100644 windmill/scripts/sync_mongo_to_clickhouse.js diff --git a/.env b/.env index 7b13ba4..2c9d645 100644 --- a/.env +++ b/.env @@ -1,5 +1,7 @@ PORT=3007 +MONGO_URL="mongodb://10.0.1.41:27017" + # ClickHouse Configuration CLICKHOUSE_HOST=10.0.1.60 CLICKHOUSE_PORT=8123 @@ -26,4 +28,4 @@ DATABASE_URL="postgresql://postgres.mwwvqwevplndzvmqmrxa:eYYdarJsRL*Z6&p9gD@aws- NEXT_PUBLIC_LIMQ_API="https://app.upj.to" # Application URL for redirects (replace with your production URL) -NEXT_PUBLIC_SITE_URL="https://main.upj.to/login" \ No newline at end of file +NEXT_PUBLIC_SITE_URL="https://main.upj.to" \ No newline at end of file diff --git a/windmill/scripts/.gitignore b/windmill/scripts/.gitignore new file mode 100644 index 0000000..d502512 --- /dev/null +++ b/windmill/scripts/.gitignore @@ -0,0 +1,2 @@ +/node_modules +/package-lock.json diff --git a/windmill/scripts/package.json b/windmill/scripts/package.json new file mode 100644 index 0000000..712fbb7 --- /dev/null +++ b/windmill/scripts/package.json @@ -0,0 +1,19 @@ +{ + "name": "scripts", + "version": "1.0.0", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [], + "author": "", + "license": "ISC", + "description": "", + "dependencies": { + "date-fns": "^4.1.0", + "dotenv": "^16.5.0", + "fs-extra": "^11.3.0", + "mongodb": "^6.16.0", + "node-fetch": "^2.7.0" + } +} diff --git a/windmill/scripts/sync_mongo_to_clickhouse.js b/windmill/scripts/sync_mongo_to_clickhouse.js new file mode 100644 index 0000000..e7d48f4 --- /dev/null +++ b/windmill/scripts/sync_mongo_to_clickhouse.js @@ -0,0 +1,714 @@ +// 从MongoDB的trace表同步数据到ClickHouse的events表 +// +// 支持以下同步模式: +// 1. 增量同步:基于上次同步状态,只同步新数据(默认模式) +// 2. 自定义时间范围同步:通过指定开始时间和结束时间,同步特定时间范围内的数据 +// - 可以通过时间戳参数(startTime/endTime)指定范围 +// - 也可以通过日期字符串参数(startDate/endDate)指定范围,支持ISO格式或yyyy-MM-dd格式 + +const { MongoClient, ObjectId } = require('mongodb'); +const fs = require('fs'); +const path = require('path'); +const fetch = require('node-fetch'); + +// 同步状态键名和保存路径 +const SYNC_STATE_FILE = path.join(__dirname, 'mongo_sync_state.json'); + +// 直接使用配置值 +const mongoConfig = { + url: "mongodb://10.0.1.41:27017", + db: "main" // 注意:请替换为您的实际数据库名称 +}; + +const clickhouseConfig = { + clickhouse_host: "10.0.1.60", + clickhouse_port: "8123", + clickhouse_user: "admin", + clickhouse_password: "your_secure_password", + clickhouse_database: "shorturl_analytics", + clickhouse_url: "http://10.0.1.60:8123" +}; + +// 封装本地读取变量函数 +async function getVariable(key) { + try { + if (key === 'f/shorturl_analytics/mongodb') { + return mongoConfig; + } else if (key === 'f/shorturl_analytics/clickhouse') { + return clickhouseConfig; + } else if (key === 'f/shorturl_analytics/mongo_sync_state') { + if (fs.existsSync(SYNC_STATE_FILE)) { + return JSON.parse(fs.readFileSync(SYNC_STATE_FILE, 'utf8')); + } + } + return null; + } catch (error) { + console.error(`获取变量失败: ${error.message}`); + return null; + } +} + +// 封装本地保存变量函数 +async function setVariable(key, value) { + try { + if (key === 'f/shorturl_analytics/mongo_sync_state') { + fs.writeFileSync(SYNC_STATE_FILE, JSON.stringify(value, null, 2)); + } + } catch (error) { + console.error(`保存变量失败: ${error.message}`); + throw error; + } +} + +// 日期字符串转时间戳工具函数(接受ISO字符串或yyyy-MM-dd格式) +function dateToTimestamp(dateStr) { + try { + // 尝试直接解析完整的ISO日期字符串 + const date = new Date(dateStr); + + // 检查是否为有效日期 + if (isNaN(date.getTime())) { + // 尝试解析yyyy-MM-dd格式,默认设置为当天的00:00:00 + const parts = dateStr.split('-'); + if (parts.length === 3) { + const year = parseInt(parts[0], 10); + const month = parseInt(parts[1], 10) - 1; // 月份从0开始 + const day = parseInt(parts[2], 10); + + const dateObj = new Date(year, month, day, 0, 0, 0); + return dateObj.getTime(); + } + throw new Error(`无法解析日期字符串: ${dateStr}`); + } + + return date.getTime(); + } catch (err) { + throw new Error(`日期转换错误: ${err instanceof Error ? err.message : String(err)}`); + } +} + +// 从URL中提取UTM参数的函数,增强版 +function extractUtmParams(url, debug = false) { + const defaultUtmParams = { + utm_source: "", + utm_medium: "", + utm_campaign: "", + utm_term: "", + utm_content: "" + }; + + if (!url) return defaultUtmParams; + + if (debug) { + console.log(`[UTM提取] 原始URL: ${url}`); + } + + // 准备一个解析后的参数对象 + const params = { ...defaultUtmParams }; + + // 尝试多种方法提取UTM参数 + + // 方法1: 使用URL对象解析 + try { + // 先处理URL,确保是完整的URL格式 + let normalizedUrl = url; + if (!url.match(/^https?:\/\//i)) { + normalizedUrl = `https://example.com${url.startsWith('/') ? '' : '/'}${url}`; + } + + const urlObj = new URL(normalizedUrl); + + // 读取URL参数 + if (urlObj.searchParams.has('utm_source')) + params.utm_source = urlObj.searchParams.get('utm_source') || ""; + if (urlObj.searchParams.has('utm_medium')) + params.utm_medium = urlObj.searchParams.get('utm_medium') || ""; + if (urlObj.searchParams.has('utm_campaign')) + params.utm_campaign = urlObj.searchParams.get('utm_campaign') || ""; + if (urlObj.searchParams.has('utm_term')) + params.utm_term = urlObj.searchParams.get('utm_term') || ""; + if (urlObj.searchParams.has('utm_content')) + params.utm_content = urlObj.searchParams.get('utm_content') || ""; + + if (debug) { + console.log(`[UTM提取] URL对象解析结果: ${JSON.stringify(params)}`); + } + + // 如果至少找到一个UTM参数,则返回 + if (params.utm_source || params.utm_medium || params.utm_campaign || + params.utm_term || params.utm_content) { + return params; + } + } catch (err) { + if (debug) { + console.log(`[UTM提取] URL对象解析失败,尝试正则表达式`); + } + } + + // 方法2: 使用正则表达式提取参数 + // 使用正则表达式(最安全的方法,适用于任何格式) + const sourceMatch = url.match(/[?&]utm_source=([^&#]+)/i); + if (sourceMatch && sourceMatch[1]) { + try { + params.utm_source = decodeURIComponent(sourceMatch[1]); + } catch (err) { + params.utm_source = sourceMatch[1]; + } + } + + const mediumMatch = url.match(/[?&]utm_medium=([^&#]+)/i); + if (mediumMatch && mediumMatch[1]) { + try { + params.utm_medium = decodeURIComponent(mediumMatch[1]); + } catch (err) { + params.utm_medium = mediumMatch[1]; + } + } + + const campaignMatch = url.match(/[?&]utm_campaign=([^&#]+)/i); + if (campaignMatch && campaignMatch[1]) { + try { + params.utm_campaign = decodeURIComponent(campaignMatch[1]); + } catch (err) { + params.utm_campaign = campaignMatch[1]; + } + } + + const termMatch = url.match(/[?&]utm_term=([^&#]+)/i); + if (termMatch && termMatch[1]) { + try { + params.utm_term = decodeURIComponent(termMatch[1]); + } catch (err) { + params.utm_term = termMatch[1]; + } + } + + const contentMatch = url.match(/[?&]utm_content=([^&#]+)/i); + if (contentMatch && contentMatch[1]) { + try { + params.utm_content = decodeURIComponent(contentMatch[1]); + } catch (err) { + params.utm_content = contentMatch[1]; + } + } + + if (debug) { + console.log(`[UTM提取] 正则表达式解析结果: ${JSON.stringify(params)}`); + } + + return params; +} + +// 解析命令行参数 +function parseCommandLineArgs() { + const args = {}; + process.argv.slice(2).forEach(arg => { + if (arg.startsWith('--')) { + const [key, value] = arg.substring(2).split('='); + args[key] = value || true; + } + }); + return args; +} + +async function main() { + const args = parseCommandLineArgs(); + + // 参数设置 + const batch_size = parseInt(args['batch-size'] || '1000'); + const max_records = parseInt(args['max-records'] || '9999999'); + const timeout_minutes = parseInt(args['timeout'] || '60'); + const skip_clickhouse_check = args['skip-clickhouse-check'] === 'true'; + const force_insert = args['force-insert'] !== 'false'; + const database_override = args['database'] || 'shorturl_analytics'; + const reset_sync_state = args['reset-sync-state'] === 'true'; + const debug_utm = args['debug-utm'] === 'true'; + const start_time = args['start-time'] ? parseInt(args['start-time']) : undefined; + const end_time = args['end-time'] ? parseInt(args['end-time']) : undefined; + const use_custom_time_range = args['use-custom-time-range'] === 'true'; + const start_date = args['start-date']; + const end_date = args['end-date']; + + const logWithTimestamp = (message) => { + const now = new Date(); + console.log(`[${now.toISOString()}] ${message}`); + }; + + logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务"); + logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`); + + let customStartTime = start_time; + let customEndTime = end_time; + let useCustomTimeRange = use_custom_time_range; + + // 处理日期字符串参数,转换为时间戳 + if (start_date) { + try { + customStartTime = dateToTimestamp(start_date); + logWithTimestamp(`将开始日期 ${start_date} 转换为时间戳 ${customStartTime}`); + useCustomTimeRange = true; + } catch (err) { + logWithTimestamp(`开始日期转换错误: ${err instanceof Error ? err.message : String(err)}`); + } + } + + if (end_date) { + try { + customEndTime = dateToTimestamp(end_date); + // 如果是日期格式,设置为当天结束时间 (23:59:59.999) + if (end_date.split('-').length === 3 && end_date.length <= 10) { + customEndTime += 24 * 60 * 60 * 1000 - 1; // 加上23:59:59.999 + logWithTimestamp(`将结束日期 ${end_date} 转换为当天结束时间戳 ${customEndTime}`); + } else { + logWithTimestamp(`将结束日期 ${end_date} 转换为时间戳 ${customEndTime}`); + } + useCustomTimeRange = true; + } catch (err) { + logWithTimestamp(`结束日期转换错误: ${err instanceof Error ? err.message : String(err)}`); + } + } + + if (skip_clickhouse_check) { + logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式,不会检查记录是否已存在"); + } + if (force_insert) { + logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录"); + } + if (reset_sync_state) { + logWithTimestamp("⚠️ 警告: 已启用重置同步状态,将从头开始同步数据"); + } + if (debug_utm) { + logWithTimestamp("已启用UTM参数调试日志"); + } + if (useCustomTimeRange) { + if (customStartTime) { + logWithTimestamp(`已启用自定义时间范围:开始时间 ${new Date(customStartTime).toISOString()}`); + } + if (customEndTime) { + logWithTimestamp(`已启用自定义时间范围:结束时间 ${new Date(customEndTime).toISOString()}`); + } + } + + // 设置超时 + const startTime = Date.now(); + const timeoutMs = timeout_minutes * 60 * 1000; + + // 检查是否超时 + const checkTimeout = () => { + if (Date.now() - startTime > timeoutMs) { + console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`); + return true; + } + return false; + }; + + // 获取上次同步状态 + let lastSyncState = null; + if (!reset_sync_state) { + try { + const rawSyncState = await getVariable("f/shorturl_analytics/mongo_sync_state"); + if (rawSyncState) { + lastSyncState = rawSyncState; + } + } catch (error) { + logWithTimestamp(`获取上次同步状态失败: ${error}, 将从头开始同步`); + } + } + + if (lastSyncState) { + logWithTimestamp(`找到上次同步状态: 最后同步时间 ${new Date(lastSyncState.last_sync_time).toISOString()}, 已同步记录数 ${lastSyncState.records_synced}`); + if (lastSyncState.last_sync_id) { + logWithTimestamp(`最后同步ID: ${lastSyncState.last_sync_id}`); + } + } else { + logWithTimestamp("没有找到上次同步状态,将从头开始同步"); + } + + // 连接MongoDB + const client = new MongoClient(mongoConfig.url); + try { + await client.connect(); + console.log("MongoDB连接成功"); + + const db = client.db(mongoConfig.db); + const traceCollection = db.collection("trace"); + const shortCollection = db.collection("short"); + + // 构建查询条件 + const query = { + type: 1 // 只同步type为1的记录 + }; + + // 根据时间范围参数构建查询条件 + if (useCustomTimeRange) { + // 使用自定义时间范围 + const timeQuery = {}; + + if (customStartTime) { + timeQuery.$gte = customStartTime; + logWithTimestamp(`将只同步createTime >= ${customStartTime} (${new Date(customStartTime).toISOString()}) 的记录`); + } + + if (customEndTime) { + timeQuery.$lte = customEndTime; + logWithTimestamp(`将只同步createTime <= ${customEndTime} (${new Date(customEndTime).toISOString()}) 的记录`); + } + + // 只有当至少指定了一个时间限制时才添加时间查询条件 + if (Object.keys(timeQuery).length > 0) { + query.createTime = timeQuery; + } + } + // 如果不使用自定义时间范围,且有上次同步状态,则只获取更新的记录 + else if (lastSyncState && lastSyncState.last_sync_time) { + // 使用上次同步时间作为过滤条件 + query.createTime = { $gt: lastSyncState.last_sync_time }; + logWithTimestamp(`将只同步createTime > ${lastSyncState.last_sync_time} (${new Date(lastSyncState.last_sync_time).toISOString()}) 的记录`); + } + + // 计算总记录数 + const totalRecords = await traceCollection.countDocuments(query); + console.log(`找到 ${totalRecords} 条新记录需要同步`); + + // 限制此次处理的记录数量 + const recordsToProcess = Math.min(totalRecords, max_records); + console.log(`本次将处理 ${recordsToProcess} 条记录`); + + if (totalRecords === 0) { + console.log("没有新记录需要同步,任务完成"); + return { + success: true, + records_synced: 0, + message: "没有新记录需要同步" + }; + } + + // 检查ClickHouse连接状态 + const checkClickHouseConnection = async () => { + if (skip_clickhouse_check) { + logWithTimestamp("已启用跳过ClickHouse检查,不测试连接"); + return true; + } + + try { + logWithTimestamp("测试ClickHouse连接..."); + const clickhouseUrl = clickhouseConfig.clickhouse_url; + const response = await fetch(clickhouseUrl, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${Buffer.from(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`).toString('base64')}`, + }, + body: `SELECT 1 FROM ${clickhouseConfig.clickhouse_database}.events LIMIT 1`, + }); + + if (response.ok) { + logWithTimestamp("ClickHouse连接测试成功"); + return true; + } else { + const errorText = await response.text(); + logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`); + return false; + } + } catch (err) { + logWithTimestamp(`ClickHouse连接测试失败: ${err.message}`); + return false; + } + }; + + // 在处理记录前先检查ClickHouse连接 + const clickhouseConnected = await checkClickHouseConnection(); + if (!clickhouseConnected && !skip_clickhouse_check) { + logWithTimestamp("⚠️ ClickHouse连接测试失败,请启用skip_clickhouse_check=true参数来跳过连接检查"); + throw new Error("ClickHouse连接失败,无法继续同步"); + } + + // 处理记录的函数 + const processRecords = async (records) => { + if (records.length === 0) return 0; + + logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`); + + // 强制使用所有记录,不检查重复 + const newRecords = records; + + logWithTimestamp(`准备处理 ${newRecords.length} 条记录...`); + + // 获取链接信息 + const slugIds = newRecords.map(record => new ObjectId(record.slugId)); + logWithTimestamp(`正在查询 ${slugIds.length} 条短链接信息...`); + const shortLinks = await shortCollection.find({ + _id: { $in: slugIds } + }).toArray(); + + // 创建映射用于快速查找 + const shortLinksMap = new Map(shortLinks.map((link) => [link._id.toString(), link])); + logWithTimestamp(`获取到 ${shortLinks.length} 条短链接信息,${newRecords.length - shortLinks.length} 条数据将使用占位符`); + + // 准备ClickHouse插入数据 + const clickhouseData = newRecords.map(record => { + const eventTime = new Date(record.createTime); + + // 获取对应的短链接信息 + const shortLink = shortLinksMap.get(record.slugId.toString()); + + // 提取URL中的UTM参数 + if (debug_utm && record.url) { + logWithTimestamp(`======== UTM参数调试 ========`); + logWithTimestamp(`记录ID: ${record._id.toString()}`); + logWithTimestamp(`原始URL: ${record.url}`); + } + + const utmParams = extractUtmParams(record.url || "", debug_utm); + + if (debug_utm) { + logWithTimestamp(`提取的UTM参数: ${JSON.stringify(utmParams)}`); + logWithTimestamp(`===========================`); + } + + // 保存提取的UTM参数和URL到event_attributes + const eventAttributes = { + mongo_id: record._id.toString(), + url: record.url || "", + ...(record.url ? { raw_url: record.url } : {}) + }; + + // 转换MongoDB记录为ClickHouse格式,匹配ClickHouse表结构 + return { + // UUID将由ClickHouse自动生成 (event_id) + event_time: eventTime.toISOString().replace('T', ' ').replace('Z', ''), + event_type: record.type === 1 ? "visit" : "custom", + event_attributes: JSON.stringify(eventAttributes), + link_id: record.slugId.toString(), + link_slug: shortLink?.slug || "unknown_slug", // 使用占位符 + link_label: record.label || "", + link_title: shortLink?.title || "unknown_title", // 使用占位符 + link_original_url: shortLink?.origin || "https://unknown.url", // 使用占位符 + link_attributes: JSON.stringify({ domain: shortLink?.domain || "unknown_domain" }), // 使用占位符 + link_created_at: shortLink?.createTime + ? new Date(shortLink.createTime).toISOString().replace('T', ' ').replace('Z', '') + : eventTime.toISOString().replace('T', ' ').replace('Z', ''), + link_expires_at: shortLink?.expiresAt + ? new Date(shortLink.expiresAt).toISOString().replace('T', ' ').replace('Z', '') + : null, + link_tags: shortLink?.tags ? JSON.stringify(shortLink.tags) : "[]", + user_id: shortLink?.user || "unknown_user", // 使用占位符 + user_name: "unknown_user", // 使用占位符 + user_email: "", + user_attributes: "{}", + team_id: shortLink?.teamId || "unknown_team", // 使用占位符 + team_name: "unknown_team", // 使用占位符 + team_attributes: "{}", + project_id: shortLink?.projectId || "unknown_project", // 使用占位符 + project_name: "unknown_project", // 使用占位符 + project_attributes: "{}", + qr_code_id: "", + qr_code_name: "", + qr_code_attributes: "{}", + visitor_id: record._id.toString(), + session_id: record._id.toString() + "-" + record.createTime, + ip_address: record.ip || "0.0.0.0", // 使用占位符 + country: "", + city: "", + device_type: record.platform || "unknown", + browser: record.browser || "unknown", // 使用占位符 + os: record.platformOS || "unknown", // 使用占位符 + user_agent: (record.browser || "unknown") + " " + (record.browserVersion || "unknown"), // 使用占位符 + referrer: record.url || "", + utm_source: utmParams.utm_source || "", + utm_medium: utmParams.utm_medium || "", + utm_campaign: utmParams.utm_campaign || "", + utm_term: utmParams.utm_term || "", + utm_content: utmParams.utm_content || "", + time_spent_sec: 0, + is_bounce: true, + is_qr_scan: false, + conversion_type: "visit", + conversion_value: 0, + req_full_path: record.url || "" + }; + }); + + // 生成ClickHouse插入SQL + const insertSQL = ` + INSERT INTO ${clickhouseConfig.clickhouse_database}.events + (event_time, event_type, event_attributes, link_id, link_slug, link_label, link_title, + link_original_url, link_attributes, link_created_at, link_expires_at, link_tags, + user_id, user_name, user_email, user_attributes, team_id, team_name, team_attributes, + project_id, project_name, project_attributes, qr_code_id, qr_code_name, qr_code_attributes, + visitor_id, session_id, ip_address, country, city, device_type, browser, os, user_agent, + referrer, utm_source, utm_medium, utm_campaign, utm_term, utm_content, time_spent_sec, + is_bounce, is_qr_scan, conversion_type, conversion_value, req_full_path) + VALUES ${clickhouseData.map(record => { + // 确保所有字符串值都是字符串类型,并安全处理替换 + const safeReplace = (val) => { + // 确保值是字符串,如果是null或undefined则使用空字符串 + const str = val === null || val === undefined ? "" : String(val); + // 安全替换单引号 + return str.replace(/'/g, "''"); + }; + + return `('${record.event_time}', '${safeReplace(record.event_type)}', '${safeReplace(record.event_attributes)}', + '${record.link_id}', '${safeReplace(record.link_slug)}', '${safeReplace(record.link_label)}', '${safeReplace(record.link_title)}', + '${safeReplace(record.link_original_url)}', '${safeReplace(record.link_attributes)}', '${record.link_created_at}', + ${record.link_expires_at === null ? 'NULL' : `'${record.link_expires_at}'`}, '${safeReplace(record.link_tags)}', + '${safeReplace(record.user_id)}', '${safeReplace(record.user_name)}', '${safeReplace(record.user_email)}', + '${safeReplace(record.user_attributes)}', '${safeReplace(record.team_id)}', '${safeReplace(record.team_name)}', + '${safeReplace(record.team_attributes)}', '${safeReplace(record.project_id)}', '${safeReplace(record.project_name)}', + '${safeReplace(record.project_attributes)}', '${safeReplace(record.qr_code_id)}', '${safeReplace(record.qr_code_name)}', + '${safeReplace(record.qr_code_attributes)}', '${safeReplace(record.visitor_id)}', '${safeReplace(record.session_id)}', + '${safeReplace(record.ip_address)}', '${safeReplace(record.country)}', '${safeReplace(record.city)}', + '${safeReplace(record.device_type)}', '${safeReplace(record.browser)}', '${safeReplace(record.os)}', + '${safeReplace(record.user_agent)}', '${safeReplace(record.referrer)}', '${safeReplace(record.utm_source)}', + '${safeReplace(record.utm_medium)}', '${safeReplace(record.utm_campaign)}', '${safeReplace(record.utm_term)}', + '${safeReplace(record.utm_content)}', ${record.time_spent_sec}, ${record.is_bounce}, ${record.is_qr_scan}, + '${safeReplace(record.conversion_type)}', ${record.conversion_value}, '${safeReplace(record.req_full_path)}')`; + }).join(", ")} + `; + + if (insertSQL.length === 0) { + console.log("没有新记录需要插入"); + return 0; + } + + // 发送请求到ClickHouse + const clickhouseUrl = clickhouseConfig.clickhouse_url; + try { + logWithTimestamp("发送插入请求到ClickHouse..."); + const response = await fetch(clickhouseUrl, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${Buffer.from(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`).toString('base64')}` + }, + body: insertSQL, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`); + } + + logWithTimestamp(`成功插入 ${newRecords.length} 条记录到ClickHouse`); + return newRecords.length; + } catch (err) { + logWithTimestamp(`向ClickHouse插入数据失败: ${err.message}`); + throw err; + } + }; + + // 批量处理记录 + let processedRecords = 0; + let totalBatchRecords = 0; + let lastSyncTime = 0; + + for (let page = 0; processedRecords < recordsToProcess; page++) { + // 检查超时 + if (checkTimeout()) { + logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`); + break; + } + + // 每批次都输出进度 + logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`); + + logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`); + + // 查询MongoDB数据 + const records = await traceCollection.find(query) + .sort({ createTime: 1 }) + .skip(page * batch_size) + .limit(batch_size) + .toArray(); + + if (records.length === 0) { + logWithTimestamp("没有找到更多数据,同步结束"); + break; + } + + // 找到数据,开始处理 + logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`); + // 输出当前批次的部分数据信息 + if (records.length > 0) { + logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, 时间=${new Date(records[0].createTime).toISOString()}`); + if (records.length > 1) { + logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`); + } + + // 如果开启了调试,输出一些URL样本 + if (debug_utm) { + const sampleSize = Math.min(5, records.length); + logWithTimestamp(`URL样本 (前${sampleSize}条):`); + for (let i = 0; i < sampleSize; i++) { + if (records[i].url) { + logWithTimestamp(`样本 ${i+1}: ${records[i].url}`); + } + } + } + } + + const batchSize = await processRecords(records); + processedRecords += records.length; + totalBatchRecords += batchSize; + + // 更新最后处理的记录时间和ID + if (records.length > 0) { + const lastRecord = records[records.length - 1]; + lastSyncTime = Math.max(lastSyncTime, lastRecord.createTime); + } + + logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); + } + + // 更新同步状态 + if (processedRecords > 0 && lastSyncTime > 0) { + // 只在非自定义时间范围模式下更新同步状态 + if (!useCustomTimeRange) { + // 创建新的同步状态,简化对象结构 + const newSyncState = { + last_sync_time: lastSyncTime, + records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + processedRecords + }; + + try { + // 保存同步状态 + await setVariable("f/shorturl_analytics/mongo_sync_state", newSyncState); + logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`); + } catch (err) { + logWithTimestamp(`更新同步状态失败: ${err.message},将继续执行`); + } + } else { + logWithTimestamp("使用自定义时间范围模式,不更新全局同步状态"); + } + } + + return { + success: true, + records_processed: processedRecords, + records_synced: totalBatchRecords, + last_sync_time: lastSyncTime > 0 ? new Date(lastSyncTime).toISOString() : null, + message: useCustomTimeRange ? "自定义时间范围数据同步完成" : "数据同步完成", + custom_time_range_used: useCustomTimeRange + }; + } catch (err) { + console.error("同步过程中发生错误:", err); + return { + success: false, + error: err.message, + stack: err.stack + }; + } finally { + // 关闭MongoDB连接 + await client.close(); + console.log("MongoDB连接已关闭"); + } +} + +// 执行主函数 +main().then(result => { + console.log("任务执行结果:", result); + process.exit(result.success ? 0 : 1); +}).catch(err => { + console.error("执行出错:", err); + process.exit(1); +}); \ No newline at end of file diff --git a/windmill/sync_mongo_to_events.ts b/windmill/sync_mongo_to_events.ts index b3ecc0b..72542bc 100644 --- a/windmill/sync_mongo_to_events.ts +++ b/windmill/sync_mongo_to_events.ts @@ -1,4 +1,12 @@ // 从MongoDB的trace表同步数据到ClickHouse的events表 +// +// 支持以下同步模式: +// 1. 增量同步:基于上次同步状态,只同步新数据(默认模式) +// 2. 自定义时间范围同步:通过指定开始时间和结束时间,同步特定时间范围内的数据 +// - 可以通过时间戳参数(start_time/end_time)指定范围 +// - 也可以通过日期字符串参数(start_date/end_date)指定范围,支持ISO格式或yyyy-MM-dd格式 +// +// 使用自定义时间范围时,将不会更新同步状态,避免干扰增量同步进度 import { getVariable, setVariable } from "npm:windmill-client@1"; import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts"; @@ -68,6 +76,33 @@ interface UtmParams { // 同步状态键名 const SYNC_STATE_KEY = "f/shorturl_analytics/mongo_sync_state"; +// 日期字符串转时间戳工具函数(接受ISO字符串或yyyy-MM-dd格式) +function dateToTimestamp(dateStr: string): number { + try { + // 尝试直接解析完整的ISO日期字符串 + const date = new Date(dateStr); + + // 检查是否为有效日期 + if (isNaN(date.getTime())) { + // 尝试解析yyyy-MM-dd格式,默认设置为当天的00:00:00 + const parts = dateStr.split('-'); + if (parts.length === 3) { + const year = parseInt(parts[0], 10); + const month = parseInt(parts[1], 10) - 1; // 月份从0开始 + const day = parseInt(parts[2], 10); + + const dateObj = new Date(year, month, day, 0, 0, 0); + return dateObj.getTime(); + } + throw new Error(`无法解析日期字符串: ${dateStr}`); + } + + return date.getTime(); + } catch (err) { + throw new Error(`日期转换错误: ${err instanceof Error ? err.message : String(err)}`); + } +} + // 从URL中提取UTM参数的函数,增强版 function extractUtmParams(url: string, debug = false): UtmParams { const defaultUtmParams: UtmParams = { @@ -188,7 +223,12 @@ export async function main( force_insert = true, database_override = "shorturl_analytics", // 添加数据库名称参数,默认为shorturl_analytics reset_sync_state = false, // 添加参数用于重置同步状态 - debug_utm = false // 添加参数控制UTM调试日志输出 + debug_utm = false, // 添加参数控制UTM调试日志输出 + start_time?: number, // 添加参数指定同步的开始时间戳,可选 + end_time?: number, // 添加参数指定同步的结束时间戳,可选 + use_custom_time_range = false, // 添加参数控制是否使用自定义时间范围 + start_date?: string, // 添加开始日期字符串参数(ISO格式或yyyy-MM-dd格式) + end_date?: string // 添加结束日期字符串参数(ISO格式或yyyy-MM-dd格式) ) { const logWithTimestamp = (message: string) => { const now = new Date(); @@ -197,6 +237,34 @@ export async function main( logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务"); logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`); + + // 处理日期字符串参数,转换为时间戳 + if (start_date) { + try { + start_time = dateToTimestamp(start_date); + logWithTimestamp(`将开始日期 ${start_date} 转换为时间戳 ${start_time}`); + use_custom_time_range = true; + } catch (err) { + logWithTimestamp(`开始日期转换错误: ${err instanceof Error ? err.message : String(err)}`); + } + } + + if (end_date) { + try { + end_time = dateToTimestamp(end_date); + // 如果是日期格式,设置为当天结束时间 (23:59:59.999) + if (end_date.split('-').length === 3 && end_date.length <= 10) { + end_time += 24 * 60 * 60 * 1000 - 1; // 加上23:59:59.999 + logWithTimestamp(`将结束日期 ${end_date} 转换为当天结束时间戳 ${end_time}`); + } else { + logWithTimestamp(`将结束日期 ${end_date} 转换为时间戳 ${end_time}`); + } + use_custom_time_range = true; + } catch (err) { + logWithTimestamp(`结束日期转换错误: ${err instanceof Error ? err.message : String(err)}`); + } + } + if (skip_clickhouse_check) { logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式,不会检查记录是否已存在"); } @@ -209,6 +277,14 @@ export async function main( if (debug_utm) { logWithTimestamp("已启用UTM参数调试日志"); } + if (use_custom_time_range) { + if (start_time) { + logWithTimestamp(`已启用自定义时间范围:开始时间 ${new Date(start_time).toISOString()}`); + } + if (end_time) { + logWithTimestamp(`已启用自定义时间范围:结束时间 ${new Date(end_time).toISOString()}`); + } + } // 设置超时 const startTime = Date.now(); @@ -331,8 +407,28 @@ export async function main( type: 1 // 只同步type为1的记录 }; - // 如果有上次同步状态,则只获取更新的记录 - if (lastSyncState && lastSyncState.last_sync_time) { + // 根据时间范围参数构建查询条件 + if (use_custom_time_range) { + // 使用自定义时间范围 + const timeQuery: Record = {}; + + if (start_time) { + timeQuery.$gte = start_time; + logWithTimestamp(`将只同步createTime >= ${start_time} (${new Date(start_time).toISOString()}) 的记录`); + } + + if (end_time) { + timeQuery.$lte = end_time; + logWithTimestamp(`将只同步createTime <= ${end_time} (${new Date(end_time).toISOString()}) 的记录`); + } + + // 只有当至少指定了一个时间限制时才添加时间查询条件 + if (Object.keys(timeQuery).length > 0) { + query.createTime = timeQuery; + } + } + // 如果不使用自定义时间范围,且有上次同步状态,则只获取更新的记录 + else if (lastSyncState && lastSyncState.last_sync_time) { // 使用上次同步时间作为过滤条件 query.createTime = { $gt: lastSyncState.last_sync_time }; logWithTimestamp(`将只同步createTime > ${lastSyncState.last_sync_time} (${new Date(lastSyncState.last_sync_time).toISOString()}) 的记录`); @@ -641,20 +737,25 @@ export async function main( // 更新同步状态 if (processedRecords > 0 && lastSyncTime > 0) { - // 创建新的同步状态,简化对象结构 - const newSyncState: SyncState = { - last_sync_time: lastSyncTime, - records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + processedRecords, // 使用处理的总记录数,而不是实际插入数 - }; - - try { - // 保存同步状态 - await setVariable(SYNC_STATE_KEY, newSyncState); - logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`); - } catch (err) { - const error = err as Error; - logWithTimestamp(`更新同步状态失败: ${error.message},将继续执行`); - // 不抛出错误,继续执行 + // 只在非自定义时间范围模式下更新同步状态 + if (!use_custom_time_range) { + // 创建新的同步状态,简化对象结构 + const newSyncState: SyncState = { + last_sync_time: lastSyncTime, + records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + processedRecords, // 使用处理的总记录数,而不是实际插入数 + }; + + try { + // 保存同步状态 + await setVariable(SYNC_STATE_KEY, newSyncState); + logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`); + } catch (err) { + const error = err as Error; + logWithTimestamp(`更新同步状态失败: ${error.message},将继续执行`); + // 不抛出错误,继续执行 + } + } else { + logWithTimestamp("使用自定义时间范围模式,不更新全局同步状态"); } } @@ -663,7 +764,8 @@ export async function main( records_processed: processedRecords, records_synced: totalBatchRecords, last_sync_time: lastSyncTime > 0 ? new Date(lastSyncTime).toISOString() : null, - message: "数据同步完成" + message: use_custom_time_range ? "自定义时间范围数据同步完成" : "数据同步完成", + custom_time_range_used: use_custom_time_range }; } catch (err) { console.error("同步过程中发生错误:", err);