diff --git a/scripts/db/sql/clickhouse/modify_device_type.sql b/scripts/db/sql/clickhouse/modify_device_type.sql new file mode 100644 index 0000000..d1651d5 --- /dev/null +++ b/scripts/db/sql/clickhouse/modify_device_type.sql @@ -0,0 +1,122 @@ +-- 修改设备类型字段从枚举类型更改为字符串类型 +-- 先删除依赖于link_events表的物化视图 +DROP TABLE IF EXISTS limq.platform_distribution; + +DROP TABLE IF EXISTS limq.link_hourly_patterns; + +DROP TABLE IF EXISTS limq.link_daily_stats; + +DROP TABLE IF EXISTS limq.team_daily_stats; + +DROP TABLE IF EXISTS limq.project_daily_stats; + +-- 修改link_events表的device_type字段 +ALTER TABLE + limq.link_events +MODIFY + COLUMN device_type String; + +-- 重新创建物化视图 +-- 每日链接汇总视图 +CREATE MATERIALIZED VIEW IF NOT EXISTS limq.link_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date) +ORDER BY + (date, link_id) SETTINGS index_granularity = 8192 AS +SELECT + toDate(event_time) AS date, + link_id, + count() AS total_clicks, + uniqExact(visitor_id) AS unique_visitors, + uniqExact(session_id) AS unique_sessions, + sum(time_spent_sec) AS total_time_spent, + avg(time_spent_sec) AS avg_time_spent, + countIf(is_bounce) AS bounce_count, + countIf(event_type = 'conversion') AS conversion_count, + uniqExact(referrer) AS unique_referrers, + countIf(device_type = 'mobile') AS mobile_count, + countIf(device_type = 'tablet') AS tablet_count, + countIf(device_type = 'desktop') AS desktop_count, + countIf(is_qr_scan) AS qr_scan_count, + sum(conversion_value) AS total_conversion_value +FROM + limq.link_events +GROUP BY + date, + link_id; + +-- 每小时访问模式视图 +CREATE MATERIALIZED VIEW IF NOT EXISTS limq.link_hourly_patterns ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date) +ORDER BY + (date, hour, link_id) SETTINGS index_granularity = 8192 AS +SELECT + toDate(event_time) AS date, + toHour(event_time) AS hour, + link_id, + count() AS visits, + uniqExact(visitor_id) AS unique_visitors +FROM + limq.link_events +GROUP BY + date, + hour, + link_id; + +-- 平台分布视图 +CREATE MATERIALIZED VIEW IF NOT EXISTS limq.platform_distribution ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date) +ORDER BY + (date, utm_source, device_type) SETTINGS index_granularity = 8192 AS +SELECT + toDate(event_time) AS date, + utm_source, + device_type, + count() AS visits, + uniqExact(visitor_id) AS unique_visitors +FROM + limq.link_events +WHERE + utm_source != '' +GROUP BY + date, + utm_source, + device_type; + +-- 团队每日统计视图 +CREATE MATERIALIZED VIEW IF NOT EXISTS limq.team_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date) +ORDER BY + (date, team_id) SETTINGS index_granularity = 8192 AS +SELECT + toDate(event_time) AS date, + l.team_id AS team_id, + count() AS total_clicks, + uniqExact(e.visitor_id) AS unique_visitors, + countIf(e.event_type = 'conversion') AS conversion_count, + uniqExact(e.link_id) AS links_used, + countIf(e.is_qr_scan) AS qr_scan_count +FROM + limq.link_events e + JOIN limq.links l ON e.link_id = l.link_id +WHERE + l.team_id != '' +GROUP BY + date, + l.team_id; + +-- 项目每日统计视图 +CREATE MATERIALIZED VIEW IF NOT EXISTS limq.project_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date) +ORDER BY + (date, project_id) SETTINGS index_granularity = 8192 AS +SELECT + toDate(event_time) AS date, + l.project_id AS project_id, + count() AS total_clicks, + uniqExact(e.visitor_id) AS unique_visitors, + countIf(e.event_type = 'conversion') AS conversion_count, + uniqExact(e.link_id) AS links_used, + countIf(e.is_qr_scan) AS qr_scan_count +FROM + limq.link_events e + JOIN limq.links l ON e.link_id = l.link_id +WHERE + l.project_id != '' +GROUP BY + date, + l.project_id; \ No newline at end of file diff --git a/windmill/sync_shorturl_event_from_mongo.ts b/windmill/sync_shorturl_event_from_mongo.ts index 67bbfb9..345eaad 100644 --- a/windmill/sync_shorturl_event_from_mongo.ts +++ b/windmill/sync_shorturl_event_from_mongo.ts @@ -40,12 +40,11 @@ interface SyncState { } export async function main( - batch_size = 1000, // 减小批处理大小为5 - initial_sync = false, - max_records = 9999999, // 只同步10条记录用于测试 - timeout_minutes = 60, // 减少超时时间为5分钟 - skip_clickhouse_check = false, // 是否跳过ClickHouse重复检查 - force_insert = false // 强制插入所有记录,不检查是否已存在 + batch_size = 1000, + max_records = 9999999, + timeout_minutes = 60, + skip_clickhouse_check = false, + force_insert = false ) { const logWithTimestamp = (message: string) => { const now = new Date(); @@ -125,34 +124,6 @@ export async function main( console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`); - // 获取上次同步的状态 - let syncState: SyncState; - try { - const rawSyncState = await getVariable("f/shorturl_analytics/clickhouse/shorturl_event_sync_state"); - try { - syncState = JSON.parse(rawSyncState); - console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`); - } catch (parseError) { - console.error("解析同步状态失败:", parseError); - throw parseError; - } - } catch (_unused_error) { - console.log("未找到同步状态,创建初始同步状态"); - syncState = { - last_sync_time: 0, - records_synced: 0, - }; - } - - // 如果强制从头开始同步 - if (initial_sync) { - console.log("强制从头开始同步"); - syncState = { - last_sync_time: 0, - records_synced: 0, - }; - } - // 连接MongoDB const client = new MongoClient(); try { @@ -162,43 +133,28 @@ export async function main( const db = client.database(mongoConfig.db); const traceCollection = db.collection("trace"); - // 构建查询条件,只查询新的记录 - const query: Record = {}; - - if (syncState.last_sync_time > 0) { - query.createTime = { $gt: syncState.last_sync_time }; - } - - if (syncState.last_sync_id) { - // 如果有上次同步的ID,则从该ID之后开始查询 - // 注意:这需要MongoDB中createTime相同的记录按_id排序 - query._id = { $gt: new ObjectId(syncState.last_sync_id) }; - } + // 构建查询条件,获取所有记录 + const query: Record = { + type: 1 // 只同步type为1的记录 + }; // 计算总记录数 const totalRecords = await traceCollection.countDocuments(query); - console.log(`找到 ${totalRecords} 条新记录需要同步`); + console.log(`找到 ${totalRecords} 条记录需要同步`); // 限制此次处理的记录数量 const recordsToProcess = Math.min(totalRecords, max_records); console.log(`本次将处理 ${recordsToProcess} 条记录`); if (totalRecords === 0) { - console.log("没有新记录需要同步,任务完成"); + console.log("没有记录需要同步,任务完成"); return { success: true, records_synced: 0, - total_synced: syncState.records_synced, - message: "没有新记录需要同步" + message: "没有记录需要同步" }; } - // 分批处理记录 - let processedRecords = 0; - let lastId: string | undefined; - let lastCreateTime = syncState.last_sync_time; - let totalBatchRecords = 0; - // 检查ClickHouse连接状态 const checkClickHouseConnection = async (): Promise => { if (skip_clickhouse_check) { @@ -358,10 +314,6 @@ export async function main( if (newRecords.length === 0) { logWithTimestamp("所有记录都已存在,跳过处理"); - // 更新同步状态,即使没有新增记录 - const lastRecord = records[records.length - 1]; - lastId = lastRecord._id.toString(); - lastCreateTime = lastRecord.createTime; return 0; } @@ -385,7 +337,7 @@ export async function main( utm_medium: "", utm_campaign: "", user_agent: record.browser + " " + record.browserVersion, - device_type: record.platform === "mobile" ? 1 : (record.platform === "tablet" ? 2 : 3), + device_type: record.platform || "unknown", browser: record.browser || "", os: record.platformOS || "", time_spent_sec: 0, @@ -398,12 +350,6 @@ export async function main( }; }); - // 更新同步状态(使用原始records的最后一条,以确保进度正确) - const lastRecord = records[records.length - 1]; - lastId = lastRecord._id.toString(); - lastCreateTime = lastRecord.createTime; - logWithTimestamp(`更新同步位置到: ID=${lastId}, 时间=${new Date(lastCreateTime).toISOString()}`); - // 生成ClickHouse插入SQL const insertSQL = ` INSERT INTO ${clickhouseConfig.clickhouse_database}.link_events @@ -421,7 +367,7 @@ export async function main( return `('${record.link_id}', '${safeReplace(record.channel_id)}', '${record.visitor_id}', '${record.session_id}', ${record.event_type}, '${safeReplace(record.ip_address)}', '', '', - '${safeReplace(record.referrer)}', '', '', '', '${safeReplace(record.user_agent)}', ${record.device_type}, + '${safeReplace(record.referrer)}', '', '', '', '${safeReplace(record.user_agent)}', '${safeReplace(record.device_type)}', '${safeReplace(record.browser)}', '${safeReplace(record.os)}', 0, true, false, '', 1, 0, '${safeReplace(record.custom_data)}')`; }).join(", ")} @@ -461,6 +407,9 @@ export async function main( }; // 批量处理记录 + let processedRecords = 0; + let totalBatchRecords = 0; + for (let page = 0; processedRecords < recordsToProcess; page++) { // 检查超时 if (checkTimeout()) { @@ -472,32 +421,22 @@ export async function main( logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`); logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`); - const records = await traceCollection.find(query) - .sort({ createTime: 1, _id: 1 }) - .skip(page * batch_size) - .limit(batch_size) - .toArray(); + const records = await traceCollection.find( + query, + { + allowDiskUse: true, + sort: { createTime: 1 }, + skip: page * batch_size, + limit: batch_size + } + ).toArray(); if (records.length === 0) { - logWithTimestamp(`第 ${page+1} 批次没有找到数据,但将继续尝试后续批次...`); - - // 更新查询条件,以确保能找到后续数据 - // 更新查询时间,增加一定的时间窗口以尝试找到后续数据 - const timeGap = 3600 * 1000; // 1小时的毫秒数 - lastCreateTime += timeGap; - - query.createTime = { $gt: lastCreateTime }; - if (lastId) { - // 移除ID条件,因为我们已经跳过了时间 - delete query._id; - } - - logWithTimestamp(`调整查询条件向前搜索: 创建时间 > ${new Date(lastCreateTime).toISOString()}`); - - // 继续下一批次,不中断循环 - continue; + logWithTimestamp("没有找到更多数据,同步结束"); + break; } + // 找到数据,开始处理 logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`); // 输出当前批次的部分数据信息 if (records.length > 0) { @@ -508,35 +447,16 @@ export async function main( } const batchSize = await processRecords(records); - processedRecords += records.length; // 总是增加处理的记录数,即使有些记录已存在 - totalBatchRecords += batchSize; // 只增加实际插入的记录数 + processedRecords += records.length; + totalBatchRecords += batchSize; logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); - - // 更新查询条件,以便下一批次查询 - query.createTime = { $gt: lastCreateTime }; - if (lastId) { - query._id = { $gt: new ObjectId(lastId) }; - } - logWithTimestamp(`更新查询条件: 创建时间 > ${new Date(lastCreateTime).toISOString()}, ID > ${lastId || 'none'}`); } - // 更新同步状态 - const newSyncState: SyncState = { - last_sync_time: lastCreateTime, - records_synced: syncState.records_synced + totalBatchRecords, - last_sync_id: lastId - }; - - await setVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state", JSON.stringify(newSyncState)); - console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`); - return { success: true, records_processed: processedRecords, records_synced: totalBatchRecords, - total_synced: newSyncState.records_synced, - last_sync_time: new Date(newSyncState.last_sync_time).toISOString(), message: "数据同步完成" }; } catch (err) {