From 45ffaccb7a45005282e8d60bde270c524a9547e0 Mon Sep 17 00:00:00 2001 From: William Tso Date: Fri, 21 Mar 2025 23:43:16 +0800 Subject: [PATCH] sync data fix --- windmill/sync_shorturl_event_from_mongo.ts | 2 +- .../sync_shorturl_from_mongo_to_clickhouse.ts | 21 ++++++++++--------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/windmill/sync_shorturl_event_from_mongo.ts b/windmill/sync_shorturl_event_from_mongo.ts index 6e635a7..98fca68 100644 --- a/windmill/sync_shorturl_event_from_mongo.ts +++ b/windmill/sync_shorturl_event_from_mongo.ts @@ -128,7 +128,7 @@ export async function main( // 获取上次同步的状态 let syncState: SyncState; try { - const rawSyncState = await getVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state"); + const rawSyncState = await getVariable("f/shorturl_analytics/clickhouse/shorturl_event_sync_state"); try { syncState = JSON.parse(rawSyncState); console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`); diff --git a/windmill/sync_shorturl_from_mongo_to_clickhouse.ts b/windmill/sync_shorturl_from_mongo_to_clickhouse.ts index 18e1e94..cf1065d 100644 --- a/windmill/sync_shorturl_from_mongo_to_clickhouse.ts +++ b/windmill/sync_shorturl_from_mongo_to_clickhouse.ts @@ -22,9 +22,10 @@ interface ClickHouseConfig { interface ShortRecord { _id: ObjectId; slug: string; // 短链接的slug部分 - url: string; // 原始URL + origin: string; // 原始URL + domain?: string; // 域名 createTime: number; // 创建时间戳 - user: string; // 创建用户 + user?: string; // 创建用户 title?: string; // 标题 description?: string; // 描述 tags?: string[]; // 标签 @@ -41,9 +42,9 @@ interface SyncState { } export async function main( - batch_size = 50, + batch_size = 100, initial_sync = false, - max_records = 1000, + max_records = 999999, timeout_minutes = 30, skip_clickhouse_check = false, force_insert = false @@ -248,7 +249,7 @@ export async function main( try { // 提取所有记录的ID - const recordIds = records.map(record => record.slug); + const recordIds = records.map(record => record._id.toString()); logWithTimestamp(`待检查的短链接ID: ${recordIds.join(', ')}`); // 构建查询SQL,检查记录是否已存在 @@ -311,7 +312,7 @@ export async function main( } // 过滤出不存在的记录 - const newRecords = records.filter(record => !existingIds.has(record.slug)); + const newRecords = records.filter(record => !existingIds.has(record._id.toString())); logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`); return newRecords; @@ -374,11 +375,11 @@ export async function main( const expiresAtStr = record.expiresAt ? new Date(record.expiresAt).toISOString().replace('Z', '') : null; return { - link_id: record.slug, - original_url: record.url || "", + link_id: record._id.toString(), // 使用MongoDB的_id作为link_id + original_url: record.origin || "", created_at: createdAtStr, created_by: record.user || "unknown", - title: record.title || (record.url ? record.url.substring(0, 50) : "无标题"), + title: record.slug, // 使用slug作为title description: record.description || "", tags: record.tags || [], is_active: record.active !== undefined ? record.active : true, @@ -528,4 +529,4 @@ export async function main( await client.close(); console.log("MongoDB连接已关闭"); } -} +} \ No newline at end of file