Update sync_mongo_to_events.ts to force-insert records by default and simplify the ClickHouse checks. Removed the check that filtered out records already present in ClickHouse (force_insert now defaults to true), and added placeholder values for missing data attributes.

This commit is contained in:
2025-04-21 23:54:15 +08:00
parent 3cbb76db36
commit ed1d2e59f6

View File

@@ -185,7 +185,7 @@ export async function main(
max_records = 9999999,
timeout_minutes = 60,
skip_clickhouse_check = false,
force_insert = false,
force_insert = true,
database_override = "shorturl_analytics", // 添加数据库名称参数默认为shorturl_analytics
reset_sync_state = false, // 添加参数用于重置同步状态
debug_utm = false // 添加参数控制UTM调试日志输出
@@ -391,104 +391,6 @@ export async function main(
}
};
/**
 * Filters a batch of trace records down to those not yet present in the
 * ClickHouse `events` table.
 *
 * Behavior:
 * - An empty batch yields an empty array.
 * - When `skip_clickhouse_check` or `force_insert` is set, the batch is
 *   returned untouched and ClickHouse is not queried.
 * - Otherwise the `events` table is queried for rows whose `link_id` matches
 *   any record's `slugId`, and records with a matching `link_id` are dropped.
 * - An empty response body, or a response without a `data` field, is treated
 *   as "nothing exists yet" and the whole batch is returned.
 *
 * NOTE(review): matching is done on `link_id` (the record's `slugId`), so every
 * record that shares a link with an already-stored event is filtered out, not
 * just the event itself. The query also selects `visitor_id` but never reads
 * it. Presumably per-event de-duplication was intended — confirm against the
 * `events` schema before relying on this check.
 *
 * @param records Trace records fetched for the current batch.
 * @returns The records whose `slugId` was not found as a `link_id` in ClickHouse.
 * @throws Error when the database name is missing, the HTTP request fails or
 *   times out, or the response cannot be parsed / has a malformed `data` field.
 */
const checkExistingRecords = async (records: TraceRecord[]): Promise<TraceRecord[]> => {
  if (records.length === 0) return [];

  // Skipping the check or forcing inserts bypasses ClickHouse entirely.
  if (skip_clickhouse_check || force_insert) {
    logWithTimestamp(`已跳过ClickHouse重复检查准备处理所有 ${records.length} 条记录`);
    return records;
  }

  logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于ClickHouse中...`);

  try {
    // Guard against an unset database name (including the literal string "undefined"
    // that results from interpolating a missing value).
    if (!clickhouseConfig.clickhouse_database || clickhouseConfig.clickhouse_database === "undefined") {
      throw new Error("数据库名称未定义或无效,请检查配置");
    }

    // slugId is what gets stored as link_id, so it is the lookup key.
    const recordIds = records.map(record => record.slugId.toString());
    logWithTimestamp(`待检查的记录ID: ${recordIds.join(', ')}`);

    // Many records in a batch share the same link: de-duplicate to keep the IN list
    // small, and escape backslashes/quotes so an unexpected ID cannot break out of
    // the string literal.
    const inList = [...new Set(recordIds)]
      .map(id => `'${id.replace(/\\/g, "\\\\").replace(/'/g, "\\'")}'`)
      .join(",");

    // FORMAT JSON makes ClickHouse answer with a JSON document containing a `data` array.
    const query = `
      SELECT link_id, visitor_id
      FROM ${clickhouseConfig.clickhouse_database}.events
      WHERE link_id IN (${inList})
      FORMAT JSON
    `;
    logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`);

    // POST the query over the HTTP interface; abort if no answer arrives within 10 seconds.
    const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
    const response = await fetch(clickhouseUrl, {
      method: "POST",
      headers: {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
      },
      body: query,
      signal: AbortSignal.timeout(10000)
    });

    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`);
    }

    // Read the body as text first so a truncated copy can be logged before parsing.
    const responseText = await response.text();
    logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`);

    if (!responseText.trim()) {
      logWithTimestamp("ClickHouse返回空响应假定没有记录存在");
      return records;
    }

    let parsed: unknown;
    try {
      parsed = JSON.parse(responseText);
    } catch (err) {
      logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`);
      throw new Error(`解析ClickHouse响应失败: ${err instanceof Error ? err.message : String(err)}`);
    }

    // Optional chaining also covers a literal JSON `null` response.
    const rows: unknown = (parsed as { data?: unknown } | null)?.data;
    if (!rows) {
      logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(parsed)}`);
      return records;
    }
    if (!Array.isArray(rows)) {
      // Fail with a clear message instead of a TypeError from calling .map on a non-array.
      throw new Error(`ClickHouse响应的data字段不是数组: ${JSON.stringify(rows)}`);
    }

    const existingIds = new Set<string>(rows.map((row: { link_id: string }) => row.link_id));
    logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`);
    if (existingIds.size > 0) {
      logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`);
    }

    // Keep only records whose slugId did not come back as an existing link_id.
    const newRecords = records.filter(record => !existingIds.has(record.slugId.toString()));
    logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
    return newRecords;
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    logWithTimestamp(`ClickHouse查询出错: ${message}`);
    // skip_clickhouse_check is necessarily false here (it triggers the early return
    // above), so there is no "continue with all records" fallback: always propagate.
    throw err;
  }
};
// 在处理记录前先检查ClickHouse连接
const clickhouseConnected = await checkClickHouseConnection();
if (!clickhouseConnected && !skip_clickhouse_check) {
@@ -502,27 +404,10 @@ export async function main(
logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`);
// 检查记录是否已存在
let newRecords;
try {
newRecords = await checkExistingRecords(records);
} catch (err) {
const error = err as Error;
logWithTimestamp(`检查记录是否存在时出错: ${error.message}`);
if (!skip_clickhouse_check && !force_insert) {
throw error;
}
// 如果跳过检查或强制插入,则使用所有记录
logWithTimestamp("将使用所有记录进行处理");
newRecords = records;
}
// 强制使用所有记录,不检查重复
const newRecords = records;
if (newRecords.length === 0) {
logWithTimestamp("所有记录都已存在,跳过处理");
return 0;
}
logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`);
logWithTimestamp(`准备处理 ${newRecords.length} 条记录...`);
// 获取链接信息 - 新增代码
const slugIds = newRecords.map(record => record.slugId);
@@ -533,7 +418,7 @@ export async function main(
// 创建映射用于快速查找 - 新增代码
const shortLinksMap = new Map(shortLinks.map((link: ShortRecord) => [link._id.toString(), link]));
logWithTimestamp(`获取到 ${shortLinks.length} 条短链接信息`);
logWithTimestamp(`获取到 ${shortLinks.length} 条短链接信息${newRecords.length - shortLinks.length} 条数据将使用占位符`);
// 准备ClickHouse插入数据
const clickhouseData = newRecords.map(record => {
@@ -570,11 +455,11 @@ export async function main(
event_type: record.type === 1 ? "visit" : "custom",
event_attributes: JSON.stringify(eventAttributes),
link_id: record.slugId.toString(),
link_slug: shortLink?.slug || "",
link_slug: shortLink?.slug || "unknown_slug", // 使用占位符
link_label: record.label || "",
link_title: shortLink?.title || "",
link_original_url: shortLink?.origin || "",
link_attributes: JSON.stringify({ domain: shortLink?.domain || null }),
link_title: shortLink?.title || "unknown_title", // 使用占位符
link_original_url: shortLink?.origin || "https://unknown.url", // 使用占位符
link_attributes: JSON.stringify({ domain: shortLink?.domain || "unknown_domain" }), // 使用占位符
link_created_at: shortLink?.createTime
? new Date(shortLink.createTime).toISOString().replace('T', ' ').replace('Z', '')
: eventTime.toISOString().replace('T', ' ').replace('Z', ''),
@@ -582,34 +467,34 @@ export async function main(
? new Date(shortLink.expiresAt).toISOString().replace('T', ' ').replace('Z', '')
: null,
link_tags: shortLink?.tags ? JSON.stringify(shortLink.tags) : "[]",
user_id: shortLink?.user || "",
user_name: "",
user_id: shortLink?.user || "unknown_user", // 使用占位符
user_name: "unknown_user", // 使用占位符
user_email: "",
user_attributes: "{}",
team_id: shortLink?.teamId || "",
team_name: "",
team_id: shortLink?.teamId || "unknown_team", // 使用占位符
team_name: "unknown_team", // 使用占位符
team_attributes: "{}",
project_id: shortLink?.projectId || "",
project_name: "",
project_id: shortLink?.projectId || "unknown_project", // 使用占位符
project_name: "unknown_project", // 使用占位符
project_attributes: "{}",
qr_code_id: "",
qr_code_name: "",
qr_code_attributes: "{}",
visitor_id: record._id.toString(), // 使用MongoDB ID作为访客ID
session_id: record._id.toString() + "-" + record.createTime, // 创建一个唯一会话ID
ip_address: record.ip,
country: "", // 这些字段在MongoDB中不存在使用默认值
visitor_id: record._id.toString(),
session_id: record._id.toString() + "-" + record.createTime,
ip_address: record.ip || "0.0.0.0", // 使用占位符
country: "",
city: "",
device_type: record.platform || "unknown",
browser: record.browser || "",
os: record.platformOS || "",
user_agent: record.browser + " " + record.browserVersion,
browser: record.browser || "unknown", // 使用占位符
os: record.platformOS || "unknown", // 使用占位符
user_agent: (record.browser || "unknown") + " " + (record.browserVersion || "unknown"), // 使用占位符
referrer: record.url || "",
utm_source: utmParams.utm_source,
utm_medium: utmParams.utm_medium,
utm_campaign: utmParams.utm_campaign,
utm_term: utmParams.utm_term,
utm_content: utmParams.utm_content,
utm_source: utmParams.utm_source || "",
utm_medium: utmParams.utm_medium || "",
utm_campaign: utmParams.utm_campaign || "",
utm_term: utmParams.utm_term || "",
utm_content: utmParams.utm_content || "",
time_spent_sec: 0,
is_bounce: true,
is_qr_scan: false,
@@ -693,7 +578,6 @@ export async function main(
let processedRecords = 0;
let totalBatchRecords = 0;
let lastSyncTime = 0;
let lastSyncId = "";
for (let page = 0; processedRecords < recordsToProcess; page++) {
// 检查超时
@@ -750,7 +634,6 @@ export async function main(
if (records.length > 0) {
const lastRecord = records[records.length - 1];
lastSyncTime = Math.max(lastSyncTime, lastRecord.createTime);
lastSyncId = lastRecord._id.toString();
}
logWithTimestamp(`${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
@@ -758,11 +641,10 @@ export async function main(
// 更新同步状态
if (processedRecords > 0 && lastSyncTime > 0) {
// 创建新的同步状态
// 创建新的同步状态,简化对象结构
const newSyncState: SyncState = {
last_sync_time: lastSyncTime,
records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + totalBatchRecords,
last_sync_id: lastSyncId
records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + processedRecords, // 使用处理的总记录数,而不是实际插入数
};
try {
@@ -771,7 +653,8 @@ export async function main(
logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`);
} catch (err) {
const error = err as Error;
logWithTimestamp(`更新同步状态失败: ${error.message}`);
logWithTimestamp(`更新同步状态失败: ${error.message},将继续执行`);
// 不抛出错误,继续执行
}
}