Enhance MongoDB to ClickHouse synchronization script by adding support for custom time range synchronization, allowing users to specify start and end dates. Update .env file to include MongoDB connection URL and add .gitignore for script dependencies.
This commit is contained in:
@@ -1,4 +1,12 @@
|
||||
// 从MongoDB的trace表同步数据到ClickHouse的events表
|
||||
//
|
||||
// 支持以下同步模式:
|
||||
// 1. 增量同步:基于上次同步状态,只同步新数据(默认模式)
|
||||
// 2. 自定义时间范围同步:通过指定开始时间和结束时间,同步特定时间范围内的数据
|
||||
// - 可以通过时间戳参数(start_time/end_time)指定范围
|
||||
// - 也可以通过日期字符串参数(start_date/end_date)指定范围,支持ISO格式或yyyy-MM-dd格式
|
||||
//
|
||||
// 使用自定义时间范围时,将不会更新同步状态,避免干扰增量同步进度
|
||||
import { getVariable, setVariable } from "npm:windmill-client@1";
|
||||
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
|
||||
|
||||
@@ -68,6 +76,33 @@ interface UtmParams {
|
||||
// 同步状态键名
|
||||
const SYNC_STATE_KEY = "f/shorturl_analytics/mongo_sync_state";
|
||||
|
||||
// 日期字符串转时间戳工具函数(接受ISO字符串或yyyy-MM-dd格式)
|
||||
function dateToTimestamp(dateStr: string): number {
|
||||
try {
|
||||
// 尝试直接解析完整的ISO日期字符串
|
||||
const date = new Date(dateStr);
|
||||
|
||||
// 检查是否为有效日期
|
||||
if (isNaN(date.getTime())) {
|
||||
// 尝试解析yyyy-MM-dd格式,默认设置为当天的00:00:00
|
||||
const parts = dateStr.split('-');
|
||||
if (parts.length === 3) {
|
||||
const year = parseInt(parts[0], 10);
|
||||
const month = parseInt(parts[1], 10) - 1; // 月份从0开始
|
||||
const day = parseInt(parts[2], 10);
|
||||
|
||||
const dateObj = new Date(year, month, day, 0, 0, 0);
|
||||
return dateObj.getTime();
|
||||
}
|
||||
throw new Error(`无法解析日期字符串: ${dateStr}`);
|
||||
}
|
||||
|
||||
return date.getTime();
|
||||
} catch (err) {
|
||||
throw new Error(`日期转换错误: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
// 从URL中提取UTM参数的函数,增强版
|
||||
function extractUtmParams(url: string, debug = false): UtmParams {
|
||||
const defaultUtmParams: UtmParams = {
|
||||
@@ -188,7 +223,12 @@ export async function main(
|
||||
force_insert = true,
|
||||
database_override = "shorturl_analytics", // 添加数据库名称参数,默认为shorturl_analytics
|
||||
reset_sync_state = false, // 添加参数用于重置同步状态
|
||||
debug_utm = false // 添加参数控制UTM调试日志输出
|
||||
debug_utm = false, // 添加参数控制UTM调试日志输出
|
||||
start_time?: number, // 添加参数指定同步的开始时间戳,可选
|
||||
end_time?: number, // 添加参数指定同步的结束时间戳,可选
|
||||
use_custom_time_range = false, // 添加参数控制是否使用自定义时间范围
|
||||
start_date?: string, // 添加开始日期字符串参数(ISO格式或yyyy-MM-dd格式)
|
||||
end_date?: string // 添加结束日期字符串参数(ISO格式或yyyy-MM-dd格式)
|
||||
) {
|
||||
const logWithTimestamp = (message: string) => {
|
||||
const now = new Date();
|
||||
@@ -197,6 +237,34 @@ export async function main(
|
||||
|
||||
logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务");
|
||||
logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`);
|
||||
|
||||
// 处理日期字符串参数,转换为时间戳
|
||||
if (start_date) {
|
||||
try {
|
||||
start_time = dateToTimestamp(start_date);
|
||||
logWithTimestamp(`将开始日期 ${start_date} 转换为时间戳 ${start_time}`);
|
||||
use_custom_time_range = true;
|
||||
} catch (err) {
|
||||
logWithTimestamp(`开始日期转换错误: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (end_date) {
|
||||
try {
|
||||
end_time = dateToTimestamp(end_date);
|
||||
// 如果是日期格式,设置为当天结束时间 (23:59:59.999)
|
||||
if (end_date.split('-').length === 3 && end_date.length <= 10) {
|
||||
end_time += 24 * 60 * 60 * 1000 - 1; // 加上23:59:59.999
|
||||
logWithTimestamp(`将结束日期 ${end_date} 转换为当天结束时间戳 ${end_time}`);
|
||||
} else {
|
||||
logWithTimestamp(`将结束日期 ${end_date} 转换为时间戳 ${end_time}`);
|
||||
}
|
||||
use_custom_time_range = true;
|
||||
} catch (err) {
|
||||
logWithTimestamp(`结束日期转换错误: ${err instanceof Error ? err.message : String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (skip_clickhouse_check) {
|
||||
logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式,不会检查记录是否已存在");
|
||||
}
|
||||
@@ -209,6 +277,14 @@ export async function main(
|
||||
if (debug_utm) {
|
||||
logWithTimestamp("已启用UTM参数调试日志");
|
||||
}
|
||||
if (use_custom_time_range) {
|
||||
if (start_time) {
|
||||
logWithTimestamp(`已启用自定义时间范围:开始时间 ${new Date(start_time).toISOString()}`);
|
||||
}
|
||||
if (end_time) {
|
||||
logWithTimestamp(`已启用自定义时间范围:结束时间 ${new Date(end_time).toISOString()}`);
|
||||
}
|
||||
}
|
||||
|
||||
// 设置超时
|
||||
const startTime = Date.now();
|
||||
@@ -331,8 +407,28 @@ export async function main(
|
||||
type: 1 // 只同步type为1的记录
|
||||
};
|
||||
|
||||
// 如果有上次同步状态,则只获取更新的记录
|
||||
if (lastSyncState && lastSyncState.last_sync_time) {
|
||||
// 根据时间范围参数构建查询条件
|
||||
if (use_custom_time_range) {
|
||||
// 使用自定义时间范围
|
||||
const timeQuery: Record<string, number> = {};
|
||||
|
||||
if (start_time) {
|
||||
timeQuery.$gte = start_time;
|
||||
logWithTimestamp(`将只同步createTime >= ${start_time} (${new Date(start_time).toISOString()}) 的记录`);
|
||||
}
|
||||
|
||||
if (end_time) {
|
||||
timeQuery.$lte = end_time;
|
||||
logWithTimestamp(`将只同步createTime <= ${end_time} (${new Date(end_time).toISOString()}) 的记录`);
|
||||
}
|
||||
|
||||
// 只有当至少指定了一个时间限制时才添加时间查询条件
|
||||
if (Object.keys(timeQuery).length > 0) {
|
||||
query.createTime = timeQuery;
|
||||
}
|
||||
}
|
||||
// 如果不使用自定义时间范围,且有上次同步状态,则只获取更新的记录
|
||||
else if (lastSyncState && lastSyncState.last_sync_time) {
|
||||
// 使用上次同步时间作为过滤条件
|
||||
query.createTime = { $gt: lastSyncState.last_sync_time };
|
||||
logWithTimestamp(`将只同步createTime > ${lastSyncState.last_sync_time} (${new Date(lastSyncState.last_sync_time).toISOString()}) 的记录`);
|
||||
@@ -641,20 +737,25 @@ export async function main(
|
||||
|
||||
// 更新同步状态
|
||||
if (processedRecords > 0 && lastSyncTime > 0) {
|
||||
// 创建新的同步状态,简化对象结构
|
||||
const newSyncState: SyncState = {
|
||||
last_sync_time: lastSyncTime,
|
||||
records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + processedRecords, // 使用处理的总记录数,而不是实际插入数
|
||||
};
|
||||
|
||||
try {
|
||||
// 保存同步状态
|
||||
await setVariable(SYNC_STATE_KEY, newSyncState);
|
||||
logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`);
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
logWithTimestamp(`更新同步状态失败: ${error.message},将继续执行`);
|
||||
// 不抛出错误,继续执行
|
||||
// 只在非自定义时间范围模式下更新同步状态
|
||||
if (!use_custom_time_range) {
|
||||
// 创建新的同步状态,简化对象结构
|
||||
const newSyncState: SyncState = {
|
||||
last_sync_time: lastSyncTime,
|
||||
records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + processedRecords, // 使用处理的总记录数,而不是实际插入数
|
||||
};
|
||||
|
||||
try {
|
||||
// 保存同步状态
|
||||
await setVariable(SYNC_STATE_KEY, newSyncState);
|
||||
logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`);
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
logWithTimestamp(`更新同步状态失败: ${error.message},将继续执行`);
|
||||
// 不抛出错误,继续执行
|
||||
}
|
||||
} else {
|
||||
logWithTimestamp("使用自定义时间范围模式,不更新全局同步状态");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -663,7 +764,8 @@ export async function main(
|
||||
records_processed: processedRecords,
|
||||
records_synced: totalBatchRecords,
|
||||
last_sync_time: lastSyncTime > 0 ? new Date(lastSyncTime).toISOString() : null,
|
||||
message: "数据同步完成"
|
||||
message: use_custom_time_range ? "自定义时间范围数据同步完成" : "数据同步完成",
|
||||
custom_time_range_used: use_custom_time_range
|
||||
};
|
||||
} catch (err) {
|
||||
console.error("同步过程中发生错误:", err);
|
||||
|
||||
Reference in New Issue
Block a user