diff --git a/.env b/.env index 7b13ba4..2c9d645 100644 --- a/.env +++ b/.env @@ -1,5 +1,7 @@ PORT=3007 +MONGO_URL="mongodb://10.0.1.41:27017" + # ClickHouse Configuration CLICKHOUSE_HOST=10.0.1.60 CLICKHOUSE_PORT=8123 @@ -26,4 +28,4 @@ DATABASE_URL="postgresql://postgres.mwwvqwevplndzvmqmrxa:eYYdarJsRL*Z6&p9gD@aws- NEXT_PUBLIC_LIMQ_API="https://app.upj.to" # Application URL for redirects (replace with your production URL) -NEXT_PUBLIC_SITE_URL="https://main.upj.to/login" \ No newline at end of file +NEXT_PUBLIC_SITE_URL="https://main.upj.to" \ No newline at end of file diff --git a/windmill/scripts/.gitignore b/windmill/scripts/.gitignore new file mode 100644 index 0000000..d502512 --- /dev/null +++ b/windmill/scripts/.gitignore @@ -0,0 +1,2 @@ +/node_modules +/package-lock.json diff --git a/windmill/scripts/package.json b/windmill/scripts/package.json new file mode 100644 index 0000000..712fbb7 --- /dev/null +++ b/windmill/scripts/package.json @@ -0,0 +1,19 @@ +{ + "name": "scripts", + "version": "1.0.0", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [], + "author": "", + "license": "ISC", + "description": "", + "dependencies": { + "date-fns": "^4.1.0", + "dotenv": "^16.5.0", + "fs-extra": "^11.3.0", + "mongodb": "^6.16.0", + "node-fetch": "^2.7.0" + } +} diff --git a/windmill/scripts/sync_mongo_to_clickhouse.js b/windmill/scripts/sync_mongo_to_clickhouse.js new file mode 100644 index 0000000..e7d48f4 --- /dev/null +++ b/windmill/scripts/sync_mongo_to_clickhouse.js @@ -0,0 +1,714 @@ +// 从MongoDB的trace表同步数据到ClickHouse的events表 +// +// 支持以下同步模式: +// 1. 增量同步:基于上次同步状态,只同步新数据(默认模式) +// 2. 自定义时间范围同步:通过指定开始时间和结束时间,同步特定时间范围内的数据 +// - 可以通过时间戳参数(startTime/endTime)指定范围 +// - 也可以通过日期字符串参数(startDate/endDate)指定范围,支持ISO格式或yyyy-MM-dd格式 + +const { MongoClient, ObjectId } = require('mongodb'); +const fs = require('fs'); +const path = require('path'); +const fetch = require('node-fetch'); + +// 同步状态键名和保存路径 +const SYNC_STATE_FILE = path.join(__dirname, 'mongo_sync_state.json'); + +// 直接使用配置值 +const mongoConfig = { + url: "mongodb://10.0.1.41:27017", + db: "main" // 注意:请替换为您的实际数据库名称 +}; + +const clickhouseConfig = { + clickhouse_host: "10.0.1.60", + clickhouse_port: "8123", + clickhouse_user: "admin", + clickhouse_password: "your_secure_password", + clickhouse_database: "shorturl_analytics", + clickhouse_url: "http://10.0.1.60:8123" +}; + +// 封装本地读取变量函数 +async function getVariable(key) { + try { + if (key === 'f/shorturl_analytics/mongodb') { + return mongoConfig; + } else if (key === 'f/shorturl_analytics/clickhouse') { + return clickhouseConfig; + } else if (key === 'f/shorturl_analytics/mongo_sync_state') { + if (fs.existsSync(SYNC_STATE_FILE)) { + return JSON.parse(fs.readFileSync(SYNC_STATE_FILE, 'utf8')); + } + } + return null; + } catch (error) { + console.error(`获取变量失败: ${error.message}`); + return null; + } +} + +// 封装本地保存变量函数 +async function setVariable(key, value) { + try { + if (key === 'f/shorturl_analytics/mongo_sync_state') { + fs.writeFileSync(SYNC_STATE_FILE, JSON.stringify(value, null, 2)); + } + } catch (error) { + console.error(`保存变量失败: ${error.message}`); + throw error; + } +} + +// 日期字符串转时间戳工具函数(接受ISO字符串或yyyy-MM-dd格式) +function dateToTimestamp(dateStr) { + try { + // 尝试直接解析完整的ISO日期字符串 + const date = new Date(dateStr); + + // 检查是否为有效日期 + if (isNaN(date.getTime())) { + // 尝试解析yyyy-MM-dd格式,默认设置为当天的00:00:00 + const parts = dateStr.split('-'); + if (parts.length === 3) { + const year = parseInt(parts[0], 10); + const month = parseInt(parts[1], 10) - 1; // 月份从0开始 + const day = parseInt(parts[2], 10); + + const dateObj = new Date(year, month, day, 0, 0, 0); + return dateObj.getTime(); + } + throw new Error(`无法解析日期字符串: ${dateStr}`); + } + + return date.getTime(); + } catch (err) { + throw new Error(`日期转换错误: ${err instanceof Error ? err.message : String(err)}`); + } +} + +// 从URL中提取UTM参数的函数,增强版 +function extractUtmParams(url, debug = false) { + const defaultUtmParams = { + utm_source: "", + utm_medium: "", + utm_campaign: "", + utm_term: "", + utm_content: "" + }; + + if (!url) return defaultUtmParams; + + if (debug) { + console.log(`[UTM提取] 原始URL: ${url}`); + } + + // 准备一个解析后的参数对象 + const params = { ...defaultUtmParams }; + + // 尝试多种方法提取UTM参数 + + // 方法1: 使用URL对象解析 + try { + // 先处理URL,确保是完整的URL格式 + let normalizedUrl = url; + if (!url.match(/^https?:\/\//i)) { + normalizedUrl = `https://example.com${url.startsWith('/') ? '' : '/'}${url}`; + } + + const urlObj = new URL(normalizedUrl); + + // 读取URL参数 + if (urlObj.searchParams.has('utm_source')) + params.utm_source = urlObj.searchParams.get('utm_source') || ""; + if (urlObj.searchParams.has('utm_medium')) + params.utm_medium = urlObj.searchParams.get('utm_medium') || ""; + if (urlObj.searchParams.has('utm_campaign')) + params.utm_campaign = urlObj.searchParams.get('utm_campaign') || ""; + if (urlObj.searchParams.has('utm_term')) + params.utm_term = urlObj.searchParams.get('utm_term') || ""; + if (urlObj.searchParams.has('utm_content')) + params.utm_content = urlObj.searchParams.get('utm_content') || ""; + + if (debug) { + console.log(`[UTM提取] URL对象解析结果: ${JSON.stringify(params)}`); + } + + // 如果至少找到一个UTM参数,则返回 + if (params.utm_source || params.utm_medium || params.utm_campaign || + params.utm_term || params.utm_content) { + return params; + } + } catch (err) { + if (debug) { + console.log(`[UTM提取] URL对象解析失败,尝试正则表达式`); + } + } + + // 方法2: 使用正则表达式提取参数 + // 使用正则表达式(最安全的方法,适用于任何格式) + const sourceMatch = url.match(/[?&]utm_source=([^&#]+)/i); + if (sourceMatch && sourceMatch[1]) { + try { + params.utm_source = decodeURIComponent(sourceMatch[1]); + } catch (err) { + params.utm_source = sourceMatch[1]; + } + } + + const mediumMatch = url.match(/[?&]utm_medium=([^&#]+)/i); + if (mediumMatch && mediumMatch[1]) { + try { + params.utm_medium = decodeURIComponent(mediumMatch[1]); + } catch (err) { + params.utm_medium = mediumMatch[1]; + } + } + + const campaignMatch = url.match(/[?&]utm_campaign=([^&#]+)/i); + if (campaignMatch && campaignMatch[1]) { + try { + params.utm_campaign = decodeURIComponent(campaignMatch[1]); + } catch (err) { + params.utm_campaign = campaignMatch[1]; + } + } + + const termMatch = url.match(/[?&]utm_term=([^&#]+)/i); + if (termMatch && termMatch[1]) { + try { + params.utm_term = decodeURIComponent(termMatch[1]); + } catch (err) { + params.utm_term = termMatch[1]; + } + } + + const contentMatch = url.match(/[?&]utm_content=([^&#]+)/i); + if (contentMatch && contentMatch[1]) { + try { + params.utm_content = decodeURIComponent(contentMatch[1]); + } catch (err) { + params.utm_content = contentMatch[1]; + } + } + + if (debug) { + console.log(`[UTM提取] 正则表达式解析结果: ${JSON.stringify(params)}`); + } + + return params; +} + +// 解析命令行参数 +function parseCommandLineArgs() { + const args = {}; + process.argv.slice(2).forEach(arg => { + if (arg.startsWith('--')) { + const [key, value] = arg.substring(2).split('='); + args[key] = value || true; + } + }); + return args; +} + +async function main() { + const args = parseCommandLineArgs(); + + // 参数设置 + const batch_size = parseInt(args['batch-size'] || '1000'); + const max_records = parseInt(args['max-records'] || '9999999'); + const timeout_minutes = parseInt(args['timeout'] || '60'); + const skip_clickhouse_check = args['skip-clickhouse-check'] === 'true'; + const force_insert = args['force-insert'] !== 'false'; + const database_override = args['database'] || 'shorturl_analytics'; + const reset_sync_state = args['reset-sync-state'] === 'true'; + const debug_utm = args['debug-utm'] === 'true'; + const start_time = args['start-time'] ? parseInt(args['start-time']) : undefined; + const end_time = args['end-time'] ? parseInt(args['end-time']) : undefined; + const use_custom_time_range = args['use-custom-time-range'] === 'true'; + const start_date = args['start-date']; + const end_date = args['end-date']; + + const logWithTimestamp = (message) => { + const now = new Date(); + console.log(`[${now.toISOString()}] ${message}`); + }; + + logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务"); + logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`); + + let customStartTime = start_time; + let customEndTime = end_time; + let useCustomTimeRange = use_custom_time_range; + + // 处理日期字符串参数,转换为时间戳 + if (start_date) { + try { + customStartTime = dateToTimestamp(start_date); + logWithTimestamp(`将开始日期 ${start_date} 转换为时间戳 ${customStartTime}`); + useCustomTimeRange = true; + } catch (err) { + logWithTimestamp(`开始日期转换错误: ${err instanceof Error ? err.message : String(err)}`); + } + } + + if (end_date) { + try { + customEndTime = dateToTimestamp(end_date); + // 如果是日期格式,设置为当天结束时间 (23:59:59.999) + if (end_date.split('-').length === 3 && end_date.length <= 10) { + customEndTime += 24 * 60 * 60 * 1000 - 1; // 加上23:59:59.999 + logWithTimestamp(`将结束日期 ${end_date} 转换为当天结束时间戳 ${customEndTime}`); + } else { + logWithTimestamp(`将结束日期 ${end_date} 转换为时间戳 ${customEndTime}`); + } + useCustomTimeRange = true; + } catch (err) { + logWithTimestamp(`结束日期转换错误: ${err instanceof Error ? err.message : String(err)}`); + } + } + + if (skip_clickhouse_check) { + logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式,不会检查记录是否已存在"); + } + if (force_insert) { + logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录"); + } + if (reset_sync_state) { + logWithTimestamp("⚠️ 警告: 已启用重置同步状态,将从头开始同步数据"); + } + if (debug_utm) { + logWithTimestamp("已启用UTM参数调试日志"); + } + if (useCustomTimeRange) { + if (customStartTime) { + logWithTimestamp(`已启用自定义时间范围:开始时间 ${new Date(customStartTime).toISOString()}`); + } + if (customEndTime) { + logWithTimestamp(`已启用自定义时间范围:结束时间 ${new Date(customEndTime).toISOString()}`); + } + } + + // 设置超时 + const startTime = Date.now(); + const timeoutMs = timeout_minutes * 60 * 1000; + + // 检查是否超时 + const checkTimeout = () => { + if (Date.now() - startTime > timeoutMs) { + console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`); + return true; + } + return false; + }; + + // 获取上次同步状态 + let lastSyncState = null; + if (!reset_sync_state) { + try { + const rawSyncState = await getVariable("f/shorturl_analytics/mongo_sync_state"); + if (rawSyncState) { + lastSyncState = rawSyncState; + } + } catch (error) { + logWithTimestamp(`获取上次同步状态失败: ${error}, 将从头开始同步`); + } + } + + if (lastSyncState) { + logWithTimestamp(`找到上次同步状态: 最后同步时间 ${new Date(lastSyncState.last_sync_time).toISOString()}, 已同步记录数 ${lastSyncState.records_synced}`); + if (lastSyncState.last_sync_id) { + logWithTimestamp(`最后同步ID: ${lastSyncState.last_sync_id}`); + } + } else { + logWithTimestamp("没有找到上次同步状态,将从头开始同步"); + } + + // 连接MongoDB + const client = new MongoClient(mongoConfig.url); + try { + await client.connect(); + console.log("MongoDB连接成功"); + + const db = client.db(mongoConfig.db); + const traceCollection = db.collection("trace"); + const shortCollection = db.collection("short"); + + // 构建查询条件 + const query = { + type: 1 // 只同步type为1的记录 + }; + + // 根据时间范围参数构建查询条件 + if (useCustomTimeRange) { + // 使用自定义时间范围 + const timeQuery = {}; + + if (customStartTime) { + timeQuery.$gte = customStartTime; + logWithTimestamp(`将只同步createTime >= ${customStartTime} (${new Date(customStartTime).toISOString()}) 的记录`); + } + + if (customEndTime) { + timeQuery.$lte = customEndTime; + logWithTimestamp(`将只同步createTime <= ${customEndTime} (${new Date(customEndTime).toISOString()}) 的记录`); + } + + // 只有当至少指定了一个时间限制时才添加时间查询条件 + if (Object.keys(timeQuery).length > 0) { + query.createTime = timeQuery; + } + } + // 如果不使用自定义时间范围,且有上次同步状态,则只获取更新的记录 + else if (lastSyncState && lastSyncState.last_sync_time) { + // 使用上次同步时间作为过滤条件 + query.createTime = { $gt: lastSyncState.last_sync_time }; + logWithTimestamp(`将只同步createTime > ${lastSyncState.last_sync_time} (${new Date(lastSyncState.last_sync_time).toISOString()}) 的记录`); + } + + // 计算总记录数 + const totalRecords = await traceCollection.countDocuments(query); + console.log(`找到 ${totalRecords} 条新记录需要同步`); + + // 限制此次处理的记录数量 + const recordsToProcess = Math.min(totalRecords, max_records); + console.log(`本次将处理 ${recordsToProcess} 条记录`); + + if (totalRecords === 0) { + console.log("没有新记录需要同步,任务完成"); + return { + success: true, + records_synced: 0, + message: "没有新记录需要同步" + }; + } + + // 检查ClickHouse连接状态 + const checkClickHouseConnection = async () => { + if (skip_clickhouse_check) { + logWithTimestamp("已启用跳过ClickHouse检查,不测试连接"); + return true; + } + + try { + logWithTimestamp("测试ClickHouse连接..."); + const clickhouseUrl = clickhouseConfig.clickhouse_url; + const response = await fetch(clickhouseUrl, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${Buffer.from(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`).toString('base64')}`, + }, + body: `SELECT 1 FROM ${clickhouseConfig.clickhouse_database}.events LIMIT 1`, + }); + + if (response.ok) { + logWithTimestamp("ClickHouse连接测试成功"); + return true; + } else { + const errorText = await response.text(); + logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`); + return false; + } + } catch (err) { + logWithTimestamp(`ClickHouse连接测试失败: ${err.message}`); + return false; + } + }; + + // 在处理记录前先检查ClickHouse连接 + const clickhouseConnected = await checkClickHouseConnection(); + if (!clickhouseConnected && !skip_clickhouse_check) { + logWithTimestamp("⚠️ ClickHouse连接测试失败,请启用skip_clickhouse_check=true参数来跳过连接检查"); + throw new Error("ClickHouse连接失败,无法继续同步"); + } + + // 处理记录的函数 + const processRecords = async (records) => { + if (records.length === 0) return 0; + + logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`); + + // 强制使用所有记录,不检查重复 + const newRecords = records; + + logWithTimestamp(`准备处理 ${newRecords.length} 条记录...`); + + // 获取链接信息 + const slugIds = newRecords.map(record => new ObjectId(record.slugId)); + logWithTimestamp(`正在查询 ${slugIds.length} 条短链接信息...`); + const shortLinks = await shortCollection.find({ + _id: { $in: slugIds } + }).toArray(); + + // 创建映射用于快速查找 + const shortLinksMap = new Map(shortLinks.map((link) => [link._id.toString(), link])); + logWithTimestamp(`获取到 ${shortLinks.length} 条短链接信息,${newRecords.length - shortLinks.length} 条数据将使用占位符`); + + // 准备ClickHouse插入数据 + const clickhouseData = newRecords.map(record => { + const eventTime = new Date(record.createTime); + + // 获取对应的短链接信息 + const shortLink = shortLinksMap.get(record.slugId.toString()); + + // 提取URL中的UTM参数 + if (debug_utm && record.url) { + logWithTimestamp(`======== UTM参数调试 ========`); + logWithTimestamp(`记录ID: ${record._id.toString()}`); + logWithTimestamp(`原始URL: ${record.url}`); + } + + const utmParams = extractUtmParams(record.url || "", debug_utm); + + if (debug_utm) { + logWithTimestamp(`提取的UTM参数: ${JSON.stringify(utmParams)}`); + logWithTimestamp(`===========================`); + } + + // 保存提取的UTM参数和URL到event_attributes + const eventAttributes = { + mongo_id: record._id.toString(), + url: record.url || "", + ...(record.url ? { raw_url: record.url } : {}) + }; + + // 转换MongoDB记录为ClickHouse格式,匹配ClickHouse表结构 + return { + // UUID将由ClickHouse自动生成 (event_id) + event_time: eventTime.toISOString().replace('T', ' ').replace('Z', ''), + event_type: record.type === 1 ? "visit" : "custom", + event_attributes: JSON.stringify(eventAttributes), + link_id: record.slugId.toString(), + link_slug: shortLink?.slug || "unknown_slug", // 使用占位符 + link_label: record.label || "", + link_title: shortLink?.title || "unknown_title", // 使用占位符 + link_original_url: shortLink?.origin || "https://unknown.url", // 使用占位符 + link_attributes: JSON.stringify({ domain: shortLink?.domain || "unknown_domain" }), // 使用占位符 + link_created_at: shortLink?.createTime + ? new Date(shortLink.createTime).toISOString().replace('T', ' ').replace('Z', '') + : eventTime.toISOString().replace('T', ' ').replace('Z', ''), + link_expires_at: shortLink?.expiresAt + ? new Date(shortLink.expiresAt).toISOString().replace('T', ' ').replace('Z', '') + : null, + link_tags: shortLink?.tags ? JSON.stringify(shortLink.tags) : "[]", + user_id: shortLink?.user || "unknown_user", // 使用占位符 + user_name: "unknown_user", // 使用占位符 + user_email: "", + user_attributes: "{}", + team_id: shortLink?.teamId || "unknown_team", // 使用占位符 + team_name: "unknown_team", // 使用占位符 + team_attributes: "{}", + project_id: shortLink?.projectId || "unknown_project", // 使用占位符 + project_name: "unknown_project", // 使用占位符 + project_attributes: "{}", + qr_code_id: "", + qr_code_name: "", + qr_code_attributes: "{}", + visitor_id: record._id.toString(), + session_id: record._id.toString() + "-" + record.createTime, + ip_address: record.ip || "0.0.0.0", // 使用占位符 + country: "", + city: "", + device_type: record.platform || "unknown", + browser: record.browser || "unknown", // 使用占位符 + os: record.platformOS || "unknown", // 使用占位符 + user_agent: (record.browser || "unknown") + " " + (record.browserVersion || "unknown"), // 使用占位符 + referrer: record.url || "", + utm_source: utmParams.utm_source || "", + utm_medium: utmParams.utm_medium || "", + utm_campaign: utmParams.utm_campaign || "", + utm_term: utmParams.utm_term || "", + utm_content: utmParams.utm_content || "", + time_spent_sec: 0, + is_bounce: true, + is_qr_scan: false, + conversion_type: "visit", + conversion_value: 0, + req_full_path: record.url || "" + }; + }); + + // 生成ClickHouse插入SQL + const insertSQL = ` + INSERT INTO ${clickhouseConfig.clickhouse_database}.events + (event_time, event_type, event_attributes, link_id, link_slug, link_label, link_title, + link_original_url, link_attributes, link_created_at, link_expires_at, link_tags, + user_id, user_name, user_email, user_attributes, team_id, team_name, team_attributes, + project_id, project_name, project_attributes, qr_code_id, qr_code_name, qr_code_attributes, + visitor_id, session_id, ip_address, country, city, device_type, browser, os, user_agent, + referrer, utm_source, utm_medium, utm_campaign, utm_term, utm_content, time_spent_sec, + is_bounce, is_qr_scan, conversion_type, conversion_value, req_full_path) + VALUES ${clickhouseData.map(record => { + // 确保所有字符串值都是字符串类型,并安全处理替换 + const safeReplace = (val) => { + // 确保值是字符串,如果是null或undefined则使用空字符串 + const str = val === null || val === undefined ? "" : String(val); + // 安全替换单引号 + return str.replace(/'/g, "''"); + }; + + return `('${record.event_time}', '${safeReplace(record.event_type)}', '${safeReplace(record.event_attributes)}', + '${record.link_id}', '${safeReplace(record.link_slug)}', '${safeReplace(record.link_label)}', '${safeReplace(record.link_title)}', + '${safeReplace(record.link_original_url)}', '${safeReplace(record.link_attributes)}', '${record.link_created_at}', + ${record.link_expires_at === null ? 'NULL' : `'${record.link_expires_at}'`}, '${safeReplace(record.link_tags)}', + '${safeReplace(record.user_id)}', '${safeReplace(record.user_name)}', '${safeReplace(record.user_email)}', + '${safeReplace(record.user_attributes)}', '${safeReplace(record.team_id)}', '${safeReplace(record.team_name)}', + '${safeReplace(record.team_attributes)}', '${safeReplace(record.project_id)}', '${safeReplace(record.project_name)}', + '${safeReplace(record.project_attributes)}', '${safeReplace(record.qr_code_id)}', '${safeReplace(record.qr_code_name)}', + '${safeReplace(record.qr_code_attributes)}', '${safeReplace(record.visitor_id)}', '${safeReplace(record.session_id)}', + '${safeReplace(record.ip_address)}', '${safeReplace(record.country)}', '${safeReplace(record.city)}', + '${safeReplace(record.device_type)}', '${safeReplace(record.browser)}', '${safeReplace(record.os)}', + '${safeReplace(record.user_agent)}', '${safeReplace(record.referrer)}', '${safeReplace(record.utm_source)}', + '${safeReplace(record.utm_medium)}', '${safeReplace(record.utm_campaign)}', '${safeReplace(record.utm_term)}', + '${safeReplace(record.utm_content)}', ${record.time_spent_sec}, ${record.is_bounce}, ${record.is_qr_scan}, + '${safeReplace(record.conversion_type)}', ${record.conversion_value}, '${safeReplace(record.req_full_path)}')`; + }).join(", ")} + `; + + if (insertSQL.length === 0) { + console.log("没有新记录需要插入"); + return 0; + } + + // 发送请求到ClickHouse + const clickhouseUrl = clickhouseConfig.clickhouse_url; + try { + logWithTimestamp("发送插入请求到ClickHouse..."); + const response = await fetch(clickhouseUrl, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${Buffer.from(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`).toString('base64')}` + }, + body: insertSQL, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`); + } + + logWithTimestamp(`成功插入 ${newRecords.length} 条记录到ClickHouse`); + return newRecords.length; + } catch (err) { + logWithTimestamp(`向ClickHouse插入数据失败: ${err.message}`); + throw err; + } + }; + + // 批量处理记录 + let processedRecords = 0; + let totalBatchRecords = 0; + let lastSyncTime = 0; + + for (let page = 0; processedRecords < recordsToProcess; page++) { + // 检查超时 + if (checkTimeout()) { + logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`); + break; + } + + // 每批次都输出进度 + logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`); + + logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`); + + // 查询MongoDB数据 + const records = await traceCollection.find(query) + .sort({ createTime: 1 }) + .skip(page * batch_size) + .limit(batch_size) + .toArray(); + + if (records.length === 0) { + logWithTimestamp("没有找到更多数据,同步结束"); + break; + } + + // 找到数据,开始处理 + logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`); + // 输出当前批次的部分数据信息 + if (records.length > 0) { + logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, 时间=${new Date(records[0].createTime).toISOString()}`); + if (records.length > 1) { + logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`); + } + + // 如果开启了调试,输出一些URL样本 + if (debug_utm) { + const sampleSize = Math.min(5, records.length); + logWithTimestamp(`URL样本 (前${sampleSize}条):`); + for (let i = 0; i < sampleSize; i++) { + if (records[i].url) { + logWithTimestamp(`样本 ${i+1}: ${records[i].url}`); + } + } + } + } + + const batchSize = await processRecords(records); + processedRecords += records.length; + totalBatchRecords += batchSize; + + // 更新最后处理的记录时间和ID + if (records.length > 0) { + const lastRecord = records[records.length - 1]; + lastSyncTime = Math.max(lastSyncTime, lastRecord.createTime); + } + + logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); + } + + // 更新同步状态 + if (processedRecords > 0 && lastSyncTime > 0) { + // 只在非自定义时间范围模式下更新同步状态 + if (!useCustomTimeRange) { + // 创建新的同步状态,简化对象结构 + const newSyncState = { + last_sync_time: lastSyncTime, + records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + processedRecords + }; + + try { + // 保存同步状态 + await setVariable("f/shorturl_analytics/mongo_sync_state", newSyncState); + logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`); + } catch (err) { + logWithTimestamp(`更新同步状态失败: ${err.message},将继续执行`); + } + } else { + logWithTimestamp("使用自定义时间范围模式,不更新全局同步状态"); + } + } + + return { + success: true, + records_processed: processedRecords, + records_synced: totalBatchRecords, + last_sync_time: lastSyncTime > 0 ? new Date(lastSyncTime).toISOString() : null, + message: useCustomTimeRange ? "自定义时间范围数据同步完成" : "数据同步完成", + custom_time_range_used: useCustomTimeRange + }; + } catch (err) { + console.error("同步过程中发生错误:", err); + return { + success: false, + error: err.message, + stack: err.stack + }; + } finally { + // 关闭MongoDB连接 + await client.close(); + console.log("MongoDB连接已关闭"); + } +} + +// 执行主函数 +main().then(result => { + console.log("任务执行结果:", result); + process.exit(result.success ? 0 : 1); +}).catch(err => { + console.error("执行出错:", err); + process.exit(1); +}); \ No newline at end of file diff --git a/windmill/sync_mongo_to_events.ts b/windmill/sync_mongo_to_events.ts index b3ecc0b..72542bc 100644 --- a/windmill/sync_mongo_to_events.ts +++ b/windmill/sync_mongo_to_events.ts @@ -1,4 +1,12 @@ // 从MongoDB的trace表同步数据到ClickHouse的events表 +// +// 支持以下同步模式: +// 1. 增量同步:基于上次同步状态,只同步新数据(默认模式) +// 2. 自定义时间范围同步:通过指定开始时间和结束时间,同步特定时间范围内的数据 +// - 可以通过时间戳参数(start_time/end_time)指定范围 +// - 也可以通过日期字符串参数(start_date/end_date)指定范围,支持ISO格式或yyyy-MM-dd格式 +// +// 使用自定义时间范围时,将不会更新同步状态,避免干扰增量同步进度 import { getVariable, setVariable } from "npm:windmill-client@1"; import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts"; @@ -68,6 +76,33 @@ interface UtmParams { // 同步状态键名 const SYNC_STATE_KEY = "f/shorturl_analytics/mongo_sync_state"; +// 日期字符串转时间戳工具函数(接受ISO字符串或yyyy-MM-dd格式) +function dateToTimestamp(dateStr: string): number { + try { + // 尝试直接解析完整的ISO日期字符串 + const date = new Date(dateStr); + + // 检查是否为有效日期 + if (isNaN(date.getTime())) { + // 尝试解析yyyy-MM-dd格式,默认设置为当天的00:00:00 + const parts = dateStr.split('-'); + if (parts.length === 3) { + const year = parseInt(parts[0], 10); + const month = parseInt(parts[1], 10) - 1; // 月份从0开始 + const day = parseInt(parts[2], 10); + + const dateObj = new Date(year, month, day, 0, 0, 0); + return dateObj.getTime(); + } + throw new Error(`无法解析日期字符串: ${dateStr}`); + } + + return date.getTime(); + } catch (err) { + throw new Error(`日期转换错误: ${err instanceof Error ? err.message : String(err)}`); + } +} + // 从URL中提取UTM参数的函数,增强版 function extractUtmParams(url: string, debug = false): UtmParams { const defaultUtmParams: UtmParams = { @@ -188,7 +223,12 @@ export async function main( force_insert = true, database_override = "shorturl_analytics", // 添加数据库名称参数,默认为shorturl_analytics reset_sync_state = false, // 添加参数用于重置同步状态 - debug_utm = false // 添加参数控制UTM调试日志输出 + debug_utm = false, // 添加参数控制UTM调试日志输出 + start_time?: number, // 添加参数指定同步的开始时间戳,可选 + end_time?: number, // 添加参数指定同步的结束时间戳,可选 + use_custom_time_range = false, // 添加参数控制是否使用自定义时间范围 + start_date?: string, // 添加开始日期字符串参数(ISO格式或yyyy-MM-dd格式) + end_date?: string // 添加结束日期字符串参数(ISO格式或yyyy-MM-dd格式) ) { const logWithTimestamp = (message: string) => { const now = new Date(); @@ -197,6 +237,34 @@ export async function main( logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务"); logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`); + + // 处理日期字符串参数,转换为时间戳 + if (start_date) { + try { + start_time = dateToTimestamp(start_date); + logWithTimestamp(`将开始日期 ${start_date} 转换为时间戳 ${start_time}`); + use_custom_time_range = true; + } catch (err) { + logWithTimestamp(`开始日期转换错误: ${err instanceof Error ? err.message : String(err)}`); + } + } + + if (end_date) { + try { + end_time = dateToTimestamp(end_date); + // 如果是日期格式,设置为当天结束时间 (23:59:59.999) + if (end_date.split('-').length === 3 && end_date.length <= 10) { + end_time += 24 * 60 * 60 * 1000 - 1; // 加上23:59:59.999 + logWithTimestamp(`将结束日期 ${end_date} 转换为当天结束时间戳 ${end_time}`); + } else { + logWithTimestamp(`将结束日期 ${end_date} 转换为时间戳 ${end_time}`); + } + use_custom_time_range = true; + } catch (err) { + logWithTimestamp(`结束日期转换错误: ${err instanceof Error ? err.message : String(err)}`); + } + } + if (skip_clickhouse_check) { logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式,不会检查记录是否已存在"); } @@ -209,6 +277,14 @@ export async function main( if (debug_utm) { logWithTimestamp("已启用UTM参数调试日志"); } + if (use_custom_time_range) { + if (start_time) { + logWithTimestamp(`已启用自定义时间范围:开始时间 ${new Date(start_time).toISOString()}`); + } + if (end_time) { + logWithTimestamp(`已启用自定义时间范围:结束时间 ${new Date(end_time).toISOString()}`); + } + } // 设置超时 const startTime = Date.now(); @@ -331,8 +407,28 @@ export async function main( type: 1 // 只同步type为1的记录 }; - // 如果有上次同步状态,则只获取更新的记录 - if (lastSyncState && lastSyncState.last_sync_time) { + // 根据时间范围参数构建查询条件 + if (use_custom_time_range) { + // 使用自定义时间范围 + const timeQuery: Record = {}; + + if (start_time) { + timeQuery.$gte = start_time; + logWithTimestamp(`将只同步createTime >= ${start_time} (${new Date(start_time).toISOString()}) 的记录`); + } + + if (end_time) { + timeQuery.$lte = end_time; + logWithTimestamp(`将只同步createTime <= ${end_time} (${new Date(end_time).toISOString()}) 的记录`); + } + + // 只有当至少指定了一个时间限制时才添加时间查询条件 + if (Object.keys(timeQuery).length > 0) { + query.createTime = timeQuery; + } + } + // 如果不使用自定义时间范围,且有上次同步状态,则只获取更新的记录 + else if (lastSyncState && lastSyncState.last_sync_time) { // 使用上次同步时间作为过滤条件 query.createTime = { $gt: lastSyncState.last_sync_time }; logWithTimestamp(`将只同步createTime > ${lastSyncState.last_sync_time} (${new Date(lastSyncState.last_sync_time).toISOString()}) 的记录`); @@ -641,20 +737,25 @@ export async function main( // 更新同步状态 if (processedRecords > 0 && lastSyncTime > 0) { - // 创建新的同步状态,简化对象结构 - const newSyncState: SyncState = { - last_sync_time: lastSyncTime, - records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + processedRecords, // 使用处理的总记录数,而不是实际插入数 - }; - - try { - // 保存同步状态 - await setVariable(SYNC_STATE_KEY, newSyncState); - logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`); - } catch (err) { - const error = err as Error; - logWithTimestamp(`更新同步状态失败: ${error.message},将继续执行`); - // 不抛出错误,继续执行 + // 只在非自定义时间范围模式下更新同步状态 + if (!use_custom_time_range) { + // 创建新的同步状态,简化对象结构 + const newSyncState: SyncState = { + last_sync_time: lastSyncTime, + records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + processedRecords, // 使用处理的总记录数,而不是实际插入数 + }; + + try { + // 保存同步状态 + await setVariable(SYNC_STATE_KEY, newSyncState); + logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`); + } catch (err) { + const error = err as Error; + logWithTimestamp(`更新同步状态失败: ${error.message},将继续执行`); + // 不抛出错误,继续执行 + } + } else { + logWithTimestamp("使用自定义时间范围模式,不更新全局同步状态"); } } @@ -663,7 +764,8 @@ export async function main( records_processed: processedRecords, records_synced: totalBatchRecords, last_sync_time: lastSyncTime > 0 ? new Date(lastSyncTime).toISOString() : null, - message: "数据同步完成" + message: use_custom_time_range ? "自定义时间范围数据同步完成" : "数据同步完成", + custom_time_range_used: use_custom_time_range }; } catch (err) { console.error("同步过程中发生错误:", err);