// Sync data from MongoDB trace table to ClickHouse events table
import { getVariable } from "npm:windmill-client@1";
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";

/** MongoDB connection settings, read from the `f/shorturl_analytics/mongodb` variable. */
interface MongoConfig {
  host: string;
  port: string;
  db: string;
  username: string;
  password: string;
}

/** ClickHouse connection settings, read from the `f/shorturl_analytics/clickhouse` variable. */
interface ClickHouseConfig {
  clickhouse_host: string;
  clickhouse_port: number;
  clickhouse_user: string;
  clickhouse_password: string;
  clickhouse_url: string;
}

/** A document of the MongoDB `trace` collection. */
interface TraceRecord {
  _id: ObjectId;
  slugId: ObjectId;
  label: string | null;
  ip: string;
  type: number;
  platform: string;
  platformOS: string;
  browser: string;
  browserVersion: string;
  url: string;
  createTime: number;
}

/** A document of the MongoDB `short` collection. */
interface ShortRecord {
  _id: ObjectId;
  slug: string; // Slug part of the short link
  origin: string; // Original URL
  domain?: string; // Domain
  createTime: number; // Creation timestamp
  user?: string; // Creating user
  title?: string; // Title
  description?: string; // Description
  tags?: string[]; // Tags
  active?: boolean; // Whether the link is active
  expiresAt?: number; // Expiration timestamp
  teamId?: string; // Team ID
  projectId?: string; // Project ID
}

interface ClickHouseRow {
  event_id: string;
  event_attributes: string;
}

/**
 * Converts a millisecond timestamp into the `YYYY-MM-DD HH:MM:SS.mmm` text
 * form used for the DateTime64(3) columns (derived from the UTC ISO string).
 */
function formatDateTime(timestamp: number): string {
  return new Date(timestamp).toISOString().replace("T", " ").replace("Z", "");
}

/** Wraps a value in single quotes, escaping backslashes and quotes, for use as a SQL string literal. */
function quoteClickHouseString(value: string): string {
  return `'${value.replace(/\\/g, "\\\\").replace(/'/g, "\\'")}'`;
}

/** Builds the `mongodb://` connection URL; credentials are included only when both are set. */
function buildMongoUrl(config: MongoConfig): string {
  // NOTE(review): username/password are interpolated verbatim. If they can
  // contain characters such as `@` or `:`, confirm whether the driver expects
  // them percent-encoded.
  const credentials =
    config.username && config.password
      ? `${config.username}:${config.password}@`
      : "";
  return `mongodb://${credentials}${config.host}:${config.port}/${config.db}`;
}

/** Sends `body` as a POST request to the configured ClickHouse URL using HTTP basic auth. */
function postToClickHouse(
  config: ClickHouseConfig,
  body: string,
  timeoutMs: number
): Promise<Response> {
  const credentials = btoa(`${config.clickhouse_user}:${config.clickhouse_password}`);
  return fetch(config.clickhouse_url, {
    method: "POST",
    headers: {
      "Content-Type": "application/x-www-form-urlencoded",
      "Authorization": `Basic ${credentials}`,
    },
    body,
    signal: AbortSignal.timeout(timeoutMs),
  });
}

/** Maps one trace record (plus its short link, when found) to a row of the ClickHouse events table. */
function toClickHouseEvent(record: TraceRecord, shortLink: ShortRecord | undefined) {
  return {
    // Event base information
    event_id: record._id.toString(),
    event_time: formatDateTime(record.createTime),
    event_type: "click",
    event_attributes: JSON.stringify({ original_type: record.type }),

    // Link information from short collection
    link_id: record.slugId.toString(),
    link_slug: shortLink?.slug || "",
    link_label: record.label || "",
    link_title: "",
    link_original_url: shortLink?.origin || "",
    link_attributes: JSON.stringify({ domain: shortLink?.domain || null }),
    // Fall back to the click time when the short link has no creation time
    link_created_at: shortLink?.createTime
      ? formatDateTime(shortLink.createTime)
      : formatDateTime(record.createTime),
    link_expires_at: shortLink?.expiresAt ? formatDateTime(shortLink.expiresAt) : null,
    link_tags: "[]", // Empty array as default

    // User information
    user_id: shortLink?.user || "",
    user_name: "",
    user_email: "",
    user_attributes: "{}",

    // Team information
    team_id: shortLink?.teamId || "",
    team_name: "",
    team_attributes: "{}",

    // Project information
    project_id: shortLink?.projectId || "",
    project_name: "",
    project_attributes: "{}",

    // QR code information
    qr_code_id: "",
    qr_code_name: "",
    qr_code_attributes: "{}",

    // Visitor information
    visitor_id: "", // Empty string as default
    session_id: `${record.slugId.toString()}-${record.createTime}`,
    ip_address: record.ip || "",
    country: "",
    city: "",
    device_type: record.platform || "",
    browser: record.browser || "",
    os: record.platformOS || "",
    user_agent: `${record.browser || ""} ${record.browserVersion || ""}`.trim(),

    // Source information
    referrer: record.url || "",
    utm_source: "",
    utm_medium: "",
    utm_campaign: "",

    // Interaction information
    time_spent_sec: 0,
    is_bounce: true,
    is_qr_scan: false,
    conversion_type: "visit",
    conversion_value: 0,
  };
}

/**
 * Copies `type: 1` documents from the MongoDB `trace` collection into the
 * ClickHouse `shorturl_analytics.events` table, batch by batch.
 *
 * @param batch_size Number of trace records fetched and inserted per batch.
 * @param max_records Upper bound on the number of trace records to process.
 * @param timeout_minutes Stop starting new batches once this much time has elapsed.
 * @param skip_clickhouse_check Skip both the connection test and the duplicate check.
 * @param force_insert Skip the duplicate check and insert every fetched record.
 * @returns A summary object; `success` is `false` (with `error`/`stack`) when
 *   the sync failed after the MongoDB client was created.
 * @throws When the MongoDB/ClickHouse configuration variables cannot be loaded.
 */
export async function main(
  batch_size = 1000,
  max_records = 9999999,
  timeout_minutes = 60,
  skip_clickhouse_check = false,
  force_insert = false
) {
  const logWithTimestamp = (message: string) => {
    const now = new Date();
    console.log(`[${now.toISOString()}] ${message}`);
  };

  logWithTimestamp("Starting sync from MongoDB to ClickHouse events table");
  logWithTimestamp(`Batch size: ${batch_size}, Max records: ${max_records}, Timeout: ${timeout_minutes} minutes`);

  // Set timeout
  const startTime = Date.now();
  const timeoutMs = timeout_minutes * 60 * 1000;
  const checkTimeout = () => {
    if (Date.now() - startTime > timeoutMs) {
      console.log(`Execution time exceeded ${timeout_minutes} minutes, stopping`);
      return true;
    }
    return false;
  };

  // Get MongoDB and ClickHouse connection info
  let mongoConfig: MongoConfig;
  let clickhouseConfig: ClickHouseConfig;
  try {
    // Variables may arrive either as a JSON string or as an already parsed object
    const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
    mongoConfig = typeof rawMongoConfig === "string" ? JSON.parse(rawMongoConfig) : rawMongoConfig;
    const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse");
    clickhouseConfig = typeof rawClickhouseConfig === "string" ? JSON.parse(rawClickhouseConfig) : rawClickhouseConfig;
  } catch (error) {
    console.error("Failed to get config:", error);
    throw error;
  }

  // Build MongoDB connection URL
  const mongoUrl = buildMongoUrl(mongoConfig);

  // Connect to MongoDB
  const client = new MongoClient();
  try {
    await client.connect(mongoUrl);
    console.log("MongoDB connected successfully");

    const db = client.database(mongoConfig.db);
    const traceCollection = db.collection<TraceRecord>("trace");
    const shortCollection = db.collection<ShortRecord>("short");

    // Build query conditions
    const query = {
      type: 1, // Only sync records with type 1
    };

    // Count total records
    const totalRecords = await traceCollection.countDocuments(query);
    console.log(`Found ${totalRecords} records to sync`);
    const recordsToProcess = Math.min(totalRecords, max_records);
    console.log(`Will process ${recordsToProcess} records`);

    if (totalRecords === 0) {
      console.log("No records to sync, task completed");
      return { success: true, records_synced: 0, message: "No records to sync" };
    }

    // Check ClickHouse connection
    const checkClickHouseConnection = async (): Promise<boolean> => {
      if (skip_clickhouse_check) {
        logWithTimestamp("Skipping ClickHouse connection check");
        return true;
      }

      try {
        logWithTimestamp("Testing ClickHouse connection...");
        const response = await postToClickHouse(clickhouseConfig, "SELECT 1", 5000);
        if (response.ok) {
          logWithTimestamp("ClickHouse connection test successful");
          return true;
        }
        const errorText = await response.text();
        logWithTimestamp(`ClickHouse connection test failed: ${response.status} ${errorText}`);
        return false;
      } catch (err) {
        logWithTimestamp(`ClickHouse connection test failed: ${(err as Error).message}`);
        return false;
      }
    };

    // Return only the records whose id is not yet present in ClickHouse
    const checkExistingRecords = async (records: TraceRecord[]): Promise<TraceRecord[]> => {
      if (records.length === 0) return [];

      if (skip_clickhouse_check || force_insert) {
        logWithTimestamp(`Skipping ClickHouse duplicate check, will process all ${records.length} records`);
        return records;
      }

      try {
        // Rows are inserted with event_id = the Mongo _id, so duplicates are
        // detected by matching on event_id directly.
        const idList = records
          .map((record) => quoteClickHouseString(record._id.toString()))
          .join(", ");
        const existingQuery =
          `SELECT event_id FROM shorturl_analytics.events WHERE event_id IN (${idList}) FORMAT JSON`;

        const response = await postToClickHouse(clickhouseConfig, existingQuery, 10000);
        if (!response.ok) {
          const errorText = await response.text();
          throw new Error(`ClickHouse query error: ${response.status} ${errorText}`);
        }

        const result = (await response.json()) as { data?: Pick<ClickHouseRow, "event_id">[] };
        const existingIds = new Set<string>((result.data ?? []).map((row) => row.event_id));
        return records.filter((record) => !existingIds.has(record._id.toString()));
      } catch (err) {
        logWithTimestamp(`Error checking existing records: ${(err as Error).message}`);
        // Propagate instead of pretending the whole batch already exists:
        // otherwise the batch would be dropped silently and the run reported
        // as successful.
        throw err;
      }
    };

    // Insert the not-yet-synced records of one batch; returns how many rows were inserted
    const processRecords = async (records: TraceRecord[]): Promise<number> => {
      if (records.length === 0) return 0;

      const newRecords = await checkExistingRecords(records);
      if (newRecords.length === 0) {
        logWithTimestamp("All records already exist, skipping");
        return 0;
      }

      // Get link information for all records
      const slugIds = newRecords.map((record) => record.slugId);
      const shortLinks = await shortCollection.find({ _id: { $in: slugIds } }).toArray();

      // Create a map for quick lookup
      const shortLinksMap = new Map<string, ShortRecord>();
      for (const link of shortLinks) {
        shortLinksMap.set(link._id.toString(), link);
      }

      // Prepare ClickHouse insert data. JSON.stringify already produces valid
      // JSONEachRow lines, so the nested JSON strings need no extra escaping.
      const rows = newRecords
        .map((record) =>
          JSON.stringify(toClickHouseEvent(record, shortLinksMap.get(record.slugId.toString())))
        )
        .join("\n");

      // Generate ClickHouse insert SQL
      const insertSQL = `INSERT INTO shorturl_analytics.events FORMAT JSONEachRow\n${rows}`;

      try {
        const response = await postToClickHouse(clickhouseConfig, insertSQL, 20000);
        if (!response.ok) {
          const errorText = await response.text();
          throw new Error(`ClickHouse insert error: ${response.status} ${errorText}`);
        }

        logWithTimestamp(`Successfully inserted ${newRecords.length} records to ClickHouse`);
        return newRecords.length;
      } catch (err) {
        logWithTimestamp(`Failed to insert data to ClickHouse: ${(err as Error).message}`);
        throw err;
      }
    };

    // Check ClickHouse connection before processing
    const clickhouseConnected = await checkClickHouseConnection();
    if (!clickhouseConnected && !skip_clickhouse_check) {
      throw new Error("ClickHouse connection failed, cannot continue sync");
    }

    // Process records in batches
    let processedRecords = 0;
    let totalBatchRecords = 0;

    for (let page = 0; processedRecords < recordsToProcess; page++) {
      if (checkTimeout()) {
        logWithTimestamp(`Processed ${processedRecords}/${recordsToProcess} records, stopping due to timeout`);
        break;
      }

      logWithTimestamp(`Processing batch ${page+1}, completed ${processedRecords}/${recordsToProcess} records (${Math.round(processedRecords/recordsToProcess*100)}%)`);

      // Never fetch past max_records: cap the final page at what is left.
      const remaining = recordsToProcess - processedRecords;

      // NOTE(review): skip-based paging over a sort on createTime alone may
      // repeat or miss records when several share the same createTime —
      // confirm whether a tie-breaker (e.g. _id) is affordable with the
      // existing indexes.
      const records = await traceCollection.find(
        query,
        {
          allowDiskUse: true,
          sort: { createTime: 1 },
          skip: page * batch_size,
          limit: Math.min(batch_size, remaining),
        }
      ).toArray();

      if (records.length === 0) {
        logWithTimestamp("No more records found, sync complete");
        break;
      }

      const insertedCount = await processRecords(records);
      processedRecords += records.length;
      totalBatchRecords += insertedCount;

      logWithTimestamp(`Batch ${page+1} complete. Processed ${processedRecords}/${recordsToProcess} records, inserted ${totalBatchRecords} (${Math.round(processedRecords/recordsToProcess*100)}%)`);
    }

    return {
      success: true,
      records_processed: processedRecords,
      records_synced: totalBatchRecords,
      message: "Data sync completed"
    };
  } catch (err) {
    console.error("Error during sync:", err);
    return {
      success: false,
      error: err instanceof Error ? err.message : String(err),
      stack: err instanceof Error ? err.stack : undefined
    };
  } finally {
    await client.close();
    console.log("MongoDB connection closed");
  }
}