only sync mongo event type 1

2025-03-24 17:10:01 +08:00
parent 90e2000842
commit 8012fa78c0


@@ -42,6 +42,7 @@ interface SyncState {
last_sync_time: number;
records_synced: number;
last_sync_id?: string;
sync_filter?: string;
}
// Replacement for Windmill's variable storage functions
@@ -162,7 +163,10 @@ export async function main(
const traceCollection = db.collection<TraceRecord>("trace");
// Build the query; only fetch new records
const query: Record<string, unknown> = {};
const query: Record<string, unknown> = {
// Only include records with type 1
type: 1
};
if (syncState.last_sync_time > 0) {
query.createTime = { $gt: syncState.last_sync_time };
@@ -170,7 +174,6 @@ export async function main(
if (syncState.last_sync_id) {
// If there is a last synced ID, resume the query after that ID
// Note: this requires records with the same createTime in MongoDB to be sorted by _id
query._id = { $gt: new ObjectId(syncState.last_sync_id) };
}
@@ -196,7 +199,7 @@ export async function main(
let processedRecords = 0;
let lastId: string | undefined;
let lastCreateTime = syncState.last_sync_time;
let totalBatchRecords = 0;
const totalBatchRecords = 0;
// Check the ClickHouse connection status
const checkClickHouseConnection = async (): Promise<boolean> => {
@@ -437,139 +440,62 @@ export async function main(
signal: AbortSignal.timeout(20000)
});
if (!response.ok) {
if (response.ok) {
logWithTimestamp("ClickHouse插入响应: 成功");
return newRecords.length;
} else {
const errorText = await response.text();
throw new Error(`ClickHouse insert error: ${response.status} ${errorText}`);
throw new Error(`ClickHouse insert failed: ${response.status} ${errorText}`);
}
logWithTimestamp(`Successfully inserted ${newRecords.length} records into ClickHouse`);
return newRecords.length;
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse data insert failed: ${error.message}`);
logWithTimestamp(`ClickHouse insert request error: ${error.message}`);
throw error;
}
};
// Process records in batches
for (let page = 0; processedRecords < recordsToProcess; page++) {
// Check for timeout
if (checkTimeout()) {
logWithTimestamp(`Processed ${processedRecords}/${recordsToProcess} records; pausing due to timeout`);
break;
}
// Process all records
let totalSyncedRecords = 0;
while (processedRecords < recordsToProcess) {
const batch = await traceCollection.find(query).skip(processedRecords).limit(batch_size).toArray();
const batchRecords = await processRecords(batch);
processedRecords += batch.length;
totalSyncedRecords += batchRecords;
logWithTimestamp(`Processed ${processedRecords} records, ${totalSyncedRecords} records synced in total`);
// Log progress for every batch
logWithTimestamp(`Starting batch ${page+1}; completed ${processedRecords}/${recordsToProcess} records (${Math.round(processedRecords/recordsToProcess*100)}%)`);
// Update the sync state
const newSyncState: SyncState = {
last_sync_time: lastCreateTime,
records_synced: syncState.records_synced + totalBatchRecords,
last_sync_id: lastId,
// Mark that this is a type=1 filtered sync
sync_filter: "type_1_only"
};
logWithTimestamp(`Fetching batch ${page+1} from MongoDB...`);
const records = await traceCollection.find(query)
.sort({ createTime: 1, _id: 1 })
.skip(page * batch_size)
.limit(batch_size)
.toArray();
// Persist the sync state
await setVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state", JSON.stringify(newSyncState));
if (records.length === 0) {
logWithTimestamp(`Batch ${page+1} returned no data; stopping`);
break;
}
logWithTimestamp(`Fetched ${records.length} records; starting processing...`);
// Log partial data info for the current batch
if (records.length > 0) {
logWithTimestamp(`Batch ${page+1} first record: ID=${records[0]._id}, time=${new Date(records[0].createTime).toISOString()}`);
if (records.length > 1) {
logWithTimestamp(`Batch ${page+1} last record: ID=${records[records.length-1]._id}, time=${new Date(records[records.length-1].createTime).toISOString()}`);
}
}
const batchSize = await processRecords(records);
processedRecords += records.length; // Always count processed records, even if some already exist
totalBatchRecords += batchSize; // Only count records that were actually inserted
logWithTimestamp(`Batch ${page+1} done. Processed ${processedRecords}/${recordsToProcess} records, actually inserted ${totalBatchRecords} (${Math.round(processedRecords/recordsToProcess*100)}%)`);
// Update the query conditions for the next batch
query.createTime = { $gt: lastCreateTime };
if (lastId) {
query._id = { $gt: new ObjectId(lastId) };
}
logWithTimestamp(`Updated query: createTime > ${new Date(lastCreateTime).toISOString()}, ID > ${lastId || 'none'}`);
// Check for timeout
if (checkTimeout()) break;
}
// Update the sync state
const newSyncState: SyncState = {
last_sync_time: lastCreateTime,
records_synced: syncState.records_synced + totalBatchRecords,
last_sync_id: lastId
};
await setVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state", JSON.stringify(newSyncState));
console.log(`Sync state updated: last sync time ${new Date(newSyncState.last_sync_time).toISOString()}, total records synced ${newSyncState.records_synced}`);
// Return the sync result
return {
success: true,
records_processed: processedRecords,
records_synced: totalBatchRecords,
total_synced: newSyncState.records_synced,
last_sync_time: new Date(newSyncState.last_sync_time).toISOString(),
message: "数据同步完成"
records_synced: totalSyncedRecords,
total_synced: syncState.records_synced + totalSyncedRecords,
message: "同步完成"
};
} catch (err) {
console.error("同步过程中发生错误:", err);
const error = err as Error;
logWithTimestamp(`Sync task error: ${error.message}`);
return {
success: false,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined
records_synced: 0,
total_synced: syncState.records_synced,
message: error.message
};
} finally {
// Close the MongoDB connection
await client.close();
console.log("MongoDB connection closed");
}
}
// If this script is executed directly
if (require.main === module) {
// Parse command-line arguments
const args = process.argv.slice(2);
const params: Record<string, any> = {
batch_size: 1000,
initial_sync: false,
max_records: 9999999,
timeout_minutes: 60,
skip_clickhouse_check: false,
force_insert: false
};
// Simple argument parsing
for (let i = 0; i < args.length; i += 2) {
if (args[i].startsWith('--') && i + 1 < args.length) {
const key = args[i].substring(2);
let value: any = args[i + 1];
// Type conversion
if (value === 'true') value = true;
else if (value === 'false') value = false;
else if (!isNaN(Number(value))) value = Number(value);
params[key] = value;
}
}
console.log('Starting sync task with params:', params);
main(
params.batch_size,
params.initial_sync,
params.max_records,
params.timeout_minutes,
params.skip_clickhouse_check,
params.force_insert
).then(result => {
console.log('Sync task finished:', result);
process.exit(result.success ? 0 : 1);
}).catch(err => {
console.error('Sync task failed:', err);
process.exit(1);
});
}
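A minimal usage sketch for the CLI entry point above, assuming the flag names are the same --key value pairs the parser reads; the file name sync.ts and the ts-node runner are placeholders, not taken from this commit:

// Hypothetical invocation; "sync.ts" and ts-node are placeholder assumptions:
//   ts-node sync.ts --batch_size 500 --max_records 100000 --timeout_minutes 30
// Equivalent direct call, using the same positional parameters main() expects:
main(500, false, 100000, 30, false, false)
  .then(result => process.exit(result.success ? 0 : 1))
  .catch(() => process.exit(1));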