sync event data

This commit is contained in:
2025-03-24 22:47:02 +08:00
parent 37aafbe636
commit b8d5b0545a
2 changed files with 153 additions and 111 deletions

View File

@@ -0,0 +1,122 @@
-- Migration: change link_events.device_type from an enum type to a String.
--
-- Step 1: drop the objects that read from link_events before altering it
-- (the original note calls them the materialized views depending on the
-- table). They are recreated by the CREATE MATERIALIZED VIEW statements
-- further down in this script.
-- NOTE(review): the views are recreated without POPULATE, so rows already in
-- link_events are presumably not backfilled into them; confirm that a full
-- re-sync of the events is planned.
DROP TABLE IF EXISTS limq.link_daily_stats;
DROP TABLE IF EXISTS limq.link_hourly_patterns;
DROP TABLE IF EXISTS limq.platform_distribution;
DROP TABLE IF EXISTS limq.team_daily_stats;
DROP TABLE IF EXISTS limq.project_daily_stats;

-- Step 2: switch the device_type column of link_events to String.
ALTER TABLE limq.link_events MODIFY COLUMN device_type String;
-- Recreate the materialized views.
--
-- link_daily_stats: daily summary per link, aggregating link_events by
-- (date, link_id). device_type is compared against the string values
-- 'mobile', 'tablet' and 'desktop'.
-- NOTE(review): unique_visitors / unique_sessions / unique_referrers
-- (uniqExact) and avg_time_spent (avg) are stored in a SummingMergeTree,
-- which sums numeric columns when rows with the same key are merged. Summed
-- distinct counts and averages may not be meaningful; confirm how readers
-- of this view interpret those columns.
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.link_daily_stats
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, link_id)
SETTINGS index_granularity = 8192
AS
SELECT
    toDate(event_time)                  AS date,
    link_id,
    count()                             AS total_clicks,
    uniqExact(visitor_id)               AS unique_visitors,
    uniqExact(session_id)               AS unique_sessions,
    sum(time_spent_sec)                 AS total_time_spent,
    avg(time_spent_sec)                 AS avg_time_spent,
    countIf(is_bounce)                  AS bounce_count,
    countIf(event_type = 'conversion')  AS conversion_count,
    uniqExact(referrer)                 AS unique_referrers,
    countIf(device_type = 'mobile')     AS mobile_count,
    countIf(device_type = 'tablet')     AS tablet_count,
    countIf(device_type = 'desktop')    AS desktop_count,
    countIf(is_qr_scan)                 AS qr_scan_count,
    sum(conversion_value)               AS total_conversion_value
FROM limq.link_events
GROUP BY date, link_id;
-- link_hourly_patterns: hourly access pattern per link, aggregating
-- link_events by (date, hour, link_id).
-- NOTE(review): unique_visitors (uniqExact) lives in a SummingMergeTree,
-- which sums numeric columns on merge; a summed distinct count may
-- over-count. Confirm how readers interpret it.
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.link_hourly_patterns
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, hour, link_id)
SETTINGS index_granularity = 8192
AS
SELECT
    toDate(event_time)     AS date,
    toHour(event_time)     AS hour,
    link_id,
    count()                AS visits,
    uniqExact(visitor_id)  AS unique_visitors
FROM limq.link_events
GROUP BY date, hour, link_id;
-- platform_distribution: visits per (date, utm_source, device_type).
-- Events whose utm_source is the empty string are excluded.
-- NOTE(review): unique_visitors (uniqExact) lives in a SummingMergeTree,
-- which sums numeric columns on merge; a summed distinct count may
-- over-count. Confirm how readers interpret it.
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.platform_distribution
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, utm_source, device_type)
SETTINGS index_granularity = 8192
AS
SELECT
    toDate(event_time)     AS date,
    utm_source,
    device_type,
    count()                AS visits,
    uniqExact(visitor_id)  AS unique_visitors
FROM limq.link_events
WHERE utm_source != ''
GROUP BY date, utm_source, device_type;
-- team_daily_stats: daily statistics per team. link_events is joined to
-- limq.links on link_id to obtain team_id; rows whose team_id is the empty
-- string are excluded.
-- NOTE(review): unique_visitors and links_used (uniqExact) live in a
-- SummingMergeTree, which sums numeric columns on merge; summed distinct
-- counts may over-count. Confirm how readers interpret them.
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.team_daily_stats
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, team_id)
SETTINGS index_granularity = 8192
AS
SELECT
    toDate(event_time)                     AS date,
    lk.team_id                             AS team_id,
    count()                                AS total_clicks,
    uniqExact(ev.visitor_id)               AS unique_visitors,
    countIf(ev.event_type = 'conversion')  AS conversion_count,
    uniqExact(ev.link_id)                  AS links_used,
    countIf(ev.is_qr_scan)                 AS qr_scan_count
FROM limq.link_events ev
JOIN limq.links lk ON ev.link_id = lk.link_id
WHERE lk.team_id != ''
GROUP BY date, lk.team_id;
-- project_daily_stats: daily statistics per project. link_events is joined
-- to limq.links on link_id to obtain project_id; rows whose project_id is
-- the empty string are excluded.
-- NOTE(review): unique_visitors and links_used (uniqExact) live in a
-- SummingMergeTree, which sums numeric columns on merge; summed distinct
-- counts may over-count. Confirm how readers interpret them.
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.project_daily_stats
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, project_id)
SETTINGS index_granularity = 8192
AS
SELECT
    toDate(event_time)                     AS date,
    lk.project_id                          AS project_id,
    count()                                AS total_clicks,
    uniqExact(ev.visitor_id)               AS unique_visitors,
    countIf(ev.event_type = 'conversion')  AS conversion_count,
    uniqExact(ev.link_id)                  AS links_used,
    countIf(ev.is_qr_scan)                 AS qr_scan_count
FROM limq.link_events ev
JOIN limq.links lk ON ev.link_id = lk.link_id
WHERE lk.project_id != ''
GROUP BY date, lk.project_id;

View File

@@ -40,12 +40,11 @@ interface SyncState {
} }
export async function main( export async function main(
batch_size = 1000, // 减小批处理大小为5 batch_size = 1000,
initial_sync = false, max_records = 9999999,
max_records = 9999999, // 只同步10条记录用于测试 timeout_minutes = 60,
timeout_minutes = 60, // 减少超时时间为5分钟 skip_clickhouse_check = false,
skip_clickhouse_check = false, // 是否跳过ClickHouse重复检查 force_insert = false
force_insert = false // 强制插入所有记录,不检查是否已存在
) { ) {
const logWithTimestamp = (message: string) => { const logWithTimestamp = (message: string) => {
const now = new Date(); const now = new Date();
@@ -125,34 +124,6 @@ export async function main(
console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`); console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);
// 获取上次同步的状态
let syncState: SyncState;
try {
const rawSyncState = await getVariable<string>("f/shorturl_analytics/clickhouse/shorturl_event_sync_state");
try {
syncState = JSON.parse(rawSyncState);
console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`);
} catch (parseError) {
console.error("解析同步状态失败:", parseError);
throw parseError;
}
} catch (_unused_error) {
console.log("未找到同步状态,创建初始同步状态");
syncState = {
last_sync_time: 0,
records_synced: 0,
};
}
// 如果强制从头开始同步
if (initial_sync) {
console.log("强制从头开始同步");
syncState = {
last_sync_time: 0,
records_synced: 0,
};
}
// 连接MongoDB // 连接MongoDB
const client = new MongoClient(); const client = new MongoClient();
try { try {
@@ -162,43 +133,28 @@ export async function main(
const db = client.database(mongoConfig.db); const db = client.database(mongoConfig.db);
const traceCollection = db.collection<TraceRecord>("trace"); const traceCollection = db.collection<TraceRecord>("trace");
// 构建查询条件,只查询新的记录 // 构建查询条件,获取所有记录
const query: Record<string, unknown> = {}; const query: Record<string, unknown> = {
type: 1 // 只同步type为1的记录
if (syncState.last_sync_time > 0) { };
query.createTime = { $gt: syncState.last_sync_time };
}
if (syncState.last_sync_id) {
// 如果有上次同步的ID则从该ID之后开始查询
// 注意这需要MongoDB中createTime相同的记录按_id排序
query._id = { $gt: new ObjectId(syncState.last_sync_id) };
}
// 计算总记录数 // 计算总记录数
const totalRecords = await traceCollection.countDocuments(query); const totalRecords = await traceCollection.countDocuments(query);
console.log(`找到 ${totalRecords}记录需要同步`); console.log(`找到 ${totalRecords} 条记录需要同步`);
// 限制此次处理的记录数量 // 限制此次处理的记录数量
const recordsToProcess = Math.min(totalRecords, max_records); const recordsToProcess = Math.min(totalRecords, max_records);
console.log(`本次将处理 ${recordsToProcess} 条记录`); console.log(`本次将处理 ${recordsToProcess} 条记录`);
if (totalRecords === 0) { if (totalRecords === 0) {
console.log("没有记录需要同步,任务完成"); console.log("没有记录需要同步,任务完成");
return { return {
success: true, success: true,
records_synced: 0, records_synced: 0,
total_synced: syncState.records_synced, message: "没有记录需要同步"
message: "没有新记录需要同步"
}; };
} }
// 分批处理记录
let processedRecords = 0;
let lastId: string | undefined;
let lastCreateTime = syncState.last_sync_time;
let totalBatchRecords = 0;
// 检查ClickHouse连接状态 // 检查ClickHouse连接状态
const checkClickHouseConnection = async (): Promise<boolean> => { const checkClickHouseConnection = async (): Promise<boolean> => {
if (skip_clickhouse_check) { if (skip_clickhouse_check) {
@@ -358,10 +314,6 @@ export async function main(
if (newRecords.length === 0) { if (newRecords.length === 0) {
logWithTimestamp("所有记录都已存在,跳过处理"); logWithTimestamp("所有记录都已存在,跳过处理");
// 更新同步状态,即使没有新增记录
const lastRecord = records[records.length - 1];
lastId = lastRecord._id.toString();
lastCreateTime = lastRecord.createTime;
return 0; return 0;
} }
@@ -385,7 +337,7 @@ export async function main(
utm_medium: "", utm_medium: "",
utm_campaign: "", utm_campaign: "",
user_agent: record.browser + " " + record.browserVersion, user_agent: record.browser + " " + record.browserVersion,
device_type: record.platform === "mobile" ? 1 : (record.platform === "tablet" ? 2 : 3), device_type: record.platform || "unknown",
browser: record.browser || "", browser: record.browser || "",
os: record.platformOS || "", os: record.platformOS || "",
time_spent_sec: 0, time_spent_sec: 0,
@@ -398,12 +350,6 @@ export async function main(
}; };
}); });
// 更新同步状态使用原始records的最后一条以确保进度正确
const lastRecord = records[records.length - 1];
lastId = lastRecord._id.toString();
lastCreateTime = lastRecord.createTime;
logWithTimestamp(`更新同步位置到: ID=${lastId}, 时间=${new Date(lastCreateTime).toISOString()}`);
// 生成ClickHouse插入SQL // 生成ClickHouse插入SQL
const insertSQL = ` const insertSQL = `
INSERT INTO ${clickhouseConfig.clickhouse_database}.link_events INSERT INTO ${clickhouseConfig.clickhouse_database}.link_events
@@ -421,7 +367,7 @@ export async function main(
return `('${record.link_id}', '${safeReplace(record.channel_id)}', '${record.visitor_id}', '${record.session_id}', return `('${record.link_id}', '${safeReplace(record.channel_id)}', '${record.visitor_id}', '${record.session_id}',
${record.event_type}, '${safeReplace(record.ip_address)}', '', '', ${record.event_type}, '${safeReplace(record.ip_address)}', '', '',
'${safeReplace(record.referrer)}', '', '', '', '${safeReplace(record.user_agent)}', ${record.device_type}, '${safeReplace(record.referrer)}', '', '', '', '${safeReplace(record.user_agent)}', '${safeReplace(record.device_type)}',
'${safeReplace(record.browser)}', '${safeReplace(record.os)}', '${safeReplace(record.browser)}', '${safeReplace(record.os)}',
0, true, false, '', 1, 0, '${safeReplace(record.custom_data)}')`; 0, true, false, '', 1, 0, '${safeReplace(record.custom_data)}')`;
}).join(", ")} }).join(", ")}
@@ -461,6 +407,9 @@ export async function main(
}; };
// 批量处理记录 // 批量处理记录
let processedRecords = 0;
let totalBatchRecords = 0;
for (let page = 0; processedRecords < recordsToProcess; page++) { for (let page = 0; processedRecords < recordsToProcess; page++) {
// 检查超时 // 检查超时
if (checkTimeout()) { if (checkTimeout()) {
@@ -472,32 +421,22 @@ export async function main(
logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`); logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`); logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`);
const records = await traceCollection.find(query) const records = await traceCollection.find(
.sort({ createTime: 1, _id: 1 }) query,
.skip(page * batch_size) {
.limit(batch_size) allowDiskUse: true,
.toArray(); sort: { createTime: 1 },
skip: page * batch_size,
limit: batch_size
}
).toArray();
if (records.length === 0) { if (records.length === 0) {
logWithTimestamp(`${page+1} 批次没有找到数据,但将继续尝试后续批次...`); logWithTimestamp("没有找到更多数据,同步结束");
break;
// 更新查询条件,以确保能找到后续数据
// 更新查询时间,增加一定的时间窗口以尝试找到后续数据
const timeGap = 3600 * 1000; // 1小时的毫秒数
lastCreateTime += timeGap;
query.createTime = { $gt: lastCreateTime };
if (lastId) {
// 移除ID条件因为我们已经跳过了时间
delete query._id;
}
logWithTimestamp(`调整查询条件向前搜索: 创建时间 > ${new Date(lastCreateTime).toISOString()}`);
// 继续下一批次,不中断循环
continue;
} }
// 找到数据,开始处理
logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`); logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);
// 输出当前批次的部分数据信息 // 输出当前批次的部分数据信息
if (records.length > 0) { if (records.length > 0) {
@@ -508,35 +447,16 @@ export async function main(
} }
const batchSize = await processRecords(records); const batchSize = await processRecords(records);
processedRecords += records.length; // 总是增加处理的记录数,即使有些记录已存在 processedRecords += records.length;
totalBatchRecords += batchSize; // 只增加实际插入的记录数 totalBatchRecords += batchSize;
logWithTimestamp(`${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); logWithTimestamp(`${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
// 更新查询条件,以便下一批次查询
query.createTime = { $gt: lastCreateTime };
if (lastId) {
query._id = { $gt: new ObjectId(lastId) };
}
logWithTimestamp(`更新查询条件: 创建时间 > ${new Date(lastCreateTime).toISOString()}, ID > ${lastId || 'none'}`);
} }
// 更新同步状态
const newSyncState: SyncState = {
last_sync_time: lastCreateTime,
records_synced: syncState.records_synced + totalBatchRecords,
last_sync_id: lastId
};
await setVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state", JSON.stringify(newSyncState));
console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`);
return { return {
success: true, success: true,
records_processed: processedRecords, records_processed: processedRecords,
records_synced: totalBatchRecords, records_synced: totalBatchRecords,
total_synced: newSyncState.records_synced,
last_sync_time: new Date(newSyncState.last_sync_time).toISOString(),
message: "数据同步完成" message: "数据同步完成"
}; };
} catch (err) { } catch (err) {