diff --git a/windmill/sync_mongo_to_events.ts b/windmill/sync_mongo_to_events.ts index ae36c5c..b3ecc0b 100644 --- a/windmill/sync_mongo_to_events.ts +++ b/windmill/sync_mongo_to_events.ts @@ -185,7 +185,7 @@ export async function main( max_records = 9999999, timeout_minutes = 60, skip_clickhouse_check = false, - force_insert = false, + force_insert = true, database_override = "shorturl_analytics", // 添加数据库名称参数,默认为shorturl_analytics reset_sync_state = false, // 添加参数用于重置同步状态 debug_utm = false // 添加参数控制UTM调试日志输出 @@ -391,104 +391,6 @@ export async function main( } }; - // 检查记录是否已经存在于ClickHouse中 - const checkExistingRecords = async (records: TraceRecord[]): Promise => { - if (records.length === 0) return []; - - // 如果跳过ClickHouse检查或强制插入,则直接返回所有记录 - if (skip_clickhouse_check || force_insert) { - logWithTimestamp(`已跳过ClickHouse重复检查,准备处理所有 ${records.length} 条记录`); - return records; - } - - logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于ClickHouse中...`); - - try { - // 验证数据库名称 - if (!clickhouseConfig.clickhouse_database || clickhouseConfig.clickhouse_database === "undefined") { - throw new Error("数据库名称未定义或无效,请检查配置"); - } - - // 提取所有记录的ID - const recordIds = records.map(record => record.slugId.toString()); // 使用slugId作为link_id查询 - logWithTimestamp(`待检查的记录ID: ${recordIds.join(', ')}`); - - // 构建查询SQL,检查记录是否已存在,确保添加FORMAT JSON来获取正确的JSON格式响应 - const query = ` - SELECT link_id, visitor_id - FROM ${clickhouseConfig.clickhouse_database}.events - WHERE link_id IN ('${recordIds.join("','")}') - FORMAT JSON - `; - - logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`); - - // 发送请求到ClickHouse,添加10秒超时 - const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; - const response = await fetch(clickhouseUrl, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` - }, - body: query, - signal: AbortSignal.timeout(10000) - }); - - if (!response.ok) { - const errorText = await response.text(); - throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`); - } - - // 获取响应文本以便记录 - const responseText = await response.text(); - logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`); - - if (!responseText.trim()) { - logWithTimestamp("ClickHouse返回空响应,假定没有记录存在"); - return records; // 如果响应为空,假设没有记录 - } - - // 解析结果 - let result; - try { - result = JSON.parse(responseText); - } catch (err) { - logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`); - throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`); - } - - // 确保result有正确的结构 - if (!result.data) { - logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`); - return records; // 如果没有data字段,假设没有记录 - } - - // 提取已存在的记录ID - const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id)); - - logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`); - if (existingIds.size > 0) { - logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`); - } - - // 过滤出不存在的记录 - const newRecords = records.filter(record => !existingIds.has(record.slugId.toString())); // 使用slugId匹配link_id - logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`); - - return newRecords; - } catch (err) { - const error = err as Error; - logWithTimestamp(`ClickHouse查询出错: ${error.message}`); - if (skip_clickhouse_check) { - logWithTimestamp("已启用跳过ClickHouse检查,将继续处理所有记录"); - return records; - } else { - throw error; // 如果没有启用跳过检查,则抛出错误 - } - } - }; - // 在处理记录前先检查ClickHouse连接 const clickhouseConnected = await checkClickHouseConnection(); if (!clickhouseConnected && !skip_clickhouse_check) { @@ -502,27 +404,10 @@ export async function main( logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`); - // 检查记录是否已存在 - let newRecords; - try { - newRecords = await checkExistingRecords(records); - } catch (err) { - const error = err as Error; - logWithTimestamp(`检查记录是否存在时出错: ${error.message}`); - if (!skip_clickhouse_check && !force_insert) { - throw error; - } - // 如果跳过检查或强制插入,则使用所有记录 - logWithTimestamp("将使用所有记录进行处理"); - newRecords = records; - } + // 强制使用所有记录,不检查重复 + const newRecords = records; - if (newRecords.length === 0) { - logWithTimestamp("所有记录都已存在,跳过处理"); - return 0; - } - - logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`); + logWithTimestamp(`准备处理 ${newRecords.length} 条记录...`); // 获取链接信息 - 新增代码 const slugIds = newRecords.map(record => record.slugId); @@ -533,7 +418,7 @@ export async function main( // 创建映射用于快速查找 - 新增代码 const shortLinksMap = new Map(shortLinks.map((link: ShortRecord) => [link._id.toString(), link])); - logWithTimestamp(`获取到 ${shortLinks.length} 条短链接信息`); + logWithTimestamp(`获取到 ${shortLinks.length} 条短链接信息,${newRecords.length - shortLinks.length} 条数据将使用占位符`); // 准备ClickHouse插入数据 const clickhouseData = newRecords.map(record => { @@ -570,46 +455,46 @@ export async function main( event_type: record.type === 1 ? "visit" : "custom", event_attributes: JSON.stringify(eventAttributes), link_id: record.slugId.toString(), - link_slug: shortLink?.slug || "", + link_slug: shortLink?.slug || "unknown_slug", // 使用占位符 link_label: record.label || "", - link_title: shortLink?.title || "", - link_original_url: shortLink?.origin || "", - link_attributes: JSON.stringify({ domain: shortLink?.domain || null }), + link_title: shortLink?.title || "unknown_title", // 使用占位符 + link_original_url: shortLink?.origin || "https://unknown.url", // 使用占位符 + link_attributes: JSON.stringify({ domain: shortLink?.domain || "unknown_domain" }), // 使用占位符 link_created_at: shortLink?.createTime ? new Date(shortLink.createTime).toISOString().replace('T', ' ').replace('Z', '') - : eventTime.toISOString().replace('T', ' ').replace('Z', ''), + : eventTime.toISOString().replace('T', ' ').replace('Z', ''), link_expires_at: shortLink?.expiresAt ? new Date(shortLink.expiresAt).toISOString().replace('T', ' ').replace('Z', '') - : null, - link_tags: shortLink?.tags ? JSON.stringify(shortLink.tags) : "[]", - user_id: shortLink?.user || "", - user_name: "", + : null, + link_tags: shortLink?.tags ? JSON.stringify(shortLink.tags) : "[]", + user_id: shortLink?.user || "unknown_user", // 使用占位符 + user_name: "unknown_user", // 使用占位符 user_email: "", user_attributes: "{}", - team_id: shortLink?.teamId || "", - team_name: "", + team_id: shortLink?.teamId || "unknown_team", // 使用占位符 + team_name: "unknown_team", // 使用占位符 team_attributes: "{}", - project_id: shortLink?.projectId || "", - project_name: "", + project_id: shortLink?.projectId || "unknown_project", // 使用占位符 + project_name: "unknown_project", // 使用占位符 project_attributes: "{}", qr_code_id: "", qr_code_name: "", qr_code_attributes: "{}", - visitor_id: record._id.toString(), // 使用MongoDB ID作为访客ID - session_id: record._id.toString() + "-" + record.createTime, // 创建一个唯一会话ID - ip_address: record.ip, - country: "", // 这些字段在MongoDB中不存在,使用默认值 + visitor_id: record._id.toString(), + session_id: record._id.toString() + "-" + record.createTime, + ip_address: record.ip || "0.0.0.0", // 使用占位符 + country: "", city: "", device_type: record.platform || "unknown", - browser: record.browser || "", - os: record.platformOS || "", - user_agent: record.browser + " " + record.browserVersion, + browser: record.browser || "unknown", // 使用占位符 + os: record.platformOS || "unknown", // 使用占位符 + user_agent: (record.browser || "unknown") + " " + (record.browserVersion || "unknown"), // 使用占位符 referrer: record.url || "", - utm_source: utmParams.utm_source, - utm_medium: utmParams.utm_medium, - utm_campaign: utmParams.utm_campaign, - utm_term: utmParams.utm_term, - utm_content: utmParams.utm_content, + utm_source: utmParams.utm_source || "", + utm_medium: utmParams.utm_medium || "", + utm_campaign: utmParams.utm_campaign || "", + utm_term: utmParams.utm_term || "", + utm_content: utmParams.utm_content || "", time_spent_sec: 0, is_bounce: true, is_qr_scan: false, @@ -693,7 +578,6 @@ export async function main( let processedRecords = 0; let totalBatchRecords = 0; let lastSyncTime = 0; - let lastSyncId = ""; for (let page = 0; processedRecords < recordsToProcess; page++) { // 检查超时 @@ -750,7 +634,6 @@ export async function main( if (records.length > 0) { const lastRecord = records[records.length - 1]; lastSyncTime = Math.max(lastSyncTime, lastRecord.createTime); - lastSyncId = lastRecord._id.toString(); } logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); @@ -758,11 +641,10 @@ export async function main( // 更新同步状态 if (processedRecords > 0 && lastSyncTime > 0) { - // 创建新的同步状态 + // 创建新的同步状态,简化对象结构 const newSyncState: SyncState = { last_sync_time: lastSyncTime, - records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + totalBatchRecords, - last_sync_id: lastSyncId + records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + processedRecords, // 使用处理的总记录数,而不是实际插入数 }; try { @@ -771,7 +653,8 @@ export async function main( logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`); } catch (err) { const error = err as Error; - logWithTimestamp(`更新同步状态失败: ${error.message}`); + logWithTimestamp(`更新同步状态失败: ${error.message},将继续执行`); + // 不抛出错误,继续执行 } }