Files
shorturl-analytics/windmill/sync_shorturl_from_mongo_to_clickhouse.ts
2025-03-21 22:40:03 +08:00

532 lines
20 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// 从MongoDB的short表同步数据到ClickHouse的links表
import { getVariable, setVariable } from "npm:windmill-client@1";
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
interface MongoConfig {
host: string;
port: string;
db: string;
username: string;
password: string;
}
interface ClickHouseConfig {
clickhouse_host: string;
clickhouse_port: number;
clickhouse_user: string;
clickhouse_password: string;
clickhouse_database: string;
clickhouse_url: string;
}
interface ShortRecord {
_id: ObjectId;
slug: string; // 短链接的slug部分
url: string; // 原始URL
createTime: number; // 创建时间戳
user: string; // 创建用户
title?: string; // 标题
description?: string; // 描述
tags?: string[]; // 标签
active?: boolean; // 是否活跃
expiresAt?: number; // 过期时间戳
teamId?: string; // 团队ID
projectId?: string; // 项目ID
}
interface SyncState {
last_sync_time: number;
records_synced: number;
last_sync_id?: string;
}
export async function main(
batch_size = 50,
initial_sync = false,
max_records = 1000,
timeout_minutes = 30,
skip_clickhouse_check = false,
force_insert = false
) {
const logWithTimestamp = (message: string) => {
const now = new Date();
console.log(`[${now.toISOString()}] ${message}`);
};
logWithTimestamp("开始执行MongoDB到ClickHouse的短链接同步任务...");
logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`);
if (skip_clickhouse_check) {
logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式不会检查记录是否已存在");
}
if (force_insert) {
logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
}
// 设置超时
const startTime = Date.now();
const timeoutMs = timeout_minutes * 60 * 1000;
// 检查是否超时
const checkTimeout = () => {
if (Date.now() - startTime > timeoutMs) {
console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`);
return true;
}
return false;
};
// 获取MongoDB和ClickHouse的连接信息
let mongoConfig: MongoConfig;
let clickhouseConfig: ClickHouseConfig;
try {
const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
console.log("原始MongoDB配置:", typeof rawMongoConfig === "string" ? rawMongoConfig : JSON.stringify(rawMongoConfig));
// 尝试解析配置,如果是字符串形式
if (typeof rawMongoConfig === "string") {
try {
mongoConfig = JSON.parse(rawMongoConfig);
} catch (e) {
console.error("MongoDB配置解析失败:", e);
throw e;
}
} else {
mongoConfig = rawMongoConfig as MongoConfig;
}
const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse");
console.log("原始ClickHouse配置:", typeof rawClickhouseConfig === "string" ? rawClickhouseConfig : JSON.stringify(rawClickhouseConfig));
// 尝试解析配置,如果是字符串形式
if (typeof rawClickhouseConfig === "string") {
try {
clickhouseConfig = JSON.parse(rawClickhouseConfig);
} catch (e) {
console.error("ClickHouse配置解析失败:", e);
throw e;
}
} else {
clickhouseConfig = rawClickhouseConfig as ClickHouseConfig;
}
console.log("MongoDB配置解析为:", JSON.stringify(mongoConfig));
console.log("ClickHouse配置解析为:", JSON.stringify(clickhouseConfig));
} catch (error) {
console.error("获取配置失败:", error);
throw error;
}
// 构建MongoDB连接URL
let mongoUrl = "mongodb://";
if (mongoConfig.username && mongoConfig.password) {
mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
}
mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);
// 获取上次同步的状态
let syncState: SyncState;
try {
const rawSyncState = await getVariable<string>("f/shorturl_analytics/clickhouse/shorturl_links_sync_state");
try {
syncState = JSON.parse(rawSyncState);
console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`);
} catch (parseError) {
console.error("解析同步状态失败:", parseError);
throw parseError;
}
} catch (_unused_error) {
console.log("未找到同步状态,创建初始同步状态");
syncState = {
last_sync_time: 0,
records_synced: 0,
};
}
// 如果强制从头开始同步
if (initial_sync) {
console.log("强制从头开始同步");
syncState = {
last_sync_time: 0,
records_synced: 0,
};
}
// 连接MongoDB
const client = new MongoClient();
try {
await client.connect(mongoUrl);
console.log("MongoDB连接成功");
const db = client.database(mongoConfig.db);
const shortCollection = db.collection<ShortRecord>("short");
// 构建查询条件,只查询新的记录
const query: Record<string, unknown> = {};
if (syncState.last_sync_time > 0) {
query.createTime = { $gt: syncState.last_sync_time };
}
if (syncState.last_sync_id) {
// 如果有上次同步的ID则从该ID之后开始查询
query._id = { $gt: new ObjectId(syncState.last_sync_id) };
}
// 计算总记录数
const totalRecords = await shortCollection.countDocuments(query);
console.log(`找到 ${totalRecords} 条新短链接记录需要同步`);
// 限制此次处理的记录数量
const recordsToProcess = Math.min(totalRecords, max_records);
console.log(`本次将处理 ${recordsToProcess} 条记录`);
if (totalRecords === 0) {
console.log("没有新记录需要同步,任务完成");
return {
success: true,
records_synced: 0,
total_synced: syncState.records_synced,
message: "没有新记录需要同步"
};
}
// 分批处理记录
let processedRecords = 0;
let lastId: string | undefined;
let lastCreateTime = syncState.last_sync_time;
let totalBatchRecords = 0;
// 检查ClickHouse连接状态
const checkClickHouseConnection = async (): Promise<boolean> => {
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查不测试连接");
return true;
}
try {
logWithTimestamp("测试ClickHouse连接...");
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`,
},
body: "SELECT 1 FORMAT JSON",
signal: AbortSignal.timeout(5000)
});
if (response.ok) {
logWithTimestamp("ClickHouse连接测试成功");
return true;
} else {
const errorText = await response.text();
logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`);
return false;
}
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse连接测试失败: ${error.message}`);
return false;
}
};
// 检查记录是否已经存在于ClickHouse中
const checkExistingRecords = async (records: ShortRecord[]): Promise<ShortRecord[]> => {
if (records.length === 0) return [];
// 如果跳过ClickHouse检查或强制插入则直接返回所有记录
if (skip_clickhouse_check || force_insert) {
logWithTimestamp(`已跳过ClickHouse重复检查准备处理所有 ${records.length} 条记录`);
return records;
}
logWithTimestamp(`正在检查 ${records.length} 条短链接记录是否已存在于ClickHouse中...`);
try {
// 提取所有记录的ID
const recordIds = records.map(record => record.slug);
logWithTimestamp(`待检查的短链接ID: ${recordIds.join(', ')}`);
// 构建查询SQL检查记录是否已存在
const query = `
SELECT link_id
FROM ${clickhouseConfig.clickhouse_database}.links
WHERE link_id IN ('${recordIds.join("','")}')
FORMAT JSON
`;
logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`);
// 发送请求到ClickHouse
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: query,
signal: AbortSignal.timeout(10000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`);
}
// 获取响应文本
const responseText = await response.text();
logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`);
if (!responseText.trim()) {
logWithTimestamp("ClickHouse返回空响应假定没有记录存在");
return records;
}
// 解析结果
let result;
try {
result = JSON.parse(responseText);
} catch (err) {
logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`);
throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`);
}
// 确保result有正确的结构
if (!result.data) {
logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`);
return records;
}
// 提取已存在的记录ID
const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id));
logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`);
if (existingIds.size > 0) {
logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`);
}
// 过滤出不存在的记录
const newRecords = records.filter(record => !existingIds.has(record.slug));
logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
return newRecords;
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse查询出错: ${error.message}`);
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查将继续处理所有记录");
return records;
} else {
throw error;
}
}
};
// 在处理记录前先检查ClickHouse连接
const clickhouseConnected = await checkClickHouseConnection();
if (!clickhouseConnected && !skip_clickhouse_check) {
logWithTimestamp("⚠️ ClickHouse连接测试失败请启用skip_clickhouse_check=true参数来跳过连接检查");
throw new Error("ClickHouse连接失败无法继续同步");
}
// 处理记录的函数
const processRecords = async (records: ShortRecord[]) => {
if (records.length === 0) return 0;
logWithTimestamp(`开始处理批次数据,共 ${records.length} 条短链接记录...`);
// 检查记录是否已存在
let newRecords;
try {
newRecords = await checkExistingRecords(records);
} catch (err) {
const error = err as Error;
logWithTimestamp(`检查记录是否存在时出错: ${error.message}`);
if (!skip_clickhouse_check && !force_insert) {
throw error;
}
// 如果跳过检查或强制插入,则使用所有记录
logWithTimestamp("将使用所有记录进行处理");
newRecords = records;
}
if (newRecords.length === 0) {
logWithTimestamp("所有记录都已存在,跳过处理");
// 更新同步状态,即使没有新增记录
const lastRecord = records[records.length - 1];
lastId = lastRecord._id.toString();
lastCreateTime = lastRecord.createTime;
return 0;
}
logWithTimestamp(`准备处理 ${newRecords.length} 条新短链接记录...`);
// 准备ClickHouse插入数据
const clickhouseData = newRecords.map(record => {
// 转换MongoDB记录为ClickHouse格式匹配ClickHouse表结构
// 处理日期时间移除ISO格式中的Z以使ClickHouse正确解析
const createdAtStr = new Date(record.createTime).toISOString().replace('Z', '');
const expiresAtStr = record.expiresAt ? new Date(record.expiresAt).toISOString().replace('Z', '') : null;
return {
link_id: record.slug,
original_url: record.url || "",
created_at: createdAtStr,
created_by: record.user || "unknown",
title: record.title || (record.url ? record.url.substring(0, 50) : "无标题"),
description: record.description || "",
tags: record.tags || [],
is_active: record.active !== undefined ? record.active : true,
expires_at: expiresAtStr,
team_id: record.teamId || "",
project_id: record.projectId || ""
};
});
// 更新同步状态使用原始records的最后一条以确保进度正确
const lastRecord = records[records.length - 1];
lastId = lastRecord._id.toString();
lastCreateTime = lastRecord.createTime;
logWithTimestamp(`更新同步位置到: ID=${lastId}, 时间=${new Date(lastCreateTime).toISOString()}`);
// 生成ClickHouse插入SQL
// 注意Array类型需要特殊处理这里将tags作为JSON字符串处理
const insertSQL = `
INSERT INTO ${clickhouseConfig.clickhouse_database}.links
(link_id, original_url, created_at, created_by, title, description, tags, is_active, expires_at, team_id, project_id)
VALUES
${clickhouseData.map(record => {
// 处理tags数组
const tagsStr = JSON.stringify(record.tags || []);
// 处理expires_at可能为null的情况
const expiresAt = record.expires_at ? `'${record.expires_at}'` : "NULL";
// 确保所有字段在使用replace前都有默认值
const safeOriginalUrl = (record.original_url || "").replace(/'/g, "''");
const safeCreatedBy = (record.created_by || "unknown").replace(/'/g, "''");
const safeTitle = (record.title || "无标题").replace(/'/g, "''");
const safeDescription = (record.description || "").replace(/'/g, "''");
const safeTeamId = record.team_id || "";
const safeProjectId = record.project_id || "";
return `('${record.link_id}', '${safeOriginalUrl}', '${record.created_at}', '${safeCreatedBy}', '${safeTitle}', '${safeDescription}', ${tagsStr}, ${record.is_active}, ${expiresAt}, '${safeTeamId}', '${safeProjectId}')`;
}).join(", ")}
`;
if (clickhouseData.length === 0) {
console.log("没有新记录需要插入");
return 0;
}
// 发送请求到ClickHouse
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
try {
logWithTimestamp("发送插入请求到ClickHouse...");
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: insertSQL,
signal: AbortSignal.timeout(20000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`);
}
logWithTimestamp(`成功插入 ${newRecords.length} 条短链接记录到ClickHouse`);
return newRecords.length;
} catch (err) {
const error = err as Error;
logWithTimestamp(`向ClickHouse插入数据失败: ${error.message}`);
throw error;
}
};
// 批量处理记录
for (let page = 0; processedRecords < recordsToProcess; page++) {
// 检查超时
if (checkTimeout()) {
logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`);
break;
}
// 每批次都输出进度
logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`);
const records = await shortCollection.find(query)
.sort({ createTime: 1, _id: 1 })
.skip(page * batch_size)
.limit(batch_size)
.toArray();
if (records.length === 0) {
logWithTimestamp(`${page+1} 批次没有找到数据,结束处理`);
break;
}
logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);
// 输出当前批次的部分数据信息
if (records.length > 0) {
logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, Slug=${records[0].slug}, 时间=${new Date(records[0].createTime).toISOString()}`);
if (records.length > 1) {
logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, Slug=${records[records.length-1].slug}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`);
}
}
const batchSize = await processRecords(records);
processedRecords += records.length; // 总是增加处理的记录数,即使有些记录已存在
totalBatchRecords += batchSize; // 只增加实际插入的记录数
logWithTimestamp(`${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
// 更新查询条件,以便下一批次查询
query.createTime = { $gt: lastCreateTime };
if (lastId) {
query._id = { $gt: new ObjectId(lastId) };
}
logWithTimestamp(`更新查询条件: 创建时间 > ${new Date(lastCreateTime).toISOString()}, ID > ${lastId || 'none'}`);
}
// 更新同步状态
const newSyncState: SyncState = {
last_sync_time: lastCreateTime,
records_synced: syncState.records_synced + totalBatchRecords,
last_sync_id: lastId
};
await setVariable("f/shorturl_analytics/clickhouse/shorturl_links_sync_state", JSON.stringify(newSyncState));
console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`);
return {
success: true,
records_processed: processedRecords,
records_synced: totalBatchRecords,
total_synced: newSyncState.records_synced,
last_sync_time: new Date(newSyncState.last_sync_time).toISOString(),
message: "短链接数据同步完成"
};
} catch (err) {
console.error("同步过程中发生错误:", err);
return {
success: false,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined
};
} finally {
// 关闭MongoDB连接
await client.close();
console.log("MongoDB连接已关闭");
}
}