/** UTM campaign-tracking parameters pulled out of a URL; a missing parameter is "". */
interface UtmParams {
  utm_source: string;
  utm_medium: string;
  utm_campaign: string;
  utm_term: string;
  utm_content: string;
}

/** The recognised UTM keys, in the order they are declared on UtmParams. */
const UTM_PARAM_KEYS: ReadonlyArray<keyof UtmParams> = [
  "utm_source",
  "utm_medium",
  "utm_campaign",
  "utm_term",
  "utm_content",
];

/**
 * Fallback patterns, one per UTM key, compiled once instead of on every call.
 * They are non-global, so reusing them carries no `lastIndex` state.
 */
const UTM_FALLBACK_PATTERNS: ReadonlyArray<[keyof UtmParams, RegExp]> =
  UTM_PARAM_KEYS.map((key): [keyof UtmParams, RegExp] => [
    key,
    new RegExp(`[?&]${key}=([^&#]+)`, "i"),
  ]);

/** Type guard: is `name` one of the UTM keys? */
function isUtmKey(name: string): name is keyof UtmParams {
  return (UTM_PARAM_KEYS as ReadonlyArray<string>).includes(name);
}

/**
 * Decodes a raw query-string value the way URLSearchParams does: "+" becomes
 * a space, then percent-escapes are decoded. A malformed escape sequence is
 * left undecoded rather than throwing.
 */
function decodeUtmValue(raw: string): string {
  const spaced = raw.replace(/\+/g, " ");
  try {
    return decodeURIComponent(spaced);
  } catch (_err) {
    return spaced;
  }
}

/**
 * Extracts the five UTM parameters from a URL.
 *
 * Two strategies are tried in order:
 *  1. Parse with the URL API (relative inputs are resolved against a dummy
 *     origin) and read the query string, matching key names
 *     case-insensitively; the first occurrence of a key wins.
 *  2. If parsing throws, or the query string held no non-empty UTM value,
 *     scan the raw string with regular expressions. This also finds
 *     parameters that sit inside the fragment (e.g. `/#/page?utm_source=x`).
 *
 * Both strategies decode values identically ("+" is a space).
 *
 * @param url   Absolute or relative URL; an empty string yields all-empty fields.
 * @param debug When true, logs each step to the console.
 * @returns A fresh UtmParams object; parameters that were not found are "".
 */
function extractUtmParams(url: string, debug = false): UtmParams {
  const params: UtmParams = {
    utm_source: "",
    utm_medium: "",
    utm_campaign: "",
    utm_term: "",
    utm_content: "",
  };

  if (!url) return params;

  if (debug) {
    console.log(`[UTM提取] 原始URL: ${url}`);
  }

  // Strategy 1: structured parsing through the URL API.
  try {
    // The URL constructor needs an absolute URL, so give scheme-less input a
    // placeholder origin; only the query string is read afterwards.
    const normalizedUrl = /^https?:\/\//i.test(url)
      ? url
      : `https://example.com${url.startsWith("/") ? "" : "/"}${url}`;

    const seen = new Set<string>();
    new URL(normalizedUrl).searchParams.forEach((value, name) => {
      const key = name.toLowerCase();
      if (isUtmKey(key) && !seen.has(key)) {
        seen.add(key);
        params[key] = value;
      }
    });

    if (debug) {
      console.log(`[UTM提取] URL对象解析结果: ${JSON.stringify(params)}`);
    }

    // Any non-empty value means the query string was usable; stop here.
    if (UTM_PARAM_KEYS.some((key) => params[key] !== "")) {
      return params;
    }
  } catch (_err) {
    if (debug) {
      console.log(`[UTM提取] URL对象解析失败,尝试正则表达式`);
    }
  }

  // Strategy 2: regex scan of the raw string, which works on any input shape.
  for (const [key, pattern] of UTM_FALLBACK_PATTERNS) {
    const raw = pattern.exec(url)?.[1];
    if (raw) {
      params[key] = decodeUtmValue(raw);
    }
  }

  if (debug) {
    console.log(`[UTM提取] 正则表达式解析结果: ${JSON.stringify(params)}`);
  }

  return params;
}
logWithTimestamp(`===========================`); + } + + // 保存提取的UTM参数和URL到event_attributes + const eventAttributes = { + mongo_id: record._id.toString(), + url: record.url || "", + ...(record.url ? { raw_url: record.url } : {}) + }; // 转换MongoDB记录为ClickHouse格式,匹配ClickHouse表结构 return { // UUID将由ClickHouse自动生成 (event_id) event_time: eventTime.toISOString().replace('T', ' ').replace('Z', ''), event_type: record.type === 1 ? "visit" : "custom", - event_attributes: `{"mongo_id":"${record._id.toString()}"}`, + event_attributes: JSON.stringify(eventAttributes), link_id: record.slugId.toString(), - link_slug: shortLink?.slug || "", // 新增: 从short获取slug + link_slug: shortLink?.slug || "", link_label: record.label || "", - link_title: shortLink?.title || "", // 新增: 从short获取标题 - link_original_url: shortLink?.origin || "", // 新增: 从short获取原始URL - link_attributes: JSON.stringify({ domain: shortLink?.domain || null }), // 新增: 从short获取域名信息 + link_title: shortLink?.title || "", + link_original_url: shortLink?.origin || "", + link_attributes: JSON.stringify({ domain: shortLink?.domain || null }), link_created_at: shortLink?.createTime ? new Date(shortLink.createTime).toISOString().replace('T', ' ').replace('Z', '') - : eventTime.toISOString().replace('T', ' ').replace('Z', ''), // 新增: 使用真实的创建时间 + : eventTime.toISOString().replace('T', ' ').replace('Z', ''), link_expires_at: shortLink?.expiresAt ? new Date(shortLink.expiresAt).toISOString().replace('T', ' ').replace('Z', '') - : null, // 新增: 使用真实的过期时间 - link_tags: shortLink?.tags ? JSON.stringify(shortLink.tags) : "[]", // 新增: 从short获取标签 - user_id: shortLink?.user || "", // 新增: 从short获取用户ID + : null, + link_tags: shortLink?.tags ? 
JSON.stringify(shortLink.tags) : "[]", + user_id: shortLink?.user || "", user_name: "", user_email: "", user_attributes: "{}", - team_id: shortLink?.teamId || "", // 新增: 从short获取团队ID + team_id: shortLink?.teamId || "", team_name: "", team_attributes: "{}", - project_id: shortLink?.projectId || "", // 新增: 从short获取项目ID + project_id: shortLink?.projectId || "", project_name: "", project_attributes: "{}", qr_code_id: "", @@ -458,11 +605,11 @@ export async function main( os: record.platformOS || "", user_agent: record.browser + " " + record.browserVersion, referrer: record.url || "", - utm_source: "", - utm_medium: "", - utm_campaign: "", - utm_term: "", - utm_content: "", + utm_source: utmParams.utm_source, + utm_medium: utmParams.utm_medium, + utm_campaign: utmParams.utm_campaign, + utm_term: utmParams.utm_term, + utm_content: utmParams.utm_content, time_spent_sec: 0, is_bounce: true, is_qr_scan: false, @@ -582,6 +729,17 @@ export async function main( if (records.length > 1) { logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`); } + + // 如果开启了调试,输出一些URL样本 + if (debug_utm) { + const sampleSize = Math.min(5, records.length); + logWithTimestamp(`URL样本 (前${sampleSize}条):`); + for (let i = 0; i < sampleSize; i++) { + if (records[i].url) { + logWithTimestamp(`样本 ${i+1}: ${records[i].url}`); + } + } + } } const batchSize = await processRecords(records);