diff --git a/app/api/events/track/readme.md b/app/api/events/track/readme.md new file mode 100644 index 0000000..b48b51b --- /dev/null +++ b/app/api/events/track/readme.md @@ -0,0 +1,175 @@ + +# 事件跟踪接口说明 + +## 概述 +该接口用于跟踪用户交互事件并将数据存储到 ClickHouse 数据库中。支持记录各种类型的事件,并可包含与链接、用户、团队、项目等相关的详细信息。 + +## 接口信息 +- **URL**: `/api/events/track` +- **方法**: `POST` +- **Content-Type**: `application/json` + +## 请求参数 + +### 必填字段 +| 参数 | 类型 | 描述 | +|------|------|------| +| `event_type` | string | 事件类型,如 'click', 'view', 'conversion' | + +### 核心事件字段 +| 参数 | 类型 | 必填 | 描述 | +|------|------|------|------| +| `event_id` | string | 否 | 事件唯一标识符,不提供时自动生成UUID | +| `event_time` | string/Date | 否 | 事件发生时间,格式为ISO日期字符串,默认为当前时间 | +| `event_attributes` | object/string | 否 | 事件相关的其他属性,可以是JSON对象或JSON字符串 | + +### 链接信息 +| 参数 | 类型 | 必填 | 描述 | +|------|------|------|------| +| `link_id` | string | 否 | 短链接的唯一ID | +| `link_slug` | string | 否 | 短链接的slug部分 | +| `link_label` | string | 否 | 短链接的显示名称 | +| `link_title` | string | 否 | 短链接的标题 | +| `link_original_url` | string | 否 | 原始目标URL | +| `link_attributes` | object/string | 否 | 链接相关的额外属性 | +| `link_created_at` | string/Date | 否 | 链接创建时间 | +| `link_expires_at` | string/Date | 否 | 链接过期时间 | +| `link_tags` | array/string | 否 | 链接标签,可以是数组或JSON字符串 | + +### 用户信息 +| 参数 | 类型 | 必填 | 描述 | +|------|------|------|------| +| `user_id` | string | 否 | 用户ID | +| `user_name` | string | 否 | 用户名称 | +| `user_email` | string | 否 | 用户邮箱 | +| `user_attributes` | object/string | 否 | 用户相关的其他属性 | + +### 团队和项目信息 +| 参数 | 类型 | 必填 | 描述 | +|------|------|------|------| +| `team_id` | string | 否 | 团队ID | +| `team_name` | string | 否 | 团队名称 | +| `team_attributes` | object/string | 否 | 团队相关的其他属性 | +| `project_id` | string | 否 | 项目ID | +| `project_name` | string | 否 | 项目名称 | +| `project_attributes` | object/string | 否 | 项目相关的其他属性 | + +### 二维码信息 +| 参数 | 类型 | 必填 | 描述 | +|------|------|------|------| +| `qr_code_id` | string | 否 | 二维码ID | +| `qr_code_name` | string | 否 | 二维码名称 | +| `qr_code_attributes` | object/string | 否 | 二维码相关的其他属性 | + +### 访问者信息 +| 参数 | 类型 | 必填 | 描述 | +|------|------|------|------| +| `visitor_id` | string | 否 | 访问者唯一标识符,不提供时自动生成 | +| `session_id` | string | 否 | 会话ID,不提供时自动生成 | +| `ip_address` | string | 否 | 访问者IP地址,默认从请求头获取 | +| `country` | string | 否 | 访问者所在国家 | +| `city` | string | 否 | 访问者所在城市 | +| `device_type` | string | 否 | 设备类型 (如 desktop, mobile, tablet) | +| `browser` | string | 否 | 浏览器名称 | +| `os` | string | 否 | 操作系统 | +| `user_agent` | string | 否 | 用户代理字符串,默认从请求头获取 | + +### 引荐来源信息 +| 参数 | 类型 | 必填 | 描述 | +|------|------|------|------| +| `referrer` | string | 否 | 引荐URL,默认从请求头获取 | +| `utm_source` | string | 否 | UTM来源参数 | +| `utm_medium` | string | 否 | UTM媒介参数 | +| `utm_campaign` | string | 否 | UTM活动参数 | +| `utm_term` | string | 否 | UTM术语参数 | +| `utm_content` | string | 否 | UTM内容参数 | + +### 交互信息 +| 参数 | 类型 | 必填 | 描述 | +|------|------|------|------| +| `time_spent_sec` | number | 否 | 用户在页面上停留的时间(秒),默认0 | +| `is_bounce` | boolean | 否 | 是否是跳出(只访问一个页面),默认true | +| `is_qr_scan` | boolean | 否 | 是否来自二维码扫描,默认false | +| `conversion_type` | string | 否 | 转化类型 | +| `conversion_value` | number | 否 | 转化价值,默认0 | + +## 响应格式 + +### 成功响应 (201 Created) +```json +{ + "success": true, + "message": "Event tracked successfully", + "event_id": "uuid-of-tracked-event" +} +``` + +### 错误响应 + +#### 缺少必填字段 (400 Bad Request) +```json +{ + "error": "Missing required field: event_type" +} +``` + +#### 服务器错误 (500 Internal Server Error) +```json +{ + "error": "Failed to track event", + "details": "具体错误信息" +} +``` + +## 使用示例 + +### 基本事件跟踪请求 +```javascript +fetch('/api/events/track', { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ + event_type: 'click', + link_id: 'abc123', + link_slug: 'promo-summer', + link_original_url: 'https://example.com/summer-promotion' + }) +}) +``` + +### 详细事件跟踪请求 +```javascript +fetch('/api/events/track', { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ + event_type: 'conversion', + link_id: 'abc123', + link_slug: 'promo-summer', + link_original_url: 'https://example.com/summer-promotion', + event_attributes: { + page: '/checkout', + product_id: 'xyz789' + }, + user_id: 'user123', + team_id: 'team456', + project_id: 'proj789', + visitor_id: 'vis987', + is_bounce: false, + time_spent_sec: 120, + conversion_type: 'purchase', + conversion_value: 99.99, + utm_source: 'email', + utm_campaign: 'summer_sale' + }) +}) +``` + +## 注意事项 +- 所有对象类型的字段(如 `event_attributes`)可以作为对象或预先格式化的JSON字符串传递 +- 如果不提供 `event_id`、`visitor_id` 或 `session_id`,系统将自动生成 +- 时间戳字段接受ISO格式的日期字符串,并会被转换为ClickHouse兼容的格式 diff --git a/scripts/db/sync_mongo_to_events.ts b/scripts/db/sync_mongo_to_events.ts deleted file mode 100644 index bc91133..0000000 --- a/scripts/db/sync_mongo_to_events.ts +++ /dev/null @@ -1,364 +0,0 @@ -// Sync data from MongoDB trace table to ClickHouse events table -import { getVariable } from "npm:windmill-client@1"; -import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts"; - -interface MongoConfig { - host: string; - port: string; - db: string; - username: string; - password: string; -} - -interface ClickHouseConfig { - clickhouse_host: string; - clickhouse_port: number; - clickhouse_user: string; - clickhouse_password: string; - clickhouse_database: string; - clickhouse_url: string; -} - -interface TraceRecord { - _id: ObjectId; - slugId: ObjectId; - label: string | null; - ip: string; - type: number; - platform: string; - platformOS: string; - browser: string; - browserVersion: string; - url: string; - createTime: number; -} - -export async function main( - batch_size = 1000, - max_records = 9999999, - timeout_minutes = 60, - skip_clickhouse_check = false, - force_insert = false -) { - const logWithTimestamp = (message: string) => { - const now = new Date(); - console.log(`[${now.toISOString()}] ${message}`); - }; - - logWithTimestamp("Starting sync from MongoDB to ClickHouse events table"); - logWithTimestamp(`Batch size: ${batch_size}, Max records: ${max_records}, Timeout: ${timeout_minutes} minutes`); - - // Set timeout - const startTime = Date.now(); - const timeoutMs = timeout_minutes * 60 * 1000; - - const checkTimeout = () => { - if (Date.now() - startTime > timeoutMs) { - console.log(`Execution time exceeded ${timeout_minutes} minutes, stopping`); - return true; - } - return false; - }; - - // Get MongoDB and ClickHouse connection info - let mongoConfig: MongoConfig; - let clickhouseConfig: ClickHouseConfig; - - try { - const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb"); - mongoConfig = typeof rawMongoConfig === "string" ? JSON.parse(rawMongoConfig) : rawMongoConfig; - - const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse"); - clickhouseConfig = typeof rawClickhouseConfig === "string" ? JSON.parse(rawClickhouseConfig) : rawClickhouseConfig; - } catch (error) { - console.error("Failed to get config:", error); - throw error; - } - - // Build MongoDB connection URL - let mongoUrl = "mongodb://"; - if (mongoConfig.username && mongoConfig.password) { - mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`; - } - mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`; - - // Connect to MongoDB - const client = new MongoClient(); - try { - await client.connect(mongoUrl); - console.log("MongoDB connected successfully"); - - const db = client.database(mongoConfig.db); - const traceCollection = db.collection("trace"); - - // Build query conditions - const query: Record = { - type: 1 // Only sync records with type 1 - }; - - // Count total records - const totalRecords = await traceCollection.countDocuments(query); - console.log(`Found ${totalRecords} records to sync`); - - const recordsToProcess = Math.min(totalRecords, max_records); - console.log(`Will process ${recordsToProcess} records`); - - if (totalRecords === 0) { - console.log("No records to sync, task completed"); - return { - success: true, - records_synced: 0, - message: "No records to sync" - }; - } - - // Check ClickHouse connection - const checkClickHouseConnection = async (): Promise => { - if (skip_clickhouse_check) { - logWithTimestamp("Skipping ClickHouse connection check"); - return true; - } - - try { - logWithTimestamp("Testing ClickHouse connection..."); - const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; - const response = await fetch(clickhouseUrl, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`, - }, - body: "SELECT 1", - signal: AbortSignal.timeout(5000) - }); - - if (response.ok) { - logWithTimestamp("ClickHouse connection test successful"); - return true; - } else { - const errorText = await response.text(); - logWithTimestamp(`ClickHouse connection test failed: ${response.status} ${errorText}`); - return false; - } - } catch (err) { - logWithTimestamp(`ClickHouse connection test failed: ${(err as Error).message}`); - return false; - } - }; - - // Check if records exist in ClickHouse - const checkExistingRecords = async (records: TraceRecord[]): Promise => { - if (records.length === 0) return []; - - if (skip_clickhouse_check || force_insert) { - logWithTimestamp(`Skipping ClickHouse duplicate check, will process all ${records.length} records`); - return records; - } - - try { - const recordIds = records.map(record => record._id.toString()); - - const query = ` - SELECT event_id - FROM ${clickhouseConfig.clickhouse_database}.events - WHERE event_attributes LIKE '%"mongo_id":"%' - AND event_attributes LIKE ANY ('%${recordIds.join("%' OR '%")}%') - FORMAT JSON - `; - - const response = await fetch(clickhouseConfig.clickhouse_url, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` - }, - body: query, - signal: AbortSignal.timeout(10000) - }); - - if (!response.ok) { - const errorText = await response.text(); - throw new Error(`ClickHouse query error: ${response.status} ${errorText}`); - } - - const result = await response.json(); - const existingIds = new Set(result.data.map((row: any) => { - const matches = row.event_attributes.match(/"mongo_id":"([^"]+)"/); - return matches ? matches[1] : null; - }).filter(Boolean)); - - return records.filter(record => !existingIds.has(record._id.toString())); - } catch (err) { - logWithTimestamp(`Error checking existing records: ${(err as Error).message}`); - return skip_clickhouse_check ? records : []; - } - }; - - // Process records function - const processRecords = async (records: TraceRecord[]) => { - if (records.length === 0) return 0; - - const newRecords = await checkExistingRecords(records); - if (newRecords.length === 0) { - logWithTimestamp("All records already exist, skipping"); - return 0; - } - - // Prepare ClickHouse insert data - const clickhouseData = newRecords.map(record => { - const eventTime = new Date(record.createTime).toISOString(); - return { - event_time: eventTime, - event_type: "click", - event_attributes: JSON.stringify({ - mongo_id: record._id.toString(), - original_type: record.type - }), - - // Link information - link_id: record.slugId.toString(), - link_slug: "", - link_label: record.label || "", - link_title: "", - link_original_url: record.url || "", - link_attributes: "{}", - link_created_at: eventTime, - link_expires_at: null, - link_tags: "[]", - - // User information (empty as not available in trace) - user_id: "", - user_name: "", - user_email: "", - user_attributes: "{}", - - // Team information (empty as not available in trace) - team_id: "", - team_name: "", - team_attributes: "{}", - - // Project information (empty as not available in trace) - project_id: "", - project_name: "", - project_attributes: "{}", - - // QR code information (empty as not available in trace) - qr_code_id: "", - qr_code_name: "", - qr_code_attributes: "{}", - - // Visitor information - visitor_id: record._id.toString(), - session_id: `${record._id.toString()}-${record.createTime}`, - ip_address: record.ip || "", - country: "", - city: "", - device_type: record.platform || "unknown", - browser: record.browser || "", - os: record.platformOS || "", - user_agent: `${record.browser || ""} ${record.browserVersion || ""}`.trim(), - - // Source information - referrer: record.url || "", - utm_source: "", - utm_medium: "", - utm_campaign: "", - - // Interaction information - time_spent_sec: 0, - is_bounce: true, - is_qr_scan: false, - conversion_type: "visit", - conversion_value: 0 - }; - }); - - // Generate ClickHouse insert SQL - const insertSQL = ` - INSERT INTO ${clickhouseConfig.clickhouse_database}.events - FORMAT JSONEachRow - ${JSON.stringify(clickhouseData)} - `; - - try { - const response = await fetch(clickhouseConfig.clickhouse_url, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` - }, - body: insertSQL, - signal: AbortSignal.timeout(20000) - }); - - if (!response.ok) { - const errorText = await response.text(); - throw new Error(`ClickHouse insert error: ${response.status} ${errorText}`); - } - - logWithTimestamp(`Successfully inserted ${newRecords.length} records to ClickHouse`); - return newRecords.length; - } catch (err) { - logWithTimestamp(`Failed to insert data to ClickHouse: ${(err as Error).message}`); - throw err; - } - }; - - // Check ClickHouse connection before processing - const clickhouseConnected = await checkClickHouseConnection(); - if (!clickhouseConnected && !skip_clickhouse_check) { - throw new Error("ClickHouse connection failed, cannot continue sync"); - } - - // Process records in batches - let processedRecords = 0; - let totalBatchRecords = 0; - - for (let page = 0; processedRecords < recordsToProcess; page++) { - if (checkTimeout()) { - logWithTimestamp(`Processed ${processedRecords}/${recordsToProcess} records, stopping due to timeout`); - break; - } - - logWithTimestamp(`Processing batch ${page+1}, completed ${processedRecords}/${recordsToProcess} records (${Math.round(processedRecords/recordsToProcess*100)}%)`); - - const records = await traceCollection.find( - query, - { - allowDiskUse: true, - sort: { createTime: 1 }, - skip: page * batch_size, - limit: batch_size - } - ).toArray(); - - if (records.length === 0) { - logWithTimestamp("No more records found, sync complete"); - break; - } - - const batchSize = await processRecords(records); - processedRecords += records.length; - totalBatchRecords += batchSize; - - logWithTimestamp(`Batch ${page+1} complete. Processed ${processedRecords}/${recordsToProcess} records, inserted ${totalBatchRecords} (${Math.round(processedRecords/recordsToProcess*100)}%)`); - } - - return { - success: true, - records_processed: processedRecords, - records_synced: totalBatchRecords, - message: "Data sync completed" - }; - } catch (err) { - console.error("Error during sync:", err); - return { - success: false, - error: err instanceof Error ? err.message : String(err), - stack: err instanceof Error ? err.stack : undefined - }; - } finally { - await client.close(); - console.log("MongoDB connection closed"); - } -} \ No newline at end of file diff --git a/windmill/sync_mongo_to_events.ts b/windmill/sync_mongo_to_events.ts index 38522c7..6cb2811 100644 --- a/windmill/sync_mongo_to_events.ts +++ b/windmill/sync_mongo_to_events.ts @@ -1,5 +1,5 @@ -// Sync data from MongoDB trace table to ClickHouse events table -import { getVariable } from "npm:windmill-client@1"; +// 从MongoDB的trace表同步数据到ClickHouse的events表 +import { getVariable, setVariable } from "npm:windmill-client@1"; import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts"; interface MongoConfig { @@ -15,6 +15,7 @@ interface ClickHouseConfig { clickhouse_port: number; clickhouse_user: string; clickhouse_password: string; + clickhouse_database: string; clickhouse_url: string; } @@ -32,25 +33,10 @@ interface TraceRecord { createTime: number; } -interface ShortRecord { - _id: ObjectId; - slug: string; // 短链接的slug部分 - origin: string; // 原始URL - domain?: string; // 域名 - createTime: number; // 创建时间戳 - user?: string; // 创建用户 - title?: string; // 标题 - description?: string; // 描述 - tags?: string[]; // 标签 - active?: boolean; // 是否活跃 - expiresAt?: number; // 过期时间戳 - teamId?: string; // 团队ID - projectId?: string; // 项目ID -} - -interface ClickHouseRow { - event_id: string; - event_attributes: string; +interface SyncState { + last_sync_time: number; + records_synced: number; + last_sync_id?: string; } export async function main( @@ -58,90 +44,138 @@ export async function main( max_records = 9999999, timeout_minutes = 60, skip_clickhouse_check = false, - force_insert = false + force_insert = false, + database_override = "shorturl_analytics" // 添加数据库名称参数,默认为shorturl_analytics ) { const logWithTimestamp = (message: string) => { const now = new Date(); console.log(`[${now.toISOString()}] ${message}`); }; - logWithTimestamp("Starting sync from MongoDB to ClickHouse events table"); - logWithTimestamp(`Batch size: ${batch_size}, Max records: ${max_records}, Timeout: ${timeout_minutes} minutes`); + logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务"); + logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`); + if (skip_clickhouse_check) { + logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式,不会检查记录是否已存在"); + } + if (force_insert) { + logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录"); + } - // Set timeout + // 设置超时 const startTime = Date.now(); const timeoutMs = timeout_minutes * 60 * 1000; + // 检查是否超时 const checkTimeout = () => { if (Date.now() - startTime > timeoutMs) { - console.log(`Execution time exceeded ${timeout_minutes} minutes, stopping`); + console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`); return true; } return false; }; - // Get MongoDB and ClickHouse connection info + // 获取MongoDB和ClickHouse的连接信息 let mongoConfig: MongoConfig; let clickhouseConfig: ClickHouseConfig; try { const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb"); - mongoConfig = typeof rawMongoConfig === "string" ? JSON.parse(rawMongoConfig) : rawMongoConfig; + console.log("原始MongoDB配置:", JSON.stringify(rawMongoConfig)); + + // 尝试解析配置,如果是字符串形式 + if (typeof rawMongoConfig === "string") { + try { + mongoConfig = JSON.parse(rawMongoConfig); + } catch (e) { + console.error("MongoDB配置解析失败:", e); + throw e; + } + } else { + mongoConfig = rawMongoConfig as MongoConfig; + } const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse"); - clickhouseConfig = typeof rawClickhouseConfig === "string" ? JSON.parse(rawClickhouseConfig) : rawClickhouseConfig; + console.log("原始ClickHouse配置:", JSON.stringify(rawClickhouseConfig)); + + // 尝试解析配置,如果是字符串形式 + if (typeof rawClickhouseConfig === "string") { + try { + clickhouseConfig = JSON.parse(rawClickhouseConfig); + } catch (e) { + console.error("ClickHouse配置解析失败:", e); + throw e; + } + } else { + clickhouseConfig = rawClickhouseConfig as ClickHouseConfig; + } + + // 检查并修复数据库配置 + if (!clickhouseConfig.clickhouse_database || clickhouseConfig.clickhouse_database === "undefined") { + logWithTimestamp(`⚠️ 警告: 数据库名称未定义或为'undefined',使用提供的默认值: ${database_override}`); + clickhouseConfig.clickhouse_database = database_override; + } + + console.log("MongoDB配置解析为:", JSON.stringify(mongoConfig)); + console.log("ClickHouse配置解析为:", JSON.stringify({ + ...clickhouseConfig, + clickhouse_password: "****" // 隐藏密码 + })); + + logWithTimestamp(`将使用ClickHouse数据库: ${clickhouseConfig.clickhouse_database}`); } catch (error) { - console.error("Failed to get config:", error); + console.error("获取配置失败:", error); throw error; } - // Build MongoDB connection URL + // 构建MongoDB连接URL let mongoUrl = "mongodb://"; if (mongoConfig.username && mongoConfig.password) { mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`; } mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`; - // Connect to MongoDB + console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`); + + // 连接MongoDB const client = new MongoClient(); try { await client.connect(mongoUrl); - console.log("MongoDB connected successfully"); + console.log("MongoDB连接成功"); const db = client.database(mongoConfig.db); const traceCollection = db.collection("trace"); - const shortCollection = db.collection("short"); - // Build query conditions + // 构建查询条件,获取所有记录 const query: Record = { - type: 1 // Only sync records with type 1 + type: 1 // 只同步type为1的记录 }; - // Count total records + // 计算总记录数 const totalRecords = await traceCollection.countDocuments(query); - console.log(`Found ${totalRecords} records to sync`); + console.log(`找到 ${totalRecords} 条记录需要同步`); + // 限制此次处理的记录数量 const recordsToProcess = Math.min(totalRecords, max_records); - console.log(`Will process ${recordsToProcess} records`); + console.log(`本次将处理 ${recordsToProcess} 条记录`); if (totalRecords === 0) { - console.log("No records to sync, task completed"); + console.log("没有记录需要同步,任务完成"); return { success: true, records_synced: 0, - message: "No records to sync" + message: "没有记录需要同步" }; } - // Check ClickHouse connection + // 检查ClickHouse连接状态 const checkClickHouseConnection = async (): Promise => { if (skip_clickhouse_check) { - logWithTimestamp("Skipping ClickHouse connection check"); + logWithTimestamp("已启用跳过ClickHouse检查,不测试连接"); return true; } try { - logWithTimestamp("Testing ClickHouse connection..."); + logWithTimestamp("测试ClickHouse连接..."); const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; const response = await fetch(clickhouseUrl, { method: "POST", @@ -149,45 +183,61 @@ export async function main( "Content-Type": "application/x-www-form-urlencoded", "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`, }, - body: "SELECT 1", + body: `SELECT 1 FROM ${clickhouseConfig.clickhouse_database}.events LIMIT 1`, + // 设置5秒超时 signal: AbortSignal.timeout(5000) }); if (response.ok) { - logWithTimestamp("ClickHouse connection test successful"); + logWithTimestamp("ClickHouse连接测试成功"); return true; } else { const errorText = await response.text(); - logWithTimestamp(`ClickHouse connection test failed: ${response.status} ${errorText}`); + logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`); return false; } } catch (err) { - logWithTimestamp(`ClickHouse connection test failed: ${(err as Error).message}`); + const error = err as Error; + logWithTimestamp(`ClickHouse连接测试失败: ${error.message}`); return false; } }; - // Check if records exist in ClickHouse + // 检查记录是否已经存在于ClickHouse中 const checkExistingRecords = async (records: TraceRecord[]): Promise => { if (records.length === 0) return []; + // 如果跳过ClickHouse检查或强制插入,则直接返回所有记录 if (skip_clickhouse_check || force_insert) { - logWithTimestamp(`Skipping ClickHouse duplicate check, will process all ${records.length} records`); + logWithTimestamp(`已跳过ClickHouse重复检查,准备处理所有 ${records.length} 条记录`); return records; } + logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于ClickHouse中...`); + try { - const recordIds = records.map(record => record._id.toString()); + // 验证数据库名称 + if (!clickhouseConfig.clickhouse_database || clickhouseConfig.clickhouse_database === "undefined") { + throw new Error("数据库名称未定义或无效,请检查配置"); + } + // 提取所有记录的ID + const recordIds = records.map(record => record.slugId.toString()); // 使用slugId作为link_id查询 + logWithTimestamp(`待检查的记录ID: ${recordIds.join(', ')}`); + + // 构建查询SQL,检查记录是否已存在,确保添加FORMAT JSON来获取正确的JSON格式响应 const query = ` - SELECT event_id - FROM shorturl_analytics.events - WHERE event_attributes LIKE '%"mongo_id":"%' - AND event_attributes LIKE ANY ('%${recordIds.join("%' OR '%")}%') + SELECT link_id, visitor_id + FROM ${clickhouseConfig.clickhouse_database}.events + WHERE link_id IN ('${recordIds.join("','")}') FORMAT JSON `; - const response = await fetch(clickhouseConfig.clickhouse_url, { + logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`); + + // 发送请求到ClickHouse,添加10秒超时 + const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; + const response = await fetch(clickhouseUrl, { method: "POST", headers: { "Content-Type": "application/x-www-form-urlencoded", @@ -199,134 +249,195 @@ export async function main( if (!response.ok) { const errorText = await response.text(); - throw new Error(`ClickHouse query error: ${response.status} ${errorText}`); + throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`); } - const result = await response.json(); - const existingIds = new Set(result.data.map((row: ClickHouseRow) => { - const matches = row.event_attributes.match(/"mongo_id":"([^"]+)"/); - return matches ? matches[1] : null; - }).filter(Boolean)); + // 获取响应文本以便记录 + const responseText = await response.text(); + logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`); - return records.filter(record => !existingIds.has(record._id.toString())); + if (!responseText.trim()) { + logWithTimestamp("ClickHouse返回空响应,假定没有记录存在"); + return records; // 如果响应为空,假设没有记录 + } + + // 解析结果 + let result; + try { + result = JSON.parse(responseText); + } catch (err) { + logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`); + throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`); + } + + // 确保result有正确的结构 + if (!result.data) { + logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`); + return records; // 如果没有data字段,假设没有记录 + } + + // 提取已存在的记录ID + const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id)); + + logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`); + if (existingIds.size > 0) { + logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`); + } + + // 过滤出不存在的记录 + const newRecords = records.filter(record => !existingIds.has(record.slugId.toString())); // 使用slugId匹配link_id + logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`); + + return newRecords; } catch (err) { - logWithTimestamp(`Error checking existing records: ${(err as Error).message}`); - return skip_clickhouse_check ? records : []; + const error = err as Error; + logWithTimestamp(`ClickHouse查询出错: ${error.message}`); + if (skip_clickhouse_check) { + logWithTimestamp("已启用跳过ClickHouse检查,将继续处理所有记录"); + return records; + } else { + throw error; // 如果没有启用跳过检查,则抛出错误 + } } }; - // Process records function + // 在处理记录前先检查ClickHouse连接 + const clickhouseConnected = await checkClickHouseConnection(); + if (!clickhouseConnected && !skip_clickhouse_check) { + logWithTimestamp("⚠️ ClickHouse连接测试失败,请启用skip_clickhouse_check=true参数来跳过连接检查"); + throw new Error("ClickHouse连接失败,无法继续同步"); + } + + // 处理记录的函数 const processRecords = async (records: TraceRecord[]) => { if (records.length === 0) return 0; - const newRecords = await checkExistingRecords(records); + logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`); + + // 检查记录是否已存在 + let newRecords; + try { + newRecords = await checkExistingRecords(records); + } catch (err) { + const error = err as Error; + logWithTimestamp(`检查记录是否存在时出错: ${error.message}`); + if (!skip_clickhouse_check && !force_insert) { + throw error; + } + // 如果跳过检查或强制插入,则使用所有记录 + logWithTimestamp("将使用所有记录进行处理"); + newRecords = records; + } + if (newRecords.length === 0) { - logWithTimestamp("All records already exist, skipping"); + logWithTimestamp("所有记录都已存在,跳过处理"); return 0; } - // Get link information for all records - const slugIds = newRecords.map(record => record.slugId); - const shortLinks = await shortCollection.find({ - _id: { $in: slugIds } - }).toArray(); - - // Create a map for quick lookup - const shortLinksMap = new Map(shortLinks.map(link => [link._id.toString(), link])); + logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`); - // Prepare ClickHouse insert data + // 准备ClickHouse插入数据 const clickhouseData = newRecords.map(record => { - const shortLink = shortLinksMap.get(record.slugId.toString()); - - // 将毫秒时间戳转换为 DateTime64(3) 格式 - const formatDateTime = (timestamp: number) => { - return new Date(timestamp).toISOString().replace('T', ' ').replace('Z', ''); - }; - + const eventTime = new Date(record.createTime); + // 转换MongoDB记录为ClickHouse格式,匹配ClickHouse表结构 return { - // Event base information - event_id: record._id.toString(), - event_time: formatDateTime(record.createTime), - event_type: "click", - event_attributes: JSON.stringify({ - original_type: record.type - }), - - // Link information from short collection + // UUID将由ClickHouse自动生成 (event_id) + event_time: eventTime.toISOString().replace('T', ' ').replace('Z', ''), + event_type: record.type === 1 ? "visit" : "custom", + event_attributes: `{"mongo_id":"${record._id.toString()}"}`, link_id: record.slugId.toString(), - link_slug: shortLink?.slug || "", + link_slug: "", // 这些字段可能需要从其他表获取 link_label: record.label || "", link_title: "", - link_original_url: shortLink?.origin || "", - link_attributes: JSON.stringify({ - domain: shortLink?.domain || null - }), - link_created_at: shortLink?.createTime ? formatDateTime(shortLink.createTime) : formatDateTime(record.createTime), - link_expires_at: shortLink?.expiresAt ? formatDateTime(shortLink.expiresAt) : null, - link_tags: "[]", // Empty array as default - - // User information - user_id: shortLink?.user || "", + link_original_url: "", + link_attributes: "{}", + link_created_at: eventTime.toISOString().replace('T', ' ').replace('Z', ''), // 暂用访问时间代替,可能需要从其他表获取 + link_expires_at: null, + link_tags: "[]", + user_id: "", user_name: "", user_email: "", user_attributes: "{}", - - // Team information - team_id: shortLink?.teamId || "", + team_id: "", team_name: "", team_attributes: "{}", - - // Project information - project_id: shortLink?.projectId || "", + project_id: "", project_name: "", project_attributes: "{}", - - // QR code information qr_code_id: "", qr_code_name: "", qr_code_attributes: "{}", - - // Visitor information - visitor_id: "", // Empty string as default - session_id: `${record.slugId.toString()}-${record.createTime}`, - ip_address: record.ip || "", - country: "", + visitor_id: record._id.toString(), // 使用MongoDB ID作为访客ID + session_id: record._id.toString() + "-" + record.createTime, // 创建一个唯一会话ID + ip_address: record.ip, + country: "", // 这些字段在MongoDB中不存在,使用默认值 city: "", - device_type: record.platform || "", + device_type: record.platform || "unknown", browser: record.browser || "", os: record.platformOS || "", - user_agent: `${record.browser || ""} ${record.browserVersion || ""}`.trim(), - - // Source information + user_agent: record.browser + " " + record.browserVersion, referrer: record.url || "", utm_source: "", utm_medium: "", utm_campaign: "", - - // Interaction information + utm_term: "", + utm_content: "", time_spent_sec: 0, is_bounce: true, is_qr_scan: false, conversion_type: "visit", - conversion_value: 0 + conversion_value: 0, + req_full_path: record.url || "" }; }); - // Generate ClickHouse insert SQL - const rows = clickhouseData.map(row => { - // 只需要处理JSON字符串的转义 - const formattedRow = { - ...row, - event_attributes: row.event_attributes.replace(/\\/g, '\\\\'), - link_attributes: row.link_attributes.replace(/\\/g, '\\\\') - }; - return JSON.stringify(formattedRow); - }).join('\n'); + // 生成ClickHouse插入SQL + const insertSQL = ` + INSERT INTO ${clickhouseConfig.clickhouse_database}.events + (event_time, event_type, event_attributes, link_id, link_slug, link_label, link_title, + link_original_url, link_attributes, link_created_at, link_expires_at, link_tags, + user_id, user_name, user_email, user_attributes, team_id, team_name, team_attributes, + project_id, project_name, project_attributes, qr_code_id, qr_code_name, qr_code_attributes, + visitor_id, session_id, ip_address, country, city, device_type, browser, os, user_agent, + referrer, utm_source, utm_medium, utm_campaign, utm_term, utm_content, time_spent_sec, + is_bounce, is_qr_scan, conversion_type, conversion_value, req_full_path) + VALUES ${clickhouseData.map(record => { + // 确保所有字符串值都是字符串类型,并安全处理替换 + const safeReplace = (val: unknown): string => { + // 确保值是字符串,如果是null或undefined则使用空字符串 + const str = val === null || val === undefined ? "" : String(val); + // 安全替换单引号 + return str.replace(/'/g, "''"); + }; + + return `('${record.event_time}', '${safeReplace(record.event_type)}', '${safeReplace(record.event_attributes)}', + '${record.link_id}', '${safeReplace(record.link_slug)}', '${safeReplace(record.link_label)}', '${safeReplace(record.link_title)}', + '${safeReplace(record.link_original_url)}', '${safeReplace(record.link_attributes)}', '${record.link_created_at}', + ${record.link_expires_at === null ? 'NULL' : `'${record.link_expires_at}'`}, '${safeReplace(record.link_tags)}', + '${safeReplace(record.user_id)}', '${safeReplace(record.user_name)}', '${safeReplace(record.user_email)}', + '${safeReplace(record.user_attributes)}', '${safeReplace(record.team_id)}', '${safeReplace(record.team_name)}', + '${safeReplace(record.team_attributes)}', '${safeReplace(record.project_id)}', '${safeReplace(record.project_name)}', + '${safeReplace(record.project_attributes)}', '${safeReplace(record.qr_code_id)}', '${safeReplace(record.qr_code_name)}', + '${safeReplace(record.qr_code_attributes)}', '${safeReplace(record.visitor_id)}', '${safeReplace(record.session_id)}', + '${safeReplace(record.ip_address)}', '${safeReplace(record.country)}', '${safeReplace(record.city)}', + '${safeReplace(record.device_type)}', '${safeReplace(record.browser)}', '${safeReplace(record.os)}', + '${safeReplace(record.user_agent)}', '${safeReplace(record.referrer)}', '${safeReplace(record.utm_source)}', + '${safeReplace(record.utm_medium)}', '${safeReplace(record.utm_campaign)}', '${safeReplace(record.utm_term)}', + '${safeReplace(record.utm_content)}', ${record.time_spent_sec}, ${record.is_bounce}, ${record.is_qr_scan}, + '${safeReplace(record.conversion_type)}', ${record.conversion_value}, '${safeReplace(record.req_full_path)}')`; + }).join(", ")} + `; - const insertSQL = `INSERT INTO shorturl_analytics.events FORMAT JSONEachRow\n${rows}`; + if (insertSQL.length === 0) { + console.log("没有新记录需要插入"); + return 0; + } + // 发送请求到ClickHouse,添加20秒超时 + const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; try { - const response = await fetch(clickhouseConfig.clickhouse_url, { + logWithTimestamp("发送插入请求到ClickHouse..."); + const response = await fetch(clickhouseUrl, { method: "POST", headers: { "Content-Type": "application/x-www-form-urlencoded", @@ -338,35 +449,33 @@ export async function main( if (!response.ok) { const errorText = await response.text(); - throw new Error(`ClickHouse insert error: ${response.status} ${errorText}`); + throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`); } - logWithTimestamp(`Successfully inserted ${newRecords.length} records to ClickHouse`); + logWithTimestamp(`成功插入 ${newRecords.length} 条记录到ClickHouse`); return newRecords.length; } catch (err) { - logWithTimestamp(`Failed to insert data to ClickHouse: ${(err as Error).message}`); - throw err; + const error = err as Error; + logWithTimestamp(`向ClickHouse插入数据失败: ${error.message}`); + throw error; } }; - // Check ClickHouse connection before processing - const clickhouseConnected = await checkClickHouseConnection(); - if (!clickhouseConnected && !skip_clickhouse_check) { - throw new Error("ClickHouse connection failed, cannot continue sync"); - } - - // Process records in batches + // 批量处理记录 let processedRecords = 0; let totalBatchRecords = 0; for (let page = 0; processedRecords < recordsToProcess; page++) { + // 检查超时 if (checkTimeout()) { - logWithTimestamp(`Processed ${processedRecords}/${recordsToProcess} records, stopping due to timeout`); + logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`); break; } - logWithTimestamp(`Processing batch ${page+1}, completed ${processedRecords}/${recordsToProcess} records (${Math.round(processedRecords/recordsToProcess*100)}%)`); + // 每批次都输出进度 + logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`); + logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`); const records = await traceCollection.find( query, { @@ -378,32 +487,43 @@ export async function main( ).toArray(); if (records.length === 0) { - logWithTimestamp("No more records found, sync complete"); + logWithTimestamp("没有找到更多数据,同步结束"); break; } + + // 找到数据,开始处理 + logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`); + // 输出当前批次的部分数据信息 + if (records.length > 0) { + logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, 时间=${new Date(records[0].createTime).toISOString()}`); + if (records.length > 1) { + logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`); + } + } const batchSize = await processRecords(records); processedRecords += records.length; totalBatchRecords += batchSize; - logWithTimestamp(`Batch ${page+1} complete. Processed ${processedRecords}/${recordsToProcess} records, inserted ${totalBatchRecords} (${Math.round(processedRecords/recordsToProcess*100)}%)`); + logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); } return { success: true, records_processed: processedRecords, records_synced: totalBatchRecords, - message: "Data sync completed" + message: "数据同步完成" }; } catch (err) { - console.error("Error during sync:", err); + console.error("同步过程中发生错误:", err); return { success: false, error: err instanceof Error ? err.message : String(err), stack: err instanceof Error ? err.stack : undefined }; } finally { + // 关闭MongoDB连接 await client.close(); - console.log("MongoDB connection closed"); + console.log("MongoDB连接已关闭"); } -} \ No newline at end of file +} diff --git a/windmill/sync_shorturl_to_clickhouse_intime.ts b/windmill/sync_shorturl_to_clickhouse_intime.ts new file mode 100644 index 0000000..2d36197 --- /dev/null +++ b/windmill/sync_shorturl_to_clickhouse_intime.ts @@ -0,0 +1,660 @@ +// 文件名: sync_resource_relations.ts +// 描述: 此脚本用于同步PostgreSQL中资源关联数据到ClickHouse +// 作者: AI Assistant +// 创建日期: 2023-10-31 + +import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts"; +import { getResource, getVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts"; + +// PostgreSQL配置接口 +interface PgConfig { + host: string; + port: number; + user: string; + password: string; + dbname?: string; + [key: string]: unknown; +} + +// ClickHouse配置接口 +interface ChConfig { + clickhouse_host: string; + clickhouse_port: number; + clickhouse_user: string; + clickhouse_password: string; + clickhouse_url?: string; +} + +// 资源相关接口定义 +interface TeamData { + team_id: string; + team_name: string; + team_description?: string; + project_id?: string; +} + +interface ProjectData { + project_id: string; + project_name: string; + project_description?: string; + assigned_at?: string; + resource_id?: string; +} + +interface TagData { + tag_id: string; + tag_name: string; + tag_type?: string; + created_at?: string; + resource_id?: string; +} + +interface FavoriteData { + favorite_id: string; + user_id: string; + first_name?: string; + last_name?: string; + email?: string; + created_at?: string; +} + +// 资源关联数据接口 +interface ResourceRelations { + resource_id: string; + teams?: TeamData[]; + projects?: ProjectData[]; + tags?: TagData[]; + favorites?: FavoriteData[]; + external_id?: string; + type?: string; + attributes?: Record; +} + +/** + * 同步PostgreSQL资源关联数据到ClickHouse + */ +export async function main( + params: { + /** 要同步的资源ID列表 */ + resource_ids: string[]; + /** 是否同步teams数据 */ + sync_teams?: boolean; + /** 是否同步projects数据 */ + sync_projects?: boolean; + /** 是否同步tags数据 */ + sync_tags?: boolean; + /** 是否同步favorites数据 */ + sync_favorites?: boolean; + /** 是否为测试模式(不执行实际更新) */ + dry_run?: boolean; + /** 是否显示详细日志 */ + verbose?: boolean; + } +) { + // 设置默认参数 + const resource_ids = params.resource_ids || []; + const sync_teams = params.sync_teams !== false; + const sync_projects = params.sync_projects !== false; + const sync_tags = params.sync_tags !== false; + const sync_favorites = params.sync_favorites !== false; + const dry_run = params.dry_run || false; + const verbose = params.verbose || false; + + if (resource_ids.length === 0) { + return { success: false, message: "至少需要提供一个资源ID" }; + } + + // 初始化日志函数 + const log = (message: string, isVerbose = false) => { + if (!isVerbose || verbose) { + console.log(message); + } + }; + + log(`开始同步资源关联数据: ${resource_ids.join(", ")}`); + log(`同步选项: teams=${sync_teams}, projects=${sync_projects}, tags=${sync_tags}, favorites=${sync_favorites}`, true); + + let pgPool: Pool | null = null; + + try { + // 1. 获取数据库配置 + log("获取PostgreSQL数据库配置...", true); + const pgConfig = await getResource('f/limq/postgresql') as PgConfig; + + // 2. 创建PostgreSQL连接池 + pgPool = new Pool({ + hostname: pgConfig.host, + port: pgConfig.port, + user: pgConfig.user, + password: pgConfig.password, + database: pgConfig.dbname || 'postgres' + }, 3); + + // 3. 获取需要更新的资源完整数据 + const resourcesData = await getResourcesWithRelations(pgPool, resource_ids, { + sync_teams, + sync_projects, + sync_tags, + sync_favorites + }, log); + + log(`成功获取 ${resourcesData.length} 个资源的关联数据`); + + if (resourcesData.length === 0) { + return { success: true, message: "没有找到需要更新的资源数据", updated: 0 }; + } + + // 4. 获取ClickHouse配置 + const chConfig = await getClickHouseConfig(); + + // 5. 对每个资源执行更新 + if (!dry_run) { + // 5a. 更新shorturl表数据 + const shorturlUpdated = await updateClickHouseShorturl(resourcesData, chConfig, log); + + // 5b. 更新events表数据 + const eventsUpdated = await updateClickHouseEvents(resourcesData, chConfig, log); + + return { + success: true, + message: "资源关联数据同步完成", + shorturl_updated: shorturlUpdated, + events_updated: eventsUpdated, + total_updated: shorturlUpdated + eventsUpdated + }; + } else { + log("测试模式: 不执行实际更新"); + if (resourcesData.length > 0) { + log("示例数据:"); + log(JSON.stringify(resourcesData[0], null, 2)); + } + return { success: true, dry_run: true, resources: resourcesData }; + } + } catch (error) { + const errorMessage = `同步过程中发生错误: ${(error as Error).message}`; + log(errorMessage); + if ((error as Error).stack) { + log(`错误堆栈: ${(error as Error).stack}`, true); + } + return { success: false, message: errorMessage }; + } finally { + if (pgPool) { + await pgPool.end(); + log("PostgreSQL连接池已关闭", true); + } + } +} + +/** + * 从PostgreSQL获取资源及其关联数据 + */ +async function getResourcesWithRelations( + pgPool: Pool, + resourceIds: string[], + options: { + sync_teams: boolean; + sync_projects: boolean; + sync_tags: boolean; + sync_favorites: boolean; + }, + log: (message: string, isVerbose?: boolean) => void +): Promise { + const client = await pgPool.connect(); + + try { + // 准备资源IDs参数 + const resourceIdsParam = resourceIds.map(id => `'${id}'`).join(','); + + // 1. 获取基本资源信息 + log(`获取资源基本信息: ${resourceIdsParam}`, true); + const resourcesQuery = ` + SELECT + r.id, + r.external_id, + r.type, + r.attributes, + r.schema_version, + r.created_at, + r.updated_at + FROM + limq.resources r + WHERE + r.id IN (${resourceIdsParam}) + AND r.deleted_at IS NULL + `; + + const resourcesResult = await client.queryObject(resourcesQuery); + + if (resourcesResult.rows.length === 0) { + log(`未找到有效的资源数据`, true); + return []; + } + + // 处理每个资源 + const enrichedResources: ResourceRelations[] = []; + + for (const resource of resourcesResult.rows) { + const resourceId = resource.id as string; + log(`处理资源ID: ${resourceId}`, true); + + // 初始化关联数据对象 + const relationData: ResourceRelations = { + resource_id: resourceId, + external_id: resource.external_id as string, + type: resource.type as string, + attributes: parseJsonField(resource.attributes) + }; + + // 2. 获取项目关联 + if (options.sync_projects) { + const projectsQuery = ` + SELECT + pr.resource_id, pr.project_id, + p.name as project_name, p.description as project_description, + pr.assigned_at + FROM + limq.project_resources pr + JOIN + limq.projects p ON pr.project_id = p.id + WHERE + pr.resource_id = $1 + AND p.deleted_at IS NULL + `; + + const projectsResult = await client.queryObject(projectsQuery, [resourceId]); + relationData.projects = projectsResult.rows as ProjectData[]; + log(`找到 ${projectsResult.rows.length} 个关联项目`, true); + } + + // 3. 获取标签关联 + if (options.sync_tags) { + const tagsQuery = ` + SELECT + rt.resource_id, rt.tag_id, rt.created_at, + t.name as tag_name, t.type as tag_type + FROM + limq.resource_tags rt + JOIN + limq.tags t ON rt.tag_id = t.id + WHERE + rt.resource_id = $1 + AND t.deleted_at IS NULL + `; + + const tagsResult = await client.queryObject(tagsQuery, [resourceId]); + relationData.tags = tagsResult.rows as TagData[]; + log(`找到 ${tagsResult.rows.length} 个关联标签`, true); + } + + // 4. 获取团队关联(通过项目) + if (options.sync_teams && relationData.projects && relationData.projects.length > 0) { + const projectIds = relationData.projects.map((p: ProjectData) => p.project_id); + + if (projectIds.length > 0) { + const teamsQuery = ` + SELECT + tp.team_id, tp.project_id, + t.name as team_name, t.description as team_description + FROM + limq.team_projects tp + JOIN + limq.teams t ON tp.team_id = t.id + WHERE + tp.project_id = ANY($1::uuid[]) + AND t.deleted_at IS NULL + `; + + const teamsResult = await client.queryObject(teamsQuery, [projectIds]); + relationData.teams = teamsResult.rows as TeamData[]; + log(`找到 ${teamsResult.rows.length} 个关联团队`, true); + } + } + + // 5. 获取收藏关联 + if (options.sync_favorites) { + const favoritesQuery = ` + SELECT + f.id as favorite_id, f.user_id, f.created_at, + u.first_name, u.last_name, u.email + FROM + limq.favorite f + JOIN + limq.users u ON f.user_id = u.id + WHERE + f.favoritable_id = $1 + AND f.favoritable_type = 'resource' + AND f.deleted_at IS NULL + `; + + const favoritesResult = await client.queryObject(favoritesQuery, [resourceId]); + relationData.favorites = favoritesResult.rows as FavoriteData[]; + log(`找到 ${favoritesResult.rows.length} 个收藏记录`, true); + } + + // 添加到结果集 + enrichedResources.push(relationData); + } + + return enrichedResources; + } finally { + client.release(); + } +} + +/** + * 更新ClickHouse中的shorturl表数据 + */ +async function updateClickHouseShorturl( + resources: ResourceRelations[], + chConfig: ChConfig, + log: (message: string, isVerbose?: boolean) => void +): Promise { + // 只处理类型为shorturl的资源 + const shorturls = resources.filter(r => r.type === 'shorturl'); + + if (shorturls.length === 0) { + log('没有找到shorturl类型的资源,跳过shorturl表更新'); + return 0; + } + + log(`准备更新 ${shorturls.length} 个shorturl资源`); + + let updatedCount = 0; + + // 检查ClickHouse中是否存在shorturl表 + const tableExists = await checkClickHouseTable(chConfig, 'shorturl_analytics.shorturl'); + + if (!tableExists) { + log('ClickHouse中未找到shorturl表,请先创建表'); + return 0; + } + + // 对每个资源执行更新 + for (const shorturl of shorturls) { + try { + // 格式化团队数据 + const teams = JSON.stringify(shorturl.teams || []); + + // 格式化项目数据 + const projects = JSON.stringify(shorturl.projects || []); + + // 格式化标签数据 + const tags = JSON.stringify((shorturl.tags || []).map((t: TagData) => ({ + tag_id: t.tag_id, + tag_name: t.tag_name, + tag_type: t.tag_type, + created_at: t.created_at + }))); + + // 格式化收藏数据 + const favorites = JSON.stringify((shorturl.favorites || []).map((f: FavoriteData) => ({ + favorite_id: f.favorite_id, + user_id: f.user_id, + user_name: `${f.first_name || ""} ${f.last_name || ""}`.trim(), + created_at: f.created_at + }))); + + // 尝试更新ClickHouse数据 + const updateQuery = ` + ALTER TABLE shorturl_analytics.shorturl + UPDATE + teams = '${escapeString(teams)}', + projects = '${escapeString(projects)}', + tags = '${escapeString(tags)}', + favorites = '${escapeString(favorites)}' + WHERE id = '${shorturl.resource_id}' + `; + + await executeClickHouseQuery(chConfig, updateQuery); + log(`更新shorturl完成: ${shorturl.resource_id}`, true); + updatedCount++; + + } catch (error) { + log(`更新shorturl ${shorturl.resource_id} 失败: ${(error as Error).message}`); + } + } + + return updatedCount; +} + +/** + * 更新ClickHouse中的events表数据 + */ +async function updateClickHouseEvents( + resources: ResourceRelations[], + chConfig: ChConfig, + log: (message: string, isVerbose?: boolean) => void +): Promise { + // 过滤出有external_id的资源 + const resourcesWithExternalId = resources.filter(r => r.external_id && r.external_id.trim() !== ''); + + if (resourcesWithExternalId.length === 0) { + log('没有找到具有external_id的资源,跳过events表更新'); + return 0; + } + + log(`准备更新events表中与 ${resourcesWithExternalId.length} 个外部ID相关的记录`); + + // 检查ClickHouse中是否存在events表 + const tableExists = await checkClickHouseTable(chConfig, 'shorturl_analytics.events'); + + if (!tableExists) { + log('ClickHouse中未找到events表,请先创建表'); + return 0; + } + + // 提取所有的external_id + const externalIds = resourcesWithExternalId.map(r => r.external_id).filter(Boolean) as string[]; + + // 构建资源数据映射(使用external_id作为键) + const resourceMapByExternalId = resourcesWithExternalId.reduce((map, resource) => { + if (resource.external_id) { + map[resource.external_id] = resource; + } + return map; + }, {} as Record); + + // 获取ClickHouse中相关资源的事件记录数量 + let updatedCount = 0; + + try { + // 格式化外部ID列表 + const formattedExternalIds = externalIds.map(id => `'${id}'`).join(', '); + + // 先查询是否有相关事件 + const countQuery = ` + SELECT COUNT(*) as count + FROM shorturl_analytics.events + WHERE event_id IN (${formattedExternalIds}) + `; + + const countResult = await executeClickHouseQuery(chConfig, countQuery); + const eventCount = parseInt(countResult.trim(), 10); + + if (eventCount === 0) { + // 尝试另一种查询方式 + const alternateCountQuery = ` + SELECT COUNT(*) as count + FROM shorturl_analytics.events + WHERE link_id IN (${formattedExternalIds}) + `; + + const alternateCountResult = await executeClickHouseQuery(chConfig, alternateCountQuery); + const alternateEventCount = parseInt(alternateCountResult.trim(), 10); + + if (alternateEventCount === 0) { + log('没有找到相关事件记录,跳过events表更新'); + log(`已尝试的匹配字段: event_id,link_id`, true); + return 0; + } else { + log(`找到 ${alternateEventCount} 条以link_id匹配的事件记录需要更新`); + } + } else { + log(`找到 ${eventCount} 条以event_id匹配的事件记录需要更新`); + } + + // 批量更新每个资源相关的事件记录 + for (const externalId of externalIds) { + const resource = resourceMapByExternalId[externalId]; + + if (!resource) continue; + + // 获取关联数据 + const tags = resource.tags ? JSON.stringify(resource.tags) : null; + + if (tags) { + // 尝试通过event_id更新事件标签 + const updateTagsQueryByEventId = ` + ALTER TABLE shorturl_analytics.events + UPDATE link_tags = '${escapeString(tags)}' + WHERE event_id = '${externalId}' + `; + + await executeClickHouseQuery(chConfig, updateTagsQueryByEventId); + log(`尝试通过event_id更新事件标签: ${externalId}`, true); + + // 尝试通过link_id更新事件标签 + const updateTagsQueryByLinkId = ` + ALTER TABLE shorturl_analytics.events + UPDATE link_tags = '${escapeString(tags)}' + WHERE link_id = '${externalId}' + `; + + await executeClickHouseQuery(chConfig, updateTagsQueryByLinkId); + log(`尝试通过link_id更新事件标签: ${externalId}`, true); + } + + // 如果资源有resource_id,也尝试使用它来更新 + if (resource.resource_id) { + const updateByResourceId = ` + ALTER TABLE shorturl_analytics.events + UPDATE link_tags = '${escapeString(tags || '[]')}' + WHERE link_id = '${resource.resource_id}' + `; + + await executeClickHouseQuery(chConfig, updateByResourceId); + log(`尝试通过resource_id更新事件标签: ${resource.resource_id}`, true); + } + + updatedCount++; + } + + log(`已尝试更新 ${updatedCount} 个资源的事件记录`); + + } catch (error) { + log(`更新events表失败: ${(error as Error).message}`); + } + + return updatedCount; +} + +/** + * 获取ClickHouse配置 + */ +async function getClickHouseConfig(): Promise { + try { + const chConfigJson = await getVariable("f/shorturl_analytics/clickhouse"); + + // 确保配置不为空 + if (!chConfigJson) { + throw new Error("未找到ClickHouse配置"); + } + + // 解析JSON字符串为对象 + let chConfig: ChConfig; + if (typeof chConfigJson === 'string') { + try { + chConfig = JSON.parse(chConfigJson); + } catch (_) { + throw new Error("ClickHouse配置不是有效的JSON"); + } + } else { + chConfig = chConfigJson as ChConfig; + } + + // 验证并构建URL + if (!chConfig.clickhouse_url && chConfig.clickhouse_host && chConfig.clickhouse_port) { + chConfig.clickhouse_url = `http://${chConfig.clickhouse_host}:${chConfig.clickhouse_port}`; + } + + if (!chConfig.clickhouse_url) { + throw new Error("ClickHouse配置缺少URL"); + } + + return chConfig; + } catch (error) { + throw new Error(`获取ClickHouse配置失败: ${(error as Error).message}`); + } +} + +/** + * 检查ClickHouse中是否存在指定表 + */ +async function checkClickHouseTable(chConfig: ChConfig, tableName: string): Promise { + try { + const query = `EXISTS TABLE ${tableName}`; + const result = await executeClickHouseQuery(chConfig, query); + return result.trim() === '1'; + } catch (error) { + console.error(`检查表 ${tableName} 失败:`, error); + return false; + } +} + +/** + * 执行ClickHouse查询 + */ +async function executeClickHouseQuery(chConfig: ChConfig, query: string): Promise { + // 确保URL有效 + if (!chConfig.clickhouse_url) { + throw new Error("无效的ClickHouse URL: 未定义"); + } + + // 执行HTTP请求 + try { + const response = await fetch(chConfig.clickhouse_url, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${chConfig.clickhouse_user}:${chConfig.clickhouse_password}`)}` + }, + body: query, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`ClickHouse查询失败 (${response.status}): ${errorText}`); + } + + return await response.text(); + } catch (error) { + throw new Error(`执行ClickHouse查询失败: ${(error as Error).message}`); + } +} + +/** + * 解析JSON字段 + */ +function parseJsonField(field: unknown): Record { + if (!field) return {}; + + try { + if (typeof field === 'string') { + return JSON.parse(field); + } else if (typeof field === 'object') { + return field as Record; + } + } catch (error) { + console.warn(`无法解析JSON字段:`, error); + } + + return {}; +} + +/** + * 转义字符串,避免SQL注入 + */ +function escapeString(str: string): string { + if (!str) return ''; + return str.replace(/'/g, "''"); +} \ No newline at end of file