This commit is contained in:
2025-04-18 08:33:39 +08:00
parent 2e34cd5b4b
commit 4262f789da

View File

@@ -56,9 +56,130 @@ interface SyncState {
last_sync_id?: string;
}
// 定义UTM参数接口
interface UtmParams {
utm_source: string;
utm_medium: string;
utm_campaign: string;
utm_term: string;
utm_content: string;
}
// 同步状态键名
const SYNC_STATE_KEY = "f/shorturl_analytics/mongo_sync_state";
// 从URL中提取UTM参数的函数增强版
function extractUtmParams(url: string, debug = false): UtmParams {
const defaultUtmParams: UtmParams = {
utm_source: "",
utm_medium: "",
utm_campaign: "",
utm_term: "",
utm_content: ""
};
if (!url) return defaultUtmParams;
if (debug) {
console.log(`[UTM提取] 原始URL: ${url}`);
}
// 准备一个解析后的参数对象
const params: UtmParams = { ...defaultUtmParams };
// 尝试多种方法提取UTM参数
// 方法1: 使用URL对象解析
try {
// 先处理URL确保是完整的URL格式
let normalizedUrl = url;
if (!url.match(/^https?:\/\//i)) {
normalizedUrl = `https://example.com${url.startsWith('/') ? '' : '/'}${url}`;
}
const urlObj = new URL(normalizedUrl);
// 读取URL参数
if (urlObj.searchParams.has('utm_source'))
params.utm_source = urlObj.searchParams.get('utm_source') || "";
if (urlObj.searchParams.has('utm_medium'))
params.utm_medium = urlObj.searchParams.get('utm_medium') || "";
if (urlObj.searchParams.has('utm_campaign'))
params.utm_campaign = urlObj.searchParams.get('utm_campaign') || "";
if (urlObj.searchParams.has('utm_term'))
params.utm_term = urlObj.searchParams.get('utm_term') || "";
if (urlObj.searchParams.has('utm_content'))
params.utm_content = urlObj.searchParams.get('utm_content') || "";
if (debug) {
console.log(`[UTM提取] URL对象解析结果: ${JSON.stringify(params)}`);
}
// 如果至少找到一个UTM参数则返回
if (params.utm_source || params.utm_medium || params.utm_campaign ||
params.utm_term || params.utm_content) {
return params;
}
} catch (_err) {
if (debug) {
console.log(`[UTM提取] URL对象解析失败尝试正则表达式`);
}
}
// 方法2: 使用正则表达式提取参数
// 使用正则表达式(最安全的方法,适用于任何格式)
const sourceMatch = url.match(/[?&]utm_source=([^&#]+)/i);
if (sourceMatch && sourceMatch[1]) {
try {
params.utm_source = decodeURIComponent(sourceMatch[1]);
} catch (_) {
params.utm_source = sourceMatch[1];
}
}
const mediumMatch = url.match(/[?&]utm_medium=([^&#]+)/i);
if (mediumMatch && mediumMatch[1]) {
try {
params.utm_medium = decodeURIComponent(mediumMatch[1]);
} catch (_) {
params.utm_medium = mediumMatch[1];
}
}
const campaignMatch = url.match(/[?&]utm_campaign=([^&#]+)/i);
if (campaignMatch && campaignMatch[1]) {
try {
params.utm_campaign = decodeURIComponent(campaignMatch[1]);
} catch (_) {
params.utm_campaign = campaignMatch[1];
}
}
const termMatch = url.match(/[?&]utm_term=([^&#]+)/i);
if (termMatch && termMatch[1]) {
try {
params.utm_term = decodeURIComponent(termMatch[1]);
} catch (_) {
params.utm_term = termMatch[1];
}
}
const contentMatch = url.match(/[?&]utm_content=([^&#]+)/i);
if (contentMatch && contentMatch[1]) {
try {
params.utm_content = decodeURIComponent(contentMatch[1]);
} catch (_) {
params.utm_content = contentMatch[1];
}
}
if (debug) {
console.log(`[UTM提取] 正则表达式解析结果: ${JSON.stringify(params)}`);
}
return params;
}
export async function main(
batch_size = 1000,
max_records = 9999999,
@@ -66,7 +187,8 @@ export async function main(
skip_clickhouse_check = false,
force_insert = false,
database_override = "shorturl_analytics", // 添加数据库名称参数默认为shorturl_analytics
reset_sync_state = false // 添加参数用于重置同步状态
reset_sync_state = false, // 添加参数用于重置同步状态
debug_utm = false // 添加参数控制UTM调试日志输出
) {
const logWithTimestamp = (message: string) => {
const now = new Date();
@@ -84,6 +206,9 @@ export async function main(
if (reset_sync_state) {
logWithTimestamp("⚠️ 警告: 已启用重置同步状态,将从头开始同步数据");
}
if (debug_utm) {
logWithTimestamp("已启用UTM参数调试日志");
}
// 设置超时
const startTime = Date.now();
@@ -407,42 +532,64 @@ export async function main(
}).toArray();
// 创建映射用于快速查找 - 新增代码
const shortLinksMap = new Map(shortLinks.map(link => [link._id.toString(), link]));
const shortLinksMap = new Map(shortLinks.map((link: ShortRecord) => [link._id.toString(), link]));
logWithTimestamp(`获取到 ${shortLinks.length} 条短链接信息`);
// 准备ClickHouse插入数据
const clickhouseData = newRecords.map(record => {
const eventTime = new Date(record.createTime);
// 获取对应的短链接信息 - 新增代码
const shortLink = shortLinksMap.get(record.slugId.toString());
const shortLink = shortLinksMap.get(record.slugId.toString()) as ShortRecord | undefined;
// 提取URL中的UTM参数 - 增加调试日志
if (debug_utm && record.url) {
logWithTimestamp(`======== UTM参数调试 ========`);
logWithTimestamp(`记录ID: ${record._id.toString()}`);
logWithTimestamp(`原始URL: ${record.url}`);
}
const utmParams = extractUtmParams(record.url || "", debug_utm);
if (debug_utm) {
logWithTimestamp(`提取的UTM参数: ${JSON.stringify(utmParams)}`);
logWithTimestamp(`===========================`);
}
// 保存提取的UTM参数和URL到event_attributes
const eventAttributes = {
mongo_id: record._id.toString(),
url: record.url || "",
...(record.url ? { raw_url: record.url } : {})
};
// 转换MongoDB记录为ClickHouse格式匹配ClickHouse表结构
return {
// UUID将由ClickHouse自动生成 (event_id)
event_time: eventTime.toISOString().replace('T', ' ').replace('Z', ''),
event_type: record.type === 1 ? "visit" : "custom",
event_attributes: `{"mongo_id":"${record._id.toString()}"}`,
event_attributes: JSON.stringify(eventAttributes),
link_id: record.slugId.toString(),
link_slug: shortLink?.slug || "", // 新增: 从short获取slug
link_slug: shortLink?.slug || "",
link_label: record.label || "",
link_title: shortLink?.title || "", // 新增: 从short获取标题
link_original_url: shortLink?.origin || "", // 新增: 从short获取原始URL
link_attributes: JSON.stringify({ domain: shortLink?.domain || null }), // 新增: 从short获取域名信息
link_title: shortLink?.title || "",
link_original_url: shortLink?.origin || "",
link_attributes: JSON.stringify({ domain: shortLink?.domain || null }),
link_created_at: shortLink?.createTime
? new Date(shortLink.createTime).toISOString().replace('T', ' ').replace('Z', '')
: eventTime.toISOString().replace('T', ' ').replace('Z', ''), // 新增: 使用真实的创建时间
: eventTime.toISOString().replace('T', ' ').replace('Z', ''),
link_expires_at: shortLink?.expiresAt
? new Date(shortLink.expiresAt).toISOString().replace('T', ' ').replace('Z', '')
: null, // 新增: 使用真实的过期时间
link_tags: shortLink?.tags ? JSON.stringify(shortLink.tags) : "[]", // 新增: 从short获取标签
user_id: shortLink?.user || "", // 新增: 从short获取用户ID
: null,
link_tags: shortLink?.tags ? JSON.stringify(shortLink.tags) : "[]",
user_id: shortLink?.user || "",
user_name: "",
user_email: "",
user_attributes: "{}",
team_id: shortLink?.teamId || "", // 新增: 从short获取团队ID
team_id: shortLink?.teamId || "",
team_name: "",
team_attributes: "{}",
project_id: shortLink?.projectId || "", // 新增: 从short获取项目ID
project_id: shortLink?.projectId || "",
project_name: "",
project_attributes: "{}",
qr_code_id: "",
@@ -458,11 +605,11 @@ export async function main(
os: record.platformOS || "",
user_agent: record.browser + " " + record.browserVersion,
referrer: record.url || "",
utm_source: "",
utm_medium: "",
utm_campaign: "",
utm_term: "",
utm_content: "",
utm_source: utmParams.utm_source,
utm_medium: utmParams.utm_medium,
utm_campaign: utmParams.utm_campaign,
utm_term: utmParams.utm_term,
utm_content: utmParams.utm_content,
time_spent_sec: 0,
is_bounce: true,
is_qr_scan: false,
@@ -582,6 +729,17 @@ export async function main(
if (records.length > 1) {
logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`);
}
// 如果开启了调试输出一些URL样本
if (debug_utm) {
const sampleSize = Math.min(5, records.length);
logWithTimestamp(`URL样本 (前${sampleSize}条):`);
for (let i = 0; i < sampleSize; i++) {
if (records[i].url) {
logWithTimestamp(`样本 ${i+1}: ${records[i].url}`);
}
}
}
}
const batchSize = await processRecords(records);