diff --git a/sync_shorturl_event_from_mongo.ts b/sync_shorturl_event_from_mongo.ts index ecfbe58..2de05d4 100644 --- a/sync_shorturl_event_from_mongo.ts +++ b/sync_shorturl_event_from_mongo.ts @@ -42,6 +42,7 @@ interface SyncState { last_sync_time: number; records_synced: number; last_sync_id?: string; + sync_filter?: string; } // 替代 Windmill 的变量存储函数 @@ -162,7 +163,10 @@ export async function main( const traceCollection = db.collection("trace"); // 构建查询条件,只查询新的记录 - const query: Record = {}; + const query: Record = { + // 添加type为1的过滤条件 + type: 1 + }; if (syncState.last_sync_time > 0) { query.createTime = { $gt: syncState.last_sync_time }; @@ -170,7 +174,6 @@ export async function main( if (syncState.last_sync_id) { // 如果有上次同步的ID,则从该ID之后开始查询 - // 注意:这需要MongoDB中createTime相同的记录按_id排序 query._id = { $gt: new ObjectId(syncState.last_sync_id) }; } @@ -196,7 +199,7 @@ export async function main( let processedRecords = 0; let lastId: string | undefined; let lastCreateTime = syncState.last_sync_time; - let totalBatchRecords = 0; + const totalBatchRecords = 0; // 检查ClickHouse连接状态 const checkClickHouseConnection = async (): Promise => { @@ -436,140 +439,63 @@ export async function main( body: insertSQL, signal: AbortSignal.timeout(20000) }); - - if (!response.ok) { + + if (response.ok) { + logWithTimestamp("ClickHouse插入响应: 成功"); + return newRecords.length; + } else { const errorText = await response.text(); - throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`); + throw new Error(`ClickHouse插入失败: ${response.status} ${errorText}`); } - - logWithTimestamp(`成功插入 ${newRecords.length} 条记录到ClickHouse`); - return newRecords.length; } catch (err) { const error = err as Error; - logWithTimestamp(`向ClickHouse插入数据失败: ${error.message}`); + logWithTimestamp(`ClickHouse插入请求出错: ${error.message}`); throw error; } }; - // 批量处理记录 - for (let page = 0; processedRecords < recordsToProcess; page++) { - // 检查超时 - if (checkTimeout()) { - logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`); - break; - } + // 处理所有记录 + let totalSyncedRecords = 0; + while (processedRecords < recordsToProcess) { + const batch = await traceCollection.find(query).skip(processedRecords).limit(batch_size).toArray(); + const batchRecords = await processRecords(batch); + processedRecords += batch.length; + totalSyncedRecords += batchRecords; + logWithTimestamp(`已处理 ${processedRecords} 条记录,共 ${totalSyncedRecords} 条记录已同步`); - // 每批次都输出进度 - logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`); - - logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`); - const records = await traceCollection.find(query) - .sort({ createTime: 1, _id: 1 }) - .skip(page * batch_size) - .limit(batch_size) - .toArray(); + // 更新同步状态 + const newSyncState: SyncState = { + last_sync_time: lastCreateTime, + records_synced: syncState.records_synced + totalBatchRecords, + last_sync_id: lastId, + // 可以添加一个标记,表明这是type=1的过滤同步 + sync_filter: "type_1_only" + }; - if (records.length === 0) { - logWithTimestamp(`第 ${page+1} 批次没有找到数据,结束处理`); - break; - } - - logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`); - // 输出当前批次的部分数据信息 - if (records.length > 0) { - logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, 时间=${new Date(records[0].createTime).toISOString()}`); - if (records.length > 1) { - logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`); - } - } + // 保存同步状态 + await setVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state", JSON.stringify(newSyncState)); - const batchSize = await processRecords(records); - processedRecords += records.length; // 总是增加处理的记录数,即使有些记录已存在 - totalBatchRecords += batchSize; // 只增加实际插入的记录数 - - logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); - - // 更新查询条件,以便下一批次查询 - query.createTime = { $gt: lastCreateTime }; - if (lastId) { - query._id = { $gt: new ObjectId(lastId) }; - } - logWithTimestamp(`更新查询条件: 创建时间 > ${new Date(lastCreateTime).toISOString()}, ID > ${lastId || 'none'}`); + // 检查是否超时 + if (checkTimeout()) break; } - // 更新同步状态 - const newSyncState: SyncState = { - last_sync_time: lastCreateTime, - records_synced: syncState.records_synced + totalBatchRecords, - last_sync_id: lastId - }; - - await setVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state", JSON.stringify(newSyncState)); - console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`); - + // 返回同步结果 return { success: true, - records_processed: processedRecords, - records_synced: totalBatchRecords, - total_synced: newSyncState.records_synced, - last_sync_time: new Date(newSyncState.last_sync_time).toISOString(), - message: "数据同步完成" + records_synced: totalSyncedRecords, + total_synced: syncState.records_synced + totalSyncedRecords, + message: "同步完成" }; } catch (err) { - console.error("同步过程中发生错误:", err); + const error = err as Error; + logWithTimestamp(`同步任务出错: ${error.message}`); return { success: false, - error: err instanceof Error ? err.message : String(err), - stack: err instanceof Error ? err.stack : undefined + records_synced: 0, + total_synced: syncState.records_synced, + message: error.message }; } finally { - // 关闭MongoDB连接 await client.close(); - console.log("MongoDB连接已关闭"); } -} - -// 如果直接执行此脚本 -if (require.main === module) { - // 解析命令行参数 - const args = process.argv.slice(2); - const params: Record = { - batch_size: 1000, - initial_sync: false, - max_records: 9999999, - timeout_minutes: 60, - skip_clickhouse_check: false, - force_insert: false - }; - - // 简单的参数解析 - for (let i = 0; i < args.length; i += 2) { - if (args[i].startsWith('--') && i + 1 < args.length) { - const key = args[i].substring(2); - let value: any = args[i + 1]; - - // 类型转换 - if (value === 'true') value = true; - else if (value === 'false') value = false; - else if (!isNaN(Number(value))) value = Number(value); - - params[key] = value; - } - } - - console.log('启动同步任务,参数:', params); - main( - params.batch_size, - params.initial_sync, - params.max_records, - params.timeout_minutes, - params.skip_clickhouse_check, - params.force_insert - ).then(result => { - console.log('同步任务完成:', result); - process.exit(result.success ? 0 : 1); - }).catch(err => { - console.error('同步任务失败:', err); - process.exit(1); - }); -} \ No newline at end of file +} \ No newline at end of file