From 1e9e5928d795f47d66a868700097c68ac477bb70 Mon Sep 17 00:00:00 2001 From: William Tso Date: Tue, 25 Mar 2025 14:35:01 +0800 Subject: [PATCH] sync trace & short to clickhouse events --- .../clickhouse/create_shorturl_analytics.sql | 9 +- scripts/db/sync_mongo_to_events.ts | 364 ++++++++++++ windmill/sync_mongo_to_events.ts | 409 ++++++++++++++ windmill/sync_shorturl_event_from_mongo.ts | 474 ---------------- .../sync_shorturl_from_mongo_to_clickhouse.ts | 532 ------------------ 5 files changed, 779 insertions(+), 1009 deletions(-) create mode 100644 scripts/db/sync_mongo_to_events.ts create mode 100644 windmill/sync_mongo_to_events.ts delete mode 100644 windmill/sync_shorturl_event_from_mongo.ts delete mode 100644 windmill/sync_shorturl_from_mongo_to_clickhouse.ts diff --git a/scripts/db/sql/clickhouse/create_shorturl_analytics.sql b/scripts/db/sql/clickhouse/create_shorturl_analytics.sql index 9a21ea3..54d22ca 100644 --- a/scripts/db/sql/clickhouse/create_shorturl_analytics.sql +++ b/scripts/db/sql/clickhouse/create_shorturl_analytics.sql @@ -10,8 +10,9 @@ DROP TABLE IF EXISTS shorturl_analytics.events; -- 创建新表 CREATE TABLE IF NOT EXISTS shorturl_analytics.events ( -- 事件基础信息 - event_id UUID DEFAULT generateUUIDv4(), - event_time DateTime64(3) DEFAULT now64(), + event_id String, + event_time DateTime64(3), + -- 精确到毫秒的时间戳 event_type String, -- click, redirect, conversion, error event_attributes String DEFAULT '{}', @@ -25,7 +26,9 @@ CREATE TABLE IF NOT EXISTS shorturl_analytics.events ( link_original_url String, link_attributes String DEFAULT '{}', link_created_at DateTime64(3), + -- 精确到毫秒的时间戳 link_expires_at Nullable(DateTime64(3)), + -- 精确到毫秒的时间戳 link_tags String DEFAULT '[]', -- Array of {id, name, attributes} -- 用户信息 @@ -68,6 +71,6 @@ CREATE TABLE IF NOT EXISTS shorturl_analytics.events ( conversion_type String, -- 改为String类型 conversion_value Float64 DEFAULT 0 -) ENGINE = MergeTree() PARTITION BY toYYYYMM(event_time) +) ENGINE = MergeTree() PARTITION BY toYYYYMM(event_time) -- 直接使用DateTime64进行分区 ORDER BY (event_time, link_id, event_id) SETTINGS index_granularity = 8192; \ No newline at end of file diff --git a/scripts/db/sync_mongo_to_events.ts b/scripts/db/sync_mongo_to_events.ts new file mode 100644 index 0000000..bc91133 --- /dev/null +++ b/scripts/db/sync_mongo_to_events.ts @@ -0,0 +1,364 @@ +// Sync data from MongoDB trace table to ClickHouse events table +import { getVariable } from "npm:windmill-client@1"; +import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts"; + +interface MongoConfig { + host: string; + port: string; + db: string; + username: string; + password: string; +} + +interface ClickHouseConfig { + clickhouse_host: string; + clickhouse_port: number; + clickhouse_user: string; + clickhouse_password: string; + clickhouse_database: string; + clickhouse_url: string; +} + +interface TraceRecord { + _id: ObjectId; + slugId: ObjectId; + label: string | null; + ip: string; + type: number; + platform: string; + platformOS: string; + browser: string; + browserVersion: string; + url: string; + createTime: number; +} + +export async function main( + batch_size = 1000, + max_records = 9999999, + timeout_minutes = 60, + skip_clickhouse_check = false, + force_insert = false +) { + const logWithTimestamp = (message: string) => { + const now = new Date(); + console.log(`[${now.toISOString()}] ${message}`); + }; + + logWithTimestamp("Starting sync from MongoDB to ClickHouse events table"); + logWithTimestamp(`Batch size: ${batch_size}, Max records: ${max_records}, Timeout: ${timeout_minutes} minutes`); + + // Set timeout + const startTime = Date.now(); + const timeoutMs = timeout_minutes * 60 * 1000; + + const checkTimeout = () => { + if (Date.now() - startTime > timeoutMs) { + console.log(`Execution time exceeded ${timeout_minutes} minutes, stopping`); + return true; + } + return false; + }; + + // Get MongoDB and ClickHouse connection info + let mongoConfig: MongoConfig; + let clickhouseConfig: ClickHouseConfig; + + try { + const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb"); + mongoConfig = typeof rawMongoConfig === "string" ? JSON.parse(rawMongoConfig) : rawMongoConfig; + + const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse"); + clickhouseConfig = typeof rawClickhouseConfig === "string" ? JSON.parse(rawClickhouseConfig) : rawClickhouseConfig; + } catch (error) { + console.error("Failed to get config:", error); + throw error; + } + + // Build MongoDB connection URL + let mongoUrl = "mongodb://"; + if (mongoConfig.username && mongoConfig.password) { + mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`; + } + mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`; + + // Connect to MongoDB + const client = new MongoClient(); + try { + await client.connect(mongoUrl); + console.log("MongoDB connected successfully"); + + const db = client.database(mongoConfig.db); + const traceCollection = db.collection("trace"); + + // Build query conditions + const query: Record = { + type: 1 // Only sync records with type 1 + }; + + // Count total records + const totalRecords = await traceCollection.countDocuments(query); + console.log(`Found ${totalRecords} records to sync`); + + const recordsToProcess = Math.min(totalRecords, max_records); + console.log(`Will process ${recordsToProcess} records`); + + if (totalRecords === 0) { + console.log("No records to sync, task completed"); + return { + success: true, + records_synced: 0, + message: "No records to sync" + }; + } + + // Check ClickHouse connection + const checkClickHouseConnection = async (): Promise => { + if (skip_clickhouse_check) { + logWithTimestamp("Skipping ClickHouse connection check"); + return true; + } + + try { + logWithTimestamp("Testing ClickHouse connection..."); + const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; + const response = await fetch(clickhouseUrl, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`, + }, + body: "SELECT 1", + signal: AbortSignal.timeout(5000) + }); + + if (response.ok) { + logWithTimestamp("ClickHouse connection test successful"); + return true; + } else { + const errorText = await response.text(); + logWithTimestamp(`ClickHouse connection test failed: ${response.status} ${errorText}`); + return false; + } + } catch (err) { + logWithTimestamp(`ClickHouse connection test failed: ${(err as Error).message}`); + return false; + } + }; + + // Check if records exist in ClickHouse + const checkExistingRecords = async (records: TraceRecord[]): Promise => { + if (records.length === 0) return []; + + if (skip_clickhouse_check || force_insert) { + logWithTimestamp(`Skipping ClickHouse duplicate check, will process all ${records.length} records`); + return records; + } + + try { + const recordIds = records.map(record => record._id.toString()); + + const query = ` + SELECT event_id + FROM ${clickhouseConfig.clickhouse_database}.events + WHERE event_attributes LIKE '%"mongo_id":"%' + AND event_attributes LIKE ANY ('%${recordIds.join("%' OR '%")}%') + FORMAT JSON + `; + + const response = await fetch(clickhouseConfig.clickhouse_url, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` + }, + body: query, + signal: AbortSignal.timeout(10000) + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`ClickHouse query error: ${response.status} ${errorText}`); + } + + const result = await response.json(); + const existingIds = new Set(result.data.map((row: any) => { + const matches = row.event_attributes.match(/"mongo_id":"([^"]+)"/); + return matches ? matches[1] : null; + }).filter(Boolean)); + + return records.filter(record => !existingIds.has(record._id.toString())); + } catch (err) { + logWithTimestamp(`Error checking existing records: ${(err as Error).message}`); + return skip_clickhouse_check ? records : []; + } + }; + + // Process records function + const processRecords = async (records: TraceRecord[]) => { + if (records.length === 0) return 0; + + const newRecords = await checkExistingRecords(records); + if (newRecords.length === 0) { + logWithTimestamp("All records already exist, skipping"); + return 0; + } + + // Prepare ClickHouse insert data + const clickhouseData = newRecords.map(record => { + const eventTime = new Date(record.createTime).toISOString(); + return { + event_time: eventTime, + event_type: "click", + event_attributes: JSON.stringify({ + mongo_id: record._id.toString(), + original_type: record.type + }), + + // Link information + link_id: record.slugId.toString(), + link_slug: "", + link_label: record.label || "", + link_title: "", + link_original_url: record.url || "", + link_attributes: "{}", + link_created_at: eventTime, + link_expires_at: null, + link_tags: "[]", + + // User information (empty as not available in trace) + user_id: "", + user_name: "", + user_email: "", + user_attributes: "{}", + + // Team information (empty as not available in trace) + team_id: "", + team_name: "", + team_attributes: "{}", + + // Project information (empty as not available in trace) + project_id: "", + project_name: "", + project_attributes: "{}", + + // QR code information (empty as not available in trace) + qr_code_id: "", + qr_code_name: "", + qr_code_attributes: "{}", + + // Visitor information + visitor_id: record._id.toString(), + session_id: `${record._id.toString()}-${record.createTime}`, + ip_address: record.ip || "", + country: "", + city: "", + device_type: record.platform || "unknown", + browser: record.browser || "", + os: record.platformOS || "", + user_agent: `${record.browser || ""} ${record.browserVersion || ""}`.trim(), + + // Source information + referrer: record.url || "", + utm_source: "", + utm_medium: "", + utm_campaign: "", + + // Interaction information + time_spent_sec: 0, + is_bounce: true, + is_qr_scan: false, + conversion_type: "visit", + conversion_value: 0 + }; + }); + + // Generate ClickHouse insert SQL + const insertSQL = ` + INSERT INTO ${clickhouseConfig.clickhouse_database}.events + FORMAT JSONEachRow + ${JSON.stringify(clickhouseData)} + `; + + try { + const response = await fetch(clickhouseConfig.clickhouse_url, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` + }, + body: insertSQL, + signal: AbortSignal.timeout(20000) + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`ClickHouse insert error: ${response.status} ${errorText}`); + } + + logWithTimestamp(`Successfully inserted ${newRecords.length} records to ClickHouse`); + return newRecords.length; + } catch (err) { + logWithTimestamp(`Failed to insert data to ClickHouse: ${(err as Error).message}`); + throw err; + } + }; + + // Check ClickHouse connection before processing + const clickhouseConnected = await checkClickHouseConnection(); + if (!clickhouseConnected && !skip_clickhouse_check) { + throw new Error("ClickHouse connection failed, cannot continue sync"); + } + + // Process records in batches + let processedRecords = 0; + let totalBatchRecords = 0; + + for (let page = 0; processedRecords < recordsToProcess; page++) { + if (checkTimeout()) { + logWithTimestamp(`Processed ${processedRecords}/${recordsToProcess} records, stopping due to timeout`); + break; + } + + logWithTimestamp(`Processing batch ${page+1}, completed ${processedRecords}/${recordsToProcess} records (${Math.round(processedRecords/recordsToProcess*100)}%)`); + + const records = await traceCollection.find( + query, + { + allowDiskUse: true, + sort: { createTime: 1 }, + skip: page * batch_size, + limit: batch_size + } + ).toArray(); + + if (records.length === 0) { + logWithTimestamp("No more records found, sync complete"); + break; + } + + const batchSize = await processRecords(records); + processedRecords += records.length; + totalBatchRecords += batchSize; + + logWithTimestamp(`Batch ${page+1} complete. Processed ${processedRecords}/${recordsToProcess} records, inserted ${totalBatchRecords} (${Math.round(processedRecords/recordsToProcess*100)}%)`); + } + + return { + success: true, + records_processed: processedRecords, + records_synced: totalBatchRecords, + message: "Data sync completed" + }; + } catch (err) { + console.error("Error during sync:", err); + return { + success: false, + error: err instanceof Error ? err.message : String(err), + stack: err instanceof Error ? err.stack : undefined + }; + } finally { + await client.close(); + console.log("MongoDB connection closed"); + } +} \ No newline at end of file diff --git a/windmill/sync_mongo_to_events.ts b/windmill/sync_mongo_to_events.ts new file mode 100644 index 0000000..38522c7 --- /dev/null +++ b/windmill/sync_mongo_to_events.ts @@ -0,0 +1,409 @@ +// Sync data from MongoDB trace table to ClickHouse events table +import { getVariable } from "npm:windmill-client@1"; +import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts"; + +interface MongoConfig { + host: string; + port: string; + db: string; + username: string; + password: string; +} + +interface ClickHouseConfig { + clickhouse_host: string; + clickhouse_port: number; + clickhouse_user: string; + clickhouse_password: string; + clickhouse_url: string; +} + +interface TraceRecord { + _id: ObjectId; + slugId: ObjectId; + label: string | null; + ip: string; + type: number; + platform: string; + platformOS: string; + browser: string; + browserVersion: string; + url: string; + createTime: number; +} + +interface ShortRecord { + _id: ObjectId; + slug: string; // 短链接的slug部分 + origin: string; // 原始URL + domain?: string; // 域名 + createTime: number; // 创建时间戳 + user?: string; // 创建用户 + title?: string; // 标题 + description?: string; // 描述 + tags?: string[]; // 标签 + active?: boolean; // 是否活跃 + expiresAt?: number; // 过期时间戳 + teamId?: string; // 团队ID + projectId?: string; // 项目ID +} + +interface ClickHouseRow { + event_id: string; + event_attributes: string; +} + +export async function main( + batch_size = 1000, + max_records = 9999999, + timeout_minutes = 60, + skip_clickhouse_check = false, + force_insert = false +) { + const logWithTimestamp = (message: string) => { + const now = new Date(); + console.log(`[${now.toISOString()}] ${message}`); + }; + + logWithTimestamp("Starting sync from MongoDB to ClickHouse events table"); + logWithTimestamp(`Batch size: ${batch_size}, Max records: ${max_records}, Timeout: ${timeout_minutes} minutes`); + + // Set timeout + const startTime = Date.now(); + const timeoutMs = timeout_minutes * 60 * 1000; + + const checkTimeout = () => { + if (Date.now() - startTime > timeoutMs) { + console.log(`Execution time exceeded ${timeout_minutes} minutes, stopping`); + return true; + } + return false; + }; + + // Get MongoDB and ClickHouse connection info + let mongoConfig: MongoConfig; + let clickhouseConfig: ClickHouseConfig; + + try { + const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb"); + mongoConfig = typeof rawMongoConfig === "string" ? JSON.parse(rawMongoConfig) : rawMongoConfig; + + const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse"); + clickhouseConfig = typeof rawClickhouseConfig === "string" ? JSON.parse(rawClickhouseConfig) : rawClickhouseConfig; + } catch (error) { + console.error("Failed to get config:", error); + throw error; + } + + // Build MongoDB connection URL + let mongoUrl = "mongodb://"; + if (mongoConfig.username && mongoConfig.password) { + mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`; + } + mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`; + + // Connect to MongoDB + const client = new MongoClient(); + try { + await client.connect(mongoUrl); + console.log("MongoDB connected successfully"); + + const db = client.database(mongoConfig.db); + const traceCollection = db.collection("trace"); + const shortCollection = db.collection("short"); + + // Build query conditions + const query: Record = { + type: 1 // Only sync records with type 1 + }; + + // Count total records + const totalRecords = await traceCollection.countDocuments(query); + console.log(`Found ${totalRecords} records to sync`); + + const recordsToProcess = Math.min(totalRecords, max_records); + console.log(`Will process ${recordsToProcess} records`); + + if (totalRecords === 0) { + console.log("No records to sync, task completed"); + return { + success: true, + records_synced: 0, + message: "No records to sync" + }; + } + + // Check ClickHouse connection + const checkClickHouseConnection = async (): Promise => { + if (skip_clickhouse_check) { + logWithTimestamp("Skipping ClickHouse connection check"); + return true; + } + + try { + logWithTimestamp("Testing ClickHouse connection..."); + const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; + const response = await fetch(clickhouseUrl, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`, + }, + body: "SELECT 1", + signal: AbortSignal.timeout(5000) + }); + + if (response.ok) { + logWithTimestamp("ClickHouse connection test successful"); + return true; + } else { + const errorText = await response.text(); + logWithTimestamp(`ClickHouse connection test failed: ${response.status} ${errorText}`); + return false; + } + } catch (err) { + logWithTimestamp(`ClickHouse connection test failed: ${(err as Error).message}`); + return false; + } + }; + + // Check if records exist in ClickHouse + const checkExistingRecords = async (records: TraceRecord[]): Promise => { + if (records.length === 0) return []; + + if (skip_clickhouse_check || force_insert) { + logWithTimestamp(`Skipping ClickHouse duplicate check, will process all ${records.length} records`); + return records; + } + + try { + const recordIds = records.map(record => record._id.toString()); + + const query = ` + SELECT event_id + FROM shorturl_analytics.events + WHERE event_attributes LIKE '%"mongo_id":"%' + AND event_attributes LIKE ANY ('%${recordIds.join("%' OR '%")}%') + FORMAT JSON + `; + + const response = await fetch(clickhouseConfig.clickhouse_url, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` + }, + body: query, + signal: AbortSignal.timeout(10000) + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`ClickHouse query error: ${response.status} ${errorText}`); + } + + const result = await response.json(); + const existingIds = new Set(result.data.map((row: ClickHouseRow) => { + const matches = row.event_attributes.match(/"mongo_id":"([^"]+)"/); + return matches ? matches[1] : null; + }).filter(Boolean)); + + return records.filter(record => !existingIds.has(record._id.toString())); + } catch (err) { + logWithTimestamp(`Error checking existing records: ${(err as Error).message}`); + return skip_clickhouse_check ? records : []; + } + }; + + // Process records function + const processRecords = async (records: TraceRecord[]) => { + if (records.length === 0) return 0; + + const newRecords = await checkExistingRecords(records); + if (newRecords.length === 0) { + logWithTimestamp("All records already exist, skipping"); + return 0; + } + + // Get link information for all records + const slugIds = newRecords.map(record => record.slugId); + const shortLinks = await shortCollection.find({ + _id: { $in: slugIds } + }).toArray(); + + // Create a map for quick lookup + const shortLinksMap = new Map(shortLinks.map(link => [link._id.toString(), link])); + + // Prepare ClickHouse insert data + const clickhouseData = newRecords.map(record => { + const shortLink = shortLinksMap.get(record.slugId.toString()); + + // 将毫秒时间戳转换为 DateTime64(3) 格式 + const formatDateTime = (timestamp: number) => { + return new Date(timestamp).toISOString().replace('T', ' ').replace('Z', ''); + }; + + return { + // Event base information + event_id: record._id.toString(), + event_time: formatDateTime(record.createTime), + event_type: "click", + event_attributes: JSON.stringify({ + original_type: record.type + }), + + // Link information from short collection + link_id: record.slugId.toString(), + link_slug: shortLink?.slug || "", + link_label: record.label || "", + link_title: "", + link_original_url: shortLink?.origin || "", + link_attributes: JSON.stringify({ + domain: shortLink?.domain || null + }), + link_created_at: shortLink?.createTime ? formatDateTime(shortLink.createTime) : formatDateTime(record.createTime), + link_expires_at: shortLink?.expiresAt ? formatDateTime(shortLink.expiresAt) : null, + link_tags: "[]", // Empty array as default + + // User information + user_id: shortLink?.user || "", + user_name: "", + user_email: "", + user_attributes: "{}", + + // Team information + team_id: shortLink?.teamId || "", + team_name: "", + team_attributes: "{}", + + // Project information + project_id: shortLink?.projectId || "", + project_name: "", + project_attributes: "{}", + + // QR code information + qr_code_id: "", + qr_code_name: "", + qr_code_attributes: "{}", + + // Visitor information + visitor_id: "", // Empty string as default + session_id: `${record.slugId.toString()}-${record.createTime}`, + ip_address: record.ip || "", + country: "", + city: "", + device_type: record.platform || "", + browser: record.browser || "", + os: record.platformOS || "", + user_agent: `${record.browser || ""} ${record.browserVersion || ""}`.trim(), + + // Source information + referrer: record.url || "", + utm_source: "", + utm_medium: "", + utm_campaign: "", + + // Interaction information + time_spent_sec: 0, + is_bounce: true, + is_qr_scan: false, + conversion_type: "visit", + conversion_value: 0 + }; + }); + + // Generate ClickHouse insert SQL + const rows = clickhouseData.map(row => { + // 只需要处理JSON字符串的转义 + const formattedRow = { + ...row, + event_attributes: row.event_attributes.replace(/\\/g, '\\\\'), + link_attributes: row.link_attributes.replace(/\\/g, '\\\\') + }; + return JSON.stringify(formattedRow); + }).join('\n'); + + const insertSQL = `INSERT INTO shorturl_analytics.events FORMAT JSONEachRow\n${rows}`; + + try { + const response = await fetch(clickhouseConfig.clickhouse_url, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` + }, + body: insertSQL, + signal: AbortSignal.timeout(20000) + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`ClickHouse insert error: ${response.status} ${errorText}`); + } + + logWithTimestamp(`Successfully inserted ${newRecords.length} records to ClickHouse`); + return newRecords.length; + } catch (err) { + logWithTimestamp(`Failed to insert data to ClickHouse: ${(err as Error).message}`); + throw err; + } + }; + + // Check ClickHouse connection before processing + const clickhouseConnected = await checkClickHouseConnection(); + if (!clickhouseConnected && !skip_clickhouse_check) { + throw new Error("ClickHouse connection failed, cannot continue sync"); + } + + // Process records in batches + let processedRecords = 0; + let totalBatchRecords = 0; + + for (let page = 0; processedRecords < recordsToProcess; page++) { + if (checkTimeout()) { + logWithTimestamp(`Processed ${processedRecords}/${recordsToProcess} records, stopping due to timeout`); + break; + } + + logWithTimestamp(`Processing batch ${page+1}, completed ${processedRecords}/${recordsToProcess} records (${Math.round(processedRecords/recordsToProcess*100)}%)`); + + const records = await traceCollection.find( + query, + { + allowDiskUse: true, + sort: { createTime: 1 }, + skip: page * batch_size, + limit: batch_size + } + ).toArray(); + + if (records.length === 0) { + logWithTimestamp("No more records found, sync complete"); + break; + } + + const batchSize = await processRecords(records); + processedRecords += records.length; + totalBatchRecords += batchSize; + + logWithTimestamp(`Batch ${page+1} complete. Processed ${processedRecords}/${recordsToProcess} records, inserted ${totalBatchRecords} (${Math.round(processedRecords/recordsToProcess*100)}%)`); + } + + return { + success: true, + records_processed: processedRecords, + records_synced: totalBatchRecords, + message: "Data sync completed" + }; + } catch (err) { + console.error("Error during sync:", err); + return { + success: false, + error: err instanceof Error ? err.message : String(err), + stack: err instanceof Error ? err.stack : undefined + }; + } finally { + await client.close(); + console.log("MongoDB connection closed"); + } +} \ No newline at end of file diff --git a/windmill/sync_shorturl_event_from_mongo.ts b/windmill/sync_shorturl_event_from_mongo.ts deleted file mode 100644 index 345eaad..0000000 --- a/windmill/sync_shorturl_event_from_mongo.ts +++ /dev/null @@ -1,474 +0,0 @@ -// 从MongoDB的trace表同步数据到ClickHouse的link_events表 -import { getVariable, setVariable } from "npm:windmill-client@1"; -import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts"; - -interface MongoConfig { - host: string; - port: string; - db: string; - username: string; - password: string; -} - -interface ClickHouseConfig { - clickhouse_host: string; - clickhouse_port: number; - clickhouse_user: string; - clickhouse_password: string; - clickhouse_database: string; - clickhouse_url: string; -} - -interface TraceRecord { - _id: ObjectId; - slugId: ObjectId; - label: string | null; - ip: string; - type: number; - platform: string; - platformOS: string; - browser: string; - browserVersion: string; - url: string; - createTime: number; -} - -interface SyncState { - last_sync_time: number; - records_synced: number; - last_sync_id?: string; -} - -export async function main( - batch_size = 1000, - max_records = 9999999, - timeout_minutes = 60, - skip_clickhouse_check = false, - force_insert = false -) { - const logWithTimestamp = (message: string) => { - const now = new Date(); - console.log(`[${now.toISOString()}] ${message}`); - }; - - logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务"); - logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`); - if (skip_clickhouse_check) { - logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式,不会检查记录是否已存在"); - } - if (force_insert) { - logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录"); - } - - // 设置超时 - const startTime = Date.now(); - const timeoutMs = timeout_minutes * 60 * 1000; - - // 检查是否超时 - const checkTimeout = () => { - if (Date.now() - startTime > timeoutMs) { - console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`); - return true; - } - return false; - }; - - // 获取MongoDB和ClickHouse的连接信息 - let mongoConfig: MongoConfig; - let clickhouseConfig: ClickHouseConfig; - - try { - const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb"); - console.log("原始MongoDB配置:", JSON.stringify(rawMongoConfig)); - - // 尝试解析配置,如果是字符串形式 - if (typeof rawMongoConfig === "string") { - try { - mongoConfig = JSON.parse(rawMongoConfig); - } catch (e) { - console.error("MongoDB配置解析失败:", e); - throw e; - } - } else { - mongoConfig = rawMongoConfig as MongoConfig; - } - - const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse"); - console.log("原始ClickHouse配置:", JSON.stringify(rawClickhouseConfig)); - - // 尝试解析配置,如果是字符串形式 - if (typeof rawClickhouseConfig === "string") { - try { - clickhouseConfig = JSON.parse(rawClickhouseConfig); - } catch (e) { - console.error("ClickHouse配置解析失败:", e); - throw e; - } - } else { - clickhouseConfig = rawClickhouseConfig as ClickHouseConfig; - } - - console.log("MongoDB配置解析为:", JSON.stringify(mongoConfig)); - console.log("ClickHouse配置解析为:", JSON.stringify(clickhouseConfig)); - } catch (error) { - console.error("获取配置失败:", error); - throw error; - } - - // 构建MongoDB连接URL - let mongoUrl = "mongodb://"; - if (mongoConfig.username && mongoConfig.password) { - mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`; - } - mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`; - - console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`); - - // 连接MongoDB - const client = new MongoClient(); - try { - await client.connect(mongoUrl); - console.log("MongoDB连接成功"); - - const db = client.database(mongoConfig.db); - const traceCollection = db.collection("trace"); - - // 构建查询条件,获取所有记录 - const query: Record = { - type: 1 // 只同步type为1的记录 - }; - - // 计算总记录数 - const totalRecords = await traceCollection.countDocuments(query); - console.log(`找到 ${totalRecords} 条记录需要同步`); - - // 限制此次处理的记录数量 - const recordsToProcess = Math.min(totalRecords, max_records); - console.log(`本次将处理 ${recordsToProcess} 条记录`); - - if (totalRecords === 0) { - console.log("没有记录需要同步,任务完成"); - return { - success: true, - records_synced: 0, - message: "没有记录需要同步" - }; - } - - // 检查ClickHouse连接状态 - const checkClickHouseConnection = async (): Promise => { - if (skip_clickhouse_check) { - logWithTimestamp("已启用跳过ClickHouse检查,不测试连接"); - return true; - } - - try { - logWithTimestamp("测试ClickHouse连接..."); - const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; - const response = await fetch(clickhouseUrl, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`, - }, - body: "SELECT 1", - // 设置5秒超时 - signal: AbortSignal.timeout(5000) - }); - - if (response.ok) { - logWithTimestamp("ClickHouse连接测试成功"); - return true; - } else { - const errorText = await response.text(); - logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`); - return false; - } - } catch (err) { - const error = err as Error; - logWithTimestamp(`ClickHouse连接测试失败: ${error.message}`); - return false; - } - }; - - // 检查记录是否已经存在于ClickHouse中 - const checkExistingRecords = async (records: TraceRecord[]): Promise => { - if (records.length === 0) return []; - - // 如果跳过ClickHouse检查或强制插入,则直接返回所有记录 - if (skip_clickhouse_check || force_insert) { - logWithTimestamp(`已跳过ClickHouse重复检查,准备处理所有 ${records.length} 条记录`); - return records; - } - - logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于ClickHouse中...`); - - try { - // 提取所有记录的ID - const recordIds = records.map(record => record.slugId.toString()); // 使用slugId作为link_id查询 - logWithTimestamp(`待检查的记录ID: ${recordIds.join(', ')}`); - - // 构建查询SQL,检查记录是否已存在,确保添加FORMAT JSON来获取正确的JSON格式响应 - const query = ` - SELECT link_id - FROM ${clickhouseConfig.clickhouse_database}.link_events - WHERE link_id IN ('${recordIds.join("','")}') - FORMAT JSON - `; - - logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`); - - // 发送请求到ClickHouse,添加10秒超时 - const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; - const response = await fetch(clickhouseUrl, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` - }, - body: query, - signal: AbortSignal.timeout(10000) - }); - - if (!response.ok) { - const errorText = await response.text(); - throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`); - } - - // 获取响应文本以便记录 - const responseText = await response.text(); - logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`); - - if (!responseText.trim()) { - logWithTimestamp("ClickHouse返回空响应,假定没有记录存在"); - return records; // 如果响应为空,假设没有记录 - } - - // 解析结果 - let result; - try { - result = JSON.parse(responseText); - } catch (err) { - logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`); - throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`); - } - - // 确保result有正确的结构 - if (!result.data) { - logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`); - return records; // 如果没有data字段,假设没有记录 - } - - // 提取已存在的记录ID - const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id)); - - logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`); - if (existingIds.size > 0) { - logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`); - } - - // 过滤出不存在的记录 - const newRecords = records.filter(record => !existingIds.has(record.slugId.toString())); // 使用slugId匹配link_id - logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`); - - return newRecords; - } catch (err) { - const error = err as Error; - logWithTimestamp(`ClickHouse查询出错: ${error.message}`); - if (skip_clickhouse_check) { - logWithTimestamp("已启用跳过ClickHouse检查,将继续处理所有记录"); - return records; - } else { - throw error; // 如果没有启用跳过检查,则抛出错误 - } - } - }; - - // 在处理记录前先检查ClickHouse连接 - const clickhouseConnected = await checkClickHouseConnection(); - if (!clickhouseConnected && !skip_clickhouse_check) { - logWithTimestamp("⚠️ ClickHouse连接测试失败,请启用skip_clickhouse_check=true参数来跳过连接检查"); - throw new Error("ClickHouse连接失败,无法继续同步"); - } - - // 处理记录的函数 - const processRecords = async (records: TraceRecord[]) => { - if (records.length === 0) return 0; - - logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`); - - // 检查记录是否已存在 - let newRecords; - try { - newRecords = await checkExistingRecords(records); - } catch (err) { - const error = err as Error; - logWithTimestamp(`检查记录是否存在时出错: ${error.message}`); - if (!skip_clickhouse_check && !force_insert) { - throw error; - } - // 如果跳过检查或强制插入,则使用所有记录 - logWithTimestamp("将使用所有记录进行处理"); - newRecords = records; - } - - if (newRecords.length === 0) { - logWithTimestamp("所有记录都已存在,跳过处理"); - return 0; - } - - logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`); - - // 准备ClickHouse插入数据 - const clickhouseData = newRecords.map(record => { - // 转换MongoDB记录为ClickHouse格式,匹配ClickHouse表结构 - return { - // UUID将由ClickHouse自动生成 (event_id) - link_id: record.slugId.toString(), - channel_id: record.label || "", - visitor_id: record._id.toString(), // 使用MongoDB ID作为访客ID - session_id: record._id.toString() + "-" + record.createTime, // 创建一个唯一会话ID - event_type: record.type <= 4 ? record.type : 1, // 确保event_type在枚举范围内 - ip_address: record.ip, - country: "", // 这些字段在MongoDB中不存在,使用默认值 - city: "", - referrer: record.url || "", - utm_source: "", - utm_medium: "", - utm_campaign: "", - user_agent: record.browser + " " + record.browserVersion, - device_type: record.platform || "unknown", - browser: record.browser || "", - os: record.platformOS || "", - time_spent_sec: 0, - is_bounce: true, - is_qr_scan: false, - qr_code_id: "", - conversion_type: 1, // 默认为'visit' - conversion_value: 0, - custom_data: `{"mongo_id":"${record._id.toString()}"}` - }; - }); - - // 生成ClickHouse插入SQL - const insertSQL = ` - INSERT INTO ${clickhouseConfig.clickhouse_database}.link_events - (link_id, channel_id, visitor_id, session_id, event_type, ip_address, country, city, - referrer, utm_source, utm_medium, utm_campaign, user_agent, device_type, browser, os, - time_spent_sec, is_bounce, is_qr_scan, qr_code_id, conversion_type, conversion_value, custom_data) - VALUES ${clickhouseData.map(record => { - // 确保所有字符串值都是字符串类型,并安全处理替换 - const safeReplace = (val: any): string => { - // 确保值是字符串,如果是null或undefined则使用空字符串 - const str = val === null || val === undefined ? "" : String(val); - // 安全替换单引号 - return str.replace(/'/g, "''"); - }; - - return `('${record.link_id}', '${safeReplace(record.channel_id)}', '${record.visitor_id}', '${record.session_id}', - ${record.event_type}, '${safeReplace(record.ip_address)}', '', '', - '${safeReplace(record.referrer)}', '', '', '', '${safeReplace(record.user_agent)}', '${safeReplace(record.device_type)}', - '${safeReplace(record.browser)}', '${safeReplace(record.os)}', - 0, true, false, '', 1, 0, '${safeReplace(record.custom_data)}')`; - }).join(", ")} - `; - - if (insertSQL.length === 0) { - console.log("没有新记录需要插入"); - return 0; - } - - // 发送请求到ClickHouse,添加20秒超时 - const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; - try { - logWithTimestamp("发送插入请求到ClickHouse..."); - const response = await fetch(clickhouseUrl, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` - }, - body: insertSQL, - signal: AbortSignal.timeout(20000) - }); - - if (!response.ok) { - const errorText = await response.text(); - throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`); - } - - logWithTimestamp(`成功插入 ${newRecords.length} 条记录到ClickHouse`); - return newRecords.length; - } catch (err) { - const error = err as Error; - logWithTimestamp(`向ClickHouse插入数据失败: ${error.message}`); - throw error; - } - }; - - // 批量处理记录 - let processedRecords = 0; - let totalBatchRecords = 0; - - for (let page = 0; processedRecords < recordsToProcess; page++) { - // 检查超时 - if (checkTimeout()) { - logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`); - break; - } - - // 每批次都输出进度 - logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`); - - logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`); - const records = await traceCollection.find( - query, - { - allowDiskUse: true, - sort: { createTime: 1 }, - skip: page * batch_size, - limit: batch_size - } - ).toArray(); - - if (records.length === 0) { - logWithTimestamp("没有找到更多数据,同步结束"); - break; - } - - // 找到数据,开始处理 - logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`); - // 输出当前批次的部分数据信息 - if (records.length > 0) { - logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, 时间=${new Date(records[0].createTime).toISOString()}`); - if (records.length > 1) { - logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`); - } - } - - const batchSize = await processRecords(records); - processedRecords += records.length; - totalBatchRecords += batchSize; - - logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); - } - - return { - success: true, - records_processed: processedRecords, - records_synced: totalBatchRecords, - message: "数据同步完成" - }; - } catch (err) { - console.error("同步过程中发生错误:", err); - return { - success: false, - error: err instanceof Error ? err.message : String(err), - stack: err instanceof Error ? err.stack : undefined - }; - } finally { - // 关闭MongoDB连接 - await client.close(); - console.log("MongoDB连接已关闭"); - } -} diff --git a/windmill/sync_shorturl_from_mongo_to_clickhouse.ts b/windmill/sync_shorturl_from_mongo_to_clickhouse.ts deleted file mode 100644 index cf1065d..0000000 --- a/windmill/sync_shorturl_from_mongo_to_clickhouse.ts +++ /dev/null @@ -1,532 +0,0 @@ -// 从MongoDB的short表同步数据到ClickHouse的links表 -import { getVariable, setVariable } from "npm:windmill-client@1"; -import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts"; - -interface MongoConfig { - host: string; - port: string; - db: string; - username: string; - password: string; -} - -interface ClickHouseConfig { - clickhouse_host: string; - clickhouse_port: number; - clickhouse_user: string; - clickhouse_password: string; - clickhouse_database: string; - clickhouse_url: string; -} - -interface ShortRecord { - _id: ObjectId; - slug: string; // 短链接的slug部分 - origin: string; // 原始URL - domain?: string; // 域名 - createTime: number; // 创建时间戳 - user?: string; // 创建用户 - title?: string; // 标题 - description?: string; // 描述 - tags?: string[]; // 标签 - active?: boolean; // 是否活跃 - expiresAt?: number; // 过期时间戳 - teamId?: string; // 团队ID - projectId?: string; // 项目ID -} - -interface SyncState { - last_sync_time: number; - records_synced: number; - last_sync_id?: string; -} - -export async function main( - batch_size = 100, - initial_sync = false, - max_records = 999999, - timeout_minutes = 30, - skip_clickhouse_check = false, - force_insert = false -) { - const logWithTimestamp = (message: string) => { - const now = new Date(); - console.log(`[${now.toISOString()}] ${message}`); - }; - - logWithTimestamp("开始执行MongoDB到ClickHouse的短链接同步任务..."); - logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`); - if (skip_clickhouse_check) { - logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式,不会检查记录是否已存在"); - } - if (force_insert) { - logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录"); - } - - // 设置超时 - const startTime = Date.now(); - const timeoutMs = timeout_minutes * 60 * 1000; - - // 检查是否超时 - const checkTimeout = () => { - if (Date.now() - startTime > timeoutMs) { - console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`); - return true; - } - return false; - }; - - // 获取MongoDB和ClickHouse的连接信息 - let mongoConfig: MongoConfig; - let clickhouseConfig: ClickHouseConfig; - - try { - const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb"); - console.log("原始MongoDB配置:", typeof rawMongoConfig === "string" ? rawMongoConfig : JSON.stringify(rawMongoConfig)); - - // 尝试解析配置,如果是字符串形式 - if (typeof rawMongoConfig === "string") { - try { - mongoConfig = JSON.parse(rawMongoConfig); - } catch (e) { - console.error("MongoDB配置解析失败:", e); - throw e; - } - } else { - mongoConfig = rawMongoConfig as MongoConfig; - } - - const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse"); - console.log("原始ClickHouse配置:", typeof rawClickhouseConfig === "string" ? rawClickhouseConfig : JSON.stringify(rawClickhouseConfig)); - - // 尝试解析配置,如果是字符串形式 - if (typeof rawClickhouseConfig === "string") { - try { - clickhouseConfig = JSON.parse(rawClickhouseConfig); - } catch (e) { - console.error("ClickHouse配置解析失败:", e); - throw e; - } - } else { - clickhouseConfig = rawClickhouseConfig as ClickHouseConfig; - } - - console.log("MongoDB配置解析为:", JSON.stringify(mongoConfig)); - console.log("ClickHouse配置解析为:", JSON.stringify(clickhouseConfig)); - } catch (error) { - console.error("获取配置失败:", error); - throw error; - } - - // 构建MongoDB连接URL - let mongoUrl = "mongodb://"; - if (mongoConfig.username && mongoConfig.password) { - mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`; - } - mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`; - - console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`); - - // 获取上次同步的状态 - let syncState: SyncState; - try { - const rawSyncState = await getVariable("f/shorturl_analytics/clickhouse/shorturl_links_sync_state"); - try { - syncState = JSON.parse(rawSyncState); - console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`); - } catch (parseError) { - console.error("解析同步状态失败:", parseError); - throw parseError; - } - } catch (_unused_error) { - console.log("未找到同步状态,创建初始同步状态"); - syncState = { - last_sync_time: 0, - records_synced: 0, - }; - } - - // 如果强制从头开始同步 - if (initial_sync) { - console.log("强制从头开始同步"); - syncState = { - last_sync_time: 0, - records_synced: 0, - }; - } - - // 连接MongoDB - const client = new MongoClient(); - try { - await client.connect(mongoUrl); - console.log("MongoDB连接成功"); - - const db = client.database(mongoConfig.db); - const shortCollection = db.collection("short"); - - // 构建查询条件,只查询新的记录 - const query: Record = {}; - - if (syncState.last_sync_time > 0) { - query.createTime = { $gt: syncState.last_sync_time }; - } - - if (syncState.last_sync_id) { - // 如果有上次同步的ID,则从该ID之后开始查询 - query._id = { $gt: new ObjectId(syncState.last_sync_id) }; - } - - // 计算总记录数 - const totalRecords = await shortCollection.countDocuments(query); - console.log(`找到 ${totalRecords} 条新短链接记录需要同步`); - - // 限制此次处理的记录数量 - const recordsToProcess = Math.min(totalRecords, max_records); - console.log(`本次将处理 ${recordsToProcess} 条记录`); - - if (totalRecords === 0) { - console.log("没有新记录需要同步,任务完成"); - return { - success: true, - records_synced: 0, - total_synced: syncState.records_synced, - message: "没有新记录需要同步" - }; - } - - // 分批处理记录 - let processedRecords = 0; - let lastId: string | undefined; - let lastCreateTime = syncState.last_sync_time; - let totalBatchRecords = 0; - - // 检查ClickHouse连接状态 - const checkClickHouseConnection = async (): Promise => { - if (skip_clickhouse_check) { - logWithTimestamp("已启用跳过ClickHouse检查,不测试连接"); - return true; - } - - try { - logWithTimestamp("测试ClickHouse连接..."); - const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; - const response = await fetch(clickhouseUrl, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`, - }, - body: "SELECT 1 FORMAT JSON", - signal: AbortSignal.timeout(5000) - }); - - if (response.ok) { - logWithTimestamp("ClickHouse连接测试成功"); - return true; - } else { - const errorText = await response.text(); - logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`); - return false; - } - } catch (err) { - const error = err as Error; - logWithTimestamp(`ClickHouse连接测试失败: ${error.message}`); - return false; - } - }; - - // 检查记录是否已经存在于ClickHouse中 - const checkExistingRecords = async (records: ShortRecord[]): Promise => { - if (records.length === 0) return []; - - // 如果跳过ClickHouse检查或强制插入,则直接返回所有记录 - if (skip_clickhouse_check || force_insert) { - logWithTimestamp(`已跳过ClickHouse重复检查,准备处理所有 ${records.length} 条记录`); - return records; - } - - logWithTimestamp(`正在检查 ${records.length} 条短链接记录是否已存在于ClickHouse中...`); - - try { - // 提取所有记录的ID - const recordIds = records.map(record => record._id.toString()); - logWithTimestamp(`待检查的短链接ID: ${recordIds.join(', ')}`); - - // 构建查询SQL,检查记录是否已存在 - const query = ` - SELECT link_id - FROM ${clickhouseConfig.clickhouse_database}.links - WHERE link_id IN ('${recordIds.join("','")}') - FORMAT JSON - `; - - logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`); - - // 发送请求到ClickHouse - const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; - const response = await fetch(clickhouseUrl, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` - }, - body: query, - signal: AbortSignal.timeout(10000) - }); - - if (!response.ok) { - const errorText = await response.text(); - throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`); - } - - // 获取响应文本 - const responseText = await response.text(); - logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`); - - if (!responseText.trim()) { - logWithTimestamp("ClickHouse返回空响应,假定没有记录存在"); - return records; - } - - // 解析结果 - let result; - try { - result = JSON.parse(responseText); - } catch (err) { - logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`); - throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`); - } - - // 确保result有正确的结构 - if (!result.data) { - logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`); - return records; - } - - // 提取已存在的记录ID - const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id)); - - logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`); - if (existingIds.size > 0) { - logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`); - } - - // 过滤出不存在的记录 - const newRecords = records.filter(record => !existingIds.has(record._id.toString())); - logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`); - - return newRecords; - } catch (err) { - const error = err as Error; - logWithTimestamp(`ClickHouse查询出错: ${error.message}`); - if (skip_clickhouse_check) { - logWithTimestamp("已启用跳过ClickHouse检查,将继续处理所有记录"); - return records; - } else { - throw error; - } - } - }; - - // 在处理记录前先检查ClickHouse连接 - const clickhouseConnected = await checkClickHouseConnection(); - if (!clickhouseConnected && !skip_clickhouse_check) { - logWithTimestamp("⚠️ ClickHouse连接测试失败,请启用skip_clickhouse_check=true参数来跳过连接检查"); - throw new Error("ClickHouse连接失败,无法继续同步"); - } - - // 处理记录的函数 - const processRecords = async (records: ShortRecord[]) => { - if (records.length === 0) return 0; - - logWithTimestamp(`开始处理批次数据,共 ${records.length} 条短链接记录...`); - - // 检查记录是否已存在 - let newRecords; - try { - newRecords = await checkExistingRecords(records); - } catch (err) { - const error = err as Error; - logWithTimestamp(`检查记录是否存在时出错: ${error.message}`); - if (!skip_clickhouse_check && !force_insert) { - throw error; - } - // 如果跳过检查或强制插入,则使用所有记录 - logWithTimestamp("将使用所有记录进行处理"); - newRecords = records; - } - - if (newRecords.length === 0) { - logWithTimestamp("所有记录都已存在,跳过处理"); - // 更新同步状态,即使没有新增记录 - const lastRecord = records[records.length - 1]; - lastId = lastRecord._id.toString(); - lastCreateTime = lastRecord.createTime; - return 0; - } - - logWithTimestamp(`准备处理 ${newRecords.length} 条新短链接记录...`); - - // 准备ClickHouse插入数据 - const clickhouseData = newRecords.map(record => { - // 转换MongoDB记录为ClickHouse格式,匹配ClickHouse表结构 - // 处理日期时间,移除ISO格式中的Z以使ClickHouse正确解析 - const createdAtStr = new Date(record.createTime).toISOString().replace('Z', ''); - const expiresAtStr = record.expiresAt ? new Date(record.expiresAt).toISOString().replace('Z', '') : null; - - return { - link_id: record._id.toString(), // 使用MongoDB的_id作为link_id - original_url: record.origin || "", - created_at: createdAtStr, - created_by: record.user || "unknown", - title: record.slug, // 使用slug作为title - description: record.description || "", - tags: record.tags || [], - is_active: record.active !== undefined ? record.active : true, - expires_at: expiresAtStr, - team_id: record.teamId || "", - project_id: record.projectId || "" - }; - }); - - // 更新同步状态(使用原始records的最后一条,以确保进度正确) - const lastRecord = records[records.length - 1]; - lastId = lastRecord._id.toString(); - lastCreateTime = lastRecord.createTime; - logWithTimestamp(`更新同步位置到: ID=${lastId}, 时间=${new Date(lastCreateTime).toISOString()}`); - - // 生成ClickHouse插入SQL - // 注意:Array类型需要特殊处理,这里将tags作为JSON字符串处理 - const insertSQL = ` - INSERT INTO ${clickhouseConfig.clickhouse_database}.links - (link_id, original_url, created_at, created_by, title, description, tags, is_active, expires_at, team_id, project_id) - VALUES - ${clickhouseData.map(record => { - // 处理tags数组 - const tagsStr = JSON.stringify(record.tags || []); - - // 处理expires_at可能为null的情况 - const expiresAt = record.expires_at ? `'${record.expires_at}'` : "NULL"; - - // 确保所有字段在使用replace前都有默认值 - const safeOriginalUrl = (record.original_url || "").replace(/'/g, "''"); - const safeCreatedBy = (record.created_by || "unknown").replace(/'/g, "''"); - const safeTitle = (record.title || "无标题").replace(/'/g, "''"); - const safeDescription = (record.description || "").replace(/'/g, "''"); - const safeTeamId = record.team_id || ""; - const safeProjectId = record.project_id || ""; - - return `('${record.link_id}', '${safeOriginalUrl}', '${record.created_at}', '${safeCreatedBy}', '${safeTitle}', '${safeDescription}', ${tagsStr}, ${record.is_active}, ${expiresAt}, '${safeTeamId}', '${safeProjectId}')`; - }).join(", ")} - `; - - if (clickhouseData.length === 0) { - console.log("没有新记录需要插入"); - return 0; - } - - // 发送请求到ClickHouse - const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`; - try { - logWithTimestamp("发送插入请求到ClickHouse..."); - const response = await fetch(clickhouseUrl, { - method: "POST", - headers: { - "Content-Type": "application/x-www-form-urlencoded", - "Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}` - }, - body: insertSQL, - signal: AbortSignal.timeout(20000) - }); - - if (!response.ok) { - const errorText = await response.text(); - throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`); - } - - logWithTimestamp(`成功插入 ${newRecords.length} 条短链接记录到ClickHouse`); - return newRecords.length; - } catch (err) { - const error = err as Error; - logWithTimestamp(`向ClickHouse插入数据失败: ${error.message}`); - throw error; - } - }; - - // 批量处理记录 - for (let page = 0; processedRecords < recordsToProcess; page++) { - // 检查超时 - if (checkTimeout()) { - logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`); - break; - } - - // 每批次都输出进度 - logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`); - - logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`); - const records = await shortCollection.find(query) - .sort({ createTime: 1, _id: 1 }) - .skip(page * batch_size) - .limit(batch_size) - .toArray(); - - if (records.length === 0) { - logWithTimestamp(`第 ${page+1} 批次没有找到数据,结束处理`); - break; - } - - logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`); - // 输出当前批次的部分数据信息 - if (records.length > 0) { - logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, Slug=${records[0].slug}, 时间=${new Date(records[0].createTime).toISOString()}`); - if (records.length > 1) { - logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, Slug=${records[records.length-1].slug}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`); - } - } - - const batchSize = await processRecords(records); - processedRecords += records.length; // 总是增加处理的记录数,即使有些记录已存在 - totalBatchRecords += batchSize; // 只增加实际插入的记录数 - - logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); - - // 更新查询条件,以便下一批次查询 - query.createTime = { $gt: lastCreateTime }; - if (lastId) { - query._id = { $gt: new ObjectId(lastId) }; - } - logWithTimestamp(`更新查询条件: 创建时间 > ${new Date(lastCreateTime).toISOString()}, ID > ${lastId || 'none'}`); - } - - // 更新同步状态 - const newSyncState: SyncState = { - last_sync_time: lastCreateTime, - records_synced: syncState.records_synced + totalBatchRecords, - last_sync_id: lastId - }; - - await setVariable("f/shorturl_analytics/clickhouse/shorturl_links_sync_state", JSON.stringify(newSyncState)); - console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`); - - return { - success: true, - records_processed: processedRecords, - records_synced: totalBatchRecords, - total_synced: newSyncState.records_synced, - last_sync_time: new Date(newSyncState.last_sync_time).toISOString(), - message: "短链接数据同步完成" - }; - } catch (err) { - console.error("同步过程中发生错误:", err); - return { - success: false, - error: err instanceof Error ? err.message : String(err), - stack: err instanceof Error ? err.stack : undefined - }; - } finally { - // 关闭MongoDB连接 - await client.close(); - console.log("MongoDB连接已关闭"); - } -} \ No newline at end of file