diff --git a/windmill/sync_shorturl_to_clickhouse.ts b/windmill/sync_shorturl_to_clickhouse.ts index 9268889..cd29ccc 100644 --- a/windmill/sync_shorturl_to_clickhouse.ts +++ b/windmill/sync_shorturl_to_clickhouse.ts @@ -554,6 +554,47 @@ async function insertToClickhouse(data: Record[]) { // 构建INSERT查询 const columns = Object.keys(data[0]).join(", "); + // 收集所有记录的ID + const recordIds = data.map(record => record.id as string); + console.log(`需要处理的记录数: ${recordIds.length}`); + + // 先删除可能存在的重复记录 + try { + console.log(`删除可能存在的重复记录...`); + + // 按批次处理删除,避免请求过大 + const deleteBatchSize = 100; + for (let i = 0; i < recordIds.length; i += deleteBatchSize) { + const idBatch = recordIds.slice(i, i + deleteBatchSize); + const formattedIds = idBatch.map(id => `'${id}'`).join(', '); + + const deleteQuery = ` + ALTER TABLE shorturl_analytics.shorturl + DELETE WHERE id IN (${formattedIds}) + `; + + const response = await fetch(chConfig.clickhouse_url, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${chConfig.clickhouse_user}:${chConfig.clickhouse_password}`)}` + }, + body: deleteQuery, + }); + + if (!response.ok) { + const errorText = await response.text(); + console.warn(`删除记录时出错 (批次 ${i/deleteBatchSize + 1}): ${errorText}`); + // 继续执行,不中断流程 + } else { + console.log(`成功删除批次 ${i/deleteBatchSize + 1}/${Math.ceil(recordIds.length/deleteBatchSize)}的潜在重复记录`); + } + } + } catch (error) { + console.warn(`删除重复记录时出错: ${(error as Error).message}`); + // 继续执行,不因为删除失败而中断整个过程 + } + const query = ` INSERT INTO shorturl_analytics.shorturl (${columns}) FORMAT JSONEachRow