// Sync data from MongoDB trace table to ClickHouse events table
import { getVariable } from "npm:windmill-client@1";
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";

interface MongoConfig {
  host: string;
  port: string;
  db: string;
  username: string;
  password: string;
}

interface ClickHouseConfig {
  clickhouse_host: string;
  clickhouse_port: number;
  clickhouse_user: string;
  clickhouse_password: string;
  clickhouse_database: string;
  clickhouse_url: string;
}

interface TraceRecord {
  _id: ObjectId;
  slugId: ObjectId;
  label: string | null;
  ip: string;
  type: number;
  platform: string;
  platformOS: string;
  browser: string;
  browserVersion: string;
  url: string;
  createTime: number;
}

/** Shape of the ClickHouse `FORMAT JSON` response returned by the duplicate-check query. */
interface ExistingIdsResponse {
  data: { mongo_id: string }[];
}

/** Print `message` prefixed with the current time in ISO-8601 form. */
function logWithTimestamp(message: string): void {
  console.log(`[${new Date().toISOString()}] ${message}`);
}

/** Extract a readable message from an arbitrary thrown value. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Build a `mongodb://` connection URL from the stored config.
 *
 * Credentials are percent-encoded, as the MongoDB connection-string format
 * requires, so characters such as `@`, `:` or `/` in a password cannot be
 * mistaken for URL delimiters.
 */
function buildMongoUrl(config: MongoConfig): string {
  const credentials = config.username && config.password
    ? `${encodeURIComponent(config.username)}:${encodeURIComponent(config.password)}@`
    : "";
  return `mongodb://${credentials}${config.host}:${config.port}/${config.db}`;
}

/** Quote `value` as a ClickHouse single-quoted string literal, escaping `\` and `'`. */
function sqlString(value: string): string {
  return `'${value.replace(/\\/g, "\\\\").replace(/'/g, "\\'")}'`;
}

/**
 * POST a SQL statement to the ClickHouse HTTP endpoint with Basic auth.
 * The raw Response is returned; callers decide how to treat non-2xx statuses.
 */
function postToClickHouse(config: ClickHouseConfig, sql: string, timeoutMs: number): Promise<Response> {
  return fetch(config.clickhouse_url, {
    method: "POST",
    headers: {
      "Content-Type": "application/x-www-form-urlencoded",
      "Authorization": `Basic ${btoa(`${config.clickhouse_user}:${config.clickhouse_password}`)}`,
    },
    body: sql,
    signal: AbortSignal.timeout(timeoutMs),
  });
}

/**
 * Map one Mongo trace document to a row for the ClickHouse `events` table.
 * Fields the trace collection does not carry (user/team/project/QR data,
 * geo, UTM) are filled with empty defaults.
 */
function toEventRow(record: TraceRecord) {
  const eventTime = new Date(record.createTime).toISOString();
  const mongoId = record._id.toString();
  return {
    event_time: eventTime,
    event_type: "click",
    // mongo_id is what the duplicate check looks up on later runs.
    event_attributes: JSON.stringify({ mongo_id: mongoId, original_type: record.type }),

    // Link information
    link_id: record.slugId.toString(),
    link_slug: "",
    link_label: record.label || "",
    link_title: "",
    link_original_url: record.url || "",
    link_attributes: "{}",
    link_created_at: eventTime,
    link_expires_at: null,
    link_tags: "[]",

    // User information (empty as not available in trace)
    user_id: "",
    user_name: "",
    user_email: "",
    user_attributes: "{}",

    // Team information (empty as not available in trace)
    team_id: "",
    team_name: "",
    team_attributes: "{}",

    // Project information (empty as not available in trace)
    project_id: "",
    project_name: "",
    project_attributes: "{}",

    // QR code information (empty as not available in trace)
    qr_code_id: "",
    qr_code_name: "",
    qr_code_attributes: "{}",

    // Visitor information
    visitor_id: mongoId,
    session_id: `${mongoId}-${record.createTime}`,
    ip_address: record.ip || "",
    country: "",
    city: "",
    device_type: record.platform || "unknown",
    browser: record.browser || "",
    os: record.platformOS || "",
    user_agent: `${record.browser || ""} ${record.browserVersion || ""}`.trim(),

    // Source information
    referrer: record.url || "",
    utm_source: "",
    utm_medium: "",
    utm_campaign: "",

    // Interaction information
    time_spent_sec: 0,
    is_bounce: true,
    is_qr_scan: false,
    conversion_type: "visit",
    conversion_value: 0,
  };
}

/**
 * Copy `type: 1` documents from the MongoDB `trace` collection into the
 * ClickHouse `events` table, in batches ordered by `createTime`.
 *
 * @param batch_size            Documents read from MongoDB per batch (positive integer).
 * @param max_records           Upper bound on documents read in this run.
 * @param timeout_minutes       Stop starting new batches after this much wall time.
 * @param skip_clickhouse_check Skip both the connectivity probe and the duplicate check.
 * @param force_insert          Skip the duplicate check and insert every document read.
 * @returns A summary object; `success: false` with the error message on sync failure.
 * @throws If `batch_size` is invalid or the Windmill config variables cannot be loaded.
 */
export async function main(
  batch_size = 1000,
  max_records = 9999999,
  timeout_minutes = 60,
  skip_clickhouse_check = false,
  force_insert = false,
) {
  // A zero/negative Mongo limit means "no limit", which would defeat batching.
  if (!Number.isInteger(batch_size) || batch_size < 1) {
    throw new Error(`batch_size must be a positive integer, got ${batch_size}`);
  }

  logWithTimestamp("Starting sync from MongoDB to ClickHouse events table");
  logWithTimestamp(`Batch size: ${batch_size}, Max records: ${max_records}, Timeout: ${timeout_minutes} minutes`);

  // Set timeout
  const startTime = Date.now();
  const timeoutMs = timeout_minutes * 60 * 1000;
  const checkTimeout = (): boolean => {
    if (Date.now() - startTime > timeoutMs) {
      logWithTimestamp(`Execution time exceeded ${timeout_minutes} minutes, stopping`);
      return true;
    }
    return false;
  };

  // Get MongoDB and ClickHouse connection info (stored either as JSON strings or objects)
  let mongoConfig: MongoConfig;
  let clickhouseConfig: ClickHouseConfig;
  try {
    const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
    mongoConfig = typeof rawMongoConfig === "string" ? JSON.parse(rawMongoConfig) : rawMongoConfig;
    const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse");
    clickhouseConfig = typeof rawClickhouseConfig === "string" ? JSON.parse(rawClickhouseConfig) : rawClickhouseConfig;
  } catch (error) {
    console.error("Failed to get config:", error);
    throw error;
  }
  const chConfig: ClickHouseConfig = clickhouseConfig;
  const eventsTable = `${chConfig.clickhouse_database}.events`;

  /** Probe ClickHouse with `SELECT 1`; always succeeds when the check is skipped. */
  const checkClickHouseConnection = async (): Promise<boolean> => {
    if (skip_clickhouse_check) {
      logWithTimestamp("Skipping ClickHouse connection check");
      return true;
    }
    try {
      logWithTimestamp("Testing ClickHouse connection...");
      const response = await postToClickHouse(chConfig, "SELECT 1", 5000);
      if (response.ok) {
        logWithTimestamp("ClickHouse connection test successful");
        return true;
      }
      const errorText = await response.text();
      logWithTimestamp(`ClickHouse connection test failed: ${response.status} ${errorText}`);
      return false;
    } catch (err) {
      logWithTimestamp(`ClickHouse connection test failed: ${errorMessage(err)}`);
      return false;
    }
  };

  /**
   * Drop records whose Mongo id is already present in ClickHouse.
   * Returns `null` when the lookup fails: the batch is then skipped rather than
   * risking duplicate inserts (a later run will pick it up again).
   */
  const checkExistingRecords = async (records: TraceRecord[]): Promise<TraceRecord[] | null> => {
    if (records.length === 0) return [];
    if (skip_clickhouse_check || force_insert) {
      logWithTimestamp(`Skipping ClickHouse duplicate check, will process all ${records.length} records`);
      return records;
    }
    try {
      const idList = records.map((record) => sqlString(record._id.toString())).join(", ");
      // NOTE(review): assumes event_attributes is a String column holding the JSON
      // written by toEventRow — confirm against the table schema.
      const sql = `
SELECT DISTINCT JSONExtractString(event_attributes, 'mongo_id') AS mongo_id
FROM ${eventsTable}
WHERE JSONExtractString(event_attributes, 'mongo_id') IN (${idList})
FORMAT JSON`;
      const response = await postToClickHouse(chConfig, sql, 10000);
      if (!response.ok) {
        const errorText = await response.text();
        throw new Error(`ClickHouse query error: ${response.status} ${errorText}`);
      }
      const result: ExistingIdsResponse = await response.json();
      const existingIds = new Set(result.data.map((row) => row.mongo_id));
      return records.filter((record) => !existingIds.has(record._id.toString()));
    } catch (err) {
      logWithTimestamp(`Error checking existing records: ${errorMessage(err)}; skipping this batch to avoid duplicate inserts`);
      return null;
    }
  };

  /** Insert the not-yet-synced records of one batch; returns how many rows were inserted. */
  const processRecords = async (records: TraceRecord[]): Promise<number> => {
    if (records.length === 0) return 0;

    const newRecords = await checkExistingRecords(records);
    if (newRecords === null) return 0;
    if (newRecords.length === 0) {
      logWithTimestamp("All records already exist, skipping");
      return 0;
    }

    // JSONEachRow expects one JSON object per line, not a JSON array.
    const rows = newRecords.map((record) => JSON.stringify(toEventRow(record))).join("\n");
    const insertSQL = `INSERT INTO ${eventsTable} FORMAT JSONEachRow\n${rows}\n`;

    try {
      const response = await postToClickHouse(chConfig, insertSQL, 20000);
      if (!response.ok) {
        const errorText = await response.text();
        throw new Error(`ClickHouse insert error: ${response.status} ${errorText}`);
      }
      logWithTimestamp(`Successfully inserted ${newRecords.length} records to ClickHouse`);
      return newRecords.length;
    } catch (err) {
      logWithTimestamp(`Failed to insert data to ClickHouse: ${errorMessage(err)}`);
      throw err;
    }
  };

  // Connect to MongoDB
  const client = new MongoClient();
  try {
    await client.connect(buildMongoUrl(mongoConfig));
    logWithTimestamp("MongoDB connected successfully");

    const traceCollection = client.database(mongoConfig.db).collection<TraceRecord>("trace");

    // Only sync records with type 1
    const query = { type: 1 };

    const totalRecords = await traceCollection.countDocuments(query);
    logWithTimestamp(`Found ${totalRecords} records to sync`);
    const recordsToProcess = Math.min(totalRecords, max_records);
    logWithTimestamp(`Will process ${recordsToProcess} records`);

    if (totalRecords === 0) {
      logWithTimestamp("No records to sync, task completed");
      return { success: true, records_synced: 0, message: "No records to sync" };
    }

    // Check ClickHouse connection before processing (always true when skipped)
    const clickhouseConnected = await checkClickHouseConnection();
    if (!clickhouseConnected) {
      throw new Error("ClickHouse connection failed, cannot continue sync");
    }

    // Process records in batches
    let processedRecords = 0;
    let insertedRecords = 0;
    const progress = () => Math.round(processedRecords / recordsToProcess * 100);

    for (let page = 0; processedRecords < recordsToProcess; page++) {
      if (checkTimeout()) {
        logWithTimestamp(`Processed ${processedRecords}/${recordsToProcess} records, stopping due to timeout`);
        break;
      }

      logWithTimestamp(`Processing batch ${page + 1}, completed ${processedRecords}/${recordsToProcess} records (${progress()}%)`);

      const records = await traceCollection.find(query, {
        allowDiskUse: true,
        // _id breaks createTime ties so skip-based pages neither overlap nor leave gaps.
        sort: { createTime: 1, _id: 1 },
        skip: processedRecords,
        // Never read past max_records on the final batch.
        limit: Math.min(batch_size, recordsToProcess - processedRecords),
      }).toArray();

      if (records.length === 0) {
        logWithTimestamp("No more records found, sync complete");
        break;
      }

      const inserted = await processRecords(records);
      processedRecords += records.length;
      insertedRecords += inserted;

      logWithTimestamp(`Batch ${page + 1} complete. Processed ${processedRecords}/${recordsToProcess} records, inserted ${insertedRecords} (${progress()}%)`);
    }

    return {
      success: true,
      records_processed: processedRecords,
      records_synced: insertedRecords,
      message: "Data sync completed",
    };
  } catch (err) {
    console.error("Error during sync:", err);
    return {
      success: false,
      error: err instanceof Error ? err.message : String(err),
      stack: err instanceof Error ? err.stack : undefined,
    };
  } finally {
    await client.close();
    logWithTimestamp("MongoDB connection closed");
  }
}