409 lines
14 KiB
TypeScript
409 lines
14 KiB
TypeScript
// Sync data from MongoDB trace table to ClickHouse events table
|
|
import { getVariable } from "npm:windmill-client@1";
|
|
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
|
|
|
|
/**
 * MongoDB connection settings, loaded by `main` from the Windmill variable
 * `f/shorturl_analytics/mongodb` (either a JSON string or an object).
 */
interface MongoConfig {
  /** Database name: used as the connection-URL path and to select the database. */
  db: string;
  /** Server hostname placed in the connection URL. */
  host: string;
  /** Server port placed in the connection URL (kept as a string). */
  port: string;
  /** Login name; only used when `password` is also non-empty. */
  username: string;
  /** Login password; only used when `username` is also non-empty. */
  password: string;
}
|
|
|
|
/**
 * ClickHouse connection settings, loaded by `main` from the Windmill variable
 * `f/shorturl_analytics/clickhouse` (either a JSON string or an object).
 */
interface ClickHouseConfig {
  /** HTTP endpoint that every query and insert is POSTed to. */
  clickhouse_url: string;
  /** User name for HTTP Basic authentication. */
  clickhouse_user: string;
  /** Password for HTTP Basic authentication. */
  clickhouse_password: string;
  /** Server hostname; `main` does not read it (it uses `clickhouse_url`). */
  clickhouse_host: string;
  /** Server port; `main` does not read it (it uses `clickhouse_url`). */
  clickhouse_port: number;
}
|
|
|
|
/**
 * A document of the MongoDB `trace` collection, i.e. one recorded visit that
 * `main` turns into a row of the ClickHouse events table.
 */
interface TraceRecord {
  /** Document id; `main` stores its string form as `event_id`. */
  _id: ObjectId;
  /** Id of the visited link, looked up against `_id` in the `short` collection. */
  slugId: ObjectId;
  /** Trace kind; `main` only syncs documents where this equals 1. */
  type: number;
  /** Visit time as an epoch timestamp (`main` treats it as milliseconds). */
  createTime: number;
  /** Optional label attached to the visit. */
  label: string | null;
  /** URL recorded with the visit; `main` stores it as `referrer`. */
  url: string;
  /** Visitor IP address. */
  ip: string;
  /** Platform string; `main` stores it as `device_type`. */
  platform: string;
  /** Operating system string; `main` stores it as `os`. */
  platformOS: string;
  /** Browser name. */
  browser: string;
  /** Browser version, combined with `browser` into `user_agent`. */
  browserVersion: string;
}
|
|
|
|
/**
 * A document of the MongoDB `short` collection describing one short link.
 * `main` joins trace documents to it via `TraceRecord.slugId === ShortRecord._id`.
 */
interface ShortRecord {
  /** Document id. */
  _id: ObjectId;
  /** The slug part of the short link. */
  slug: string;
  /** The original (target) URL. */
  origin: string;
  /** Domain of the short link. */
  domain?: string;
  /** Title. */
  title?: string;
  /** Description. */
  description?: string;
  /** Tags. */
  tags?: string[];
  /** User who created the link. */
  user?: string;
  /** Team id. */
  teamId?: string;
  /** Project id. */
  projectId?: string;
  /** Whether the link is active. */
  active?: boolean;
  /** Creation timestamp (treated as epoch milliseconds by `main`). */
  createTime: number;
  /** Expiration timestamp (treated as epoch milliseconds by `main`). */
  expiresAt?: number;
}
|
|
|
|
/**
 * Shape of a row returned by the ClickHouse duplicate-check query (`FORMAT JSON` → `data[]`).
 *
 * NOTE(review): that query selects only `event_id`, so `event_attributes` may be
 * absent at runtime — confirm before relying on it.
 */
interface ClickHouseRow {
  /** Event identifier as stored in `shorturl_analytics.events`. */
  event_id: string;
  /** Serialized JSON attributes of the event. */
  event_attributes: string;
}
|
|
|
|
/**
 * Sync click records from the MongoDB `trace` collection into the ClickHouse
 * `shorturl_analytics.events` table.
 *
 * Trace documents with `type: 1` are read in ascending `createTime` order (ties broken
 * by `_id`), at most `batch_size` at a time. Each document is enriched with link metadata
 * from the `short` collection. Documents whose `event_id` (the Mongo `_id` as a string)
 * already exists in ClickHouse are filtered out, and the rest are inserted through the
 * ClickHouse HTTP interface with `FORMAT JSONEachRow`.
 *
 * @param batch_size Trace documents fetched and inserted per batch; must be a positive integer.
 * @param max_records Maximum number of trace documents read in this run.
 * @param timeout_minutes Time budget, checked before each batch (a running batch is not interrupted).
 * @param skip_clickhouse_check Skip both the `SELECT 1` connectivity probe and the duplicate check.
 * @param force_insert Skip only the duplicate check, inserting every fetched document.
 * @returns `{ success: true, ... }` with processed/inserted counts when the run finishes or
 *   times out, or `{ success: false, error, stack }` when an error occurs after the MongoDB
 *   client is created.
 * @throws When `batch_size` is invalid or the Windmill configuration variables cannot be loaded.
 */
export async function main(
  batch_size = 1000,
  max_records = 9999999,
  timeout_minutes = 60,
  skip_clickhouse_check = false,
  force_insert = false
) {
  const logWithTimestamp = (message: string) => {
    console.log(`[${new Date().toISOString()}] ${message}`);
  };
  const errorMessage = (err: unknown): string => (err instanceof Error ? err.message : String(err));

  // A limit of 0 means "no limit" to MongoDB, which would silently disable batching.
  if (!Number.isInteger(batch_size) || batch_size <= 0) {
    throw new Error(`batch_size must be a positive integer, got ${batch_size}`);
  }

  logWithTimestamp("Starting sync from MongoDB to ClickHouse events table");
  logWithTimestamp(`Batch size: ${batch_size}, Max records: ${max_records}, Timeout: ${timeout_minutes} minutes`);

  // Soft timeout: only checked between batches.
  const startTime = Date.now();
  const timeoutMs = timeout_minutes * 60 * 1000;
  const checkTimeout = (): boolean => {
    if (Date.now() - startTime > timeoutMs) {
      logWithTimestamp(`Execution time exceeded ${timeout_minutes} minutes, stopping`);
      return true;
    }
    return false;
  };

  // Load connection settings; each variable may be stored as a JSON string or as an object.
  let mongoConfig: MongoConfig;
  let clickhouseConfig: ClickHouseConfig;
  try {
    const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
    mongoConfig = typeof rawMongoConfig === "string" ? JSON.parse(rawMongoConfig) : rawMongoConfig;

    const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse");
    clickhouseConfig = typeof rawClickhouseConfig === "string" ? JSON.parse(rawClickhouseConfig) : rawClickhouseConfig;
  } catch (error) {
    console.error("Failed to get config:", error);
    throw error;
  }

  // Build the MongoDB connection URL.
  // NOTE(review): credentials are inserted verbatim; characters such as '@', ':' or '/'
  // in the password would break the URL — confirm how the driver expects them encoded.
  let mongoUrl = "mongodb://";
  if (mongoConfig.username && mongoConfig.password) {
    mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
  }
  mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;

  // Convert a millisecond epoch timestamp to "YYYY-MM-DD hh:mm:ss.sss" (UTC) for DateTime64(3) columns.
  const formatDateTime = (timestamp: number) =>
    new Date(timestamp).toISOString().replace("T", " ").replace("Z", "");

  // Quote a value as a ClickHouse string literal (backslash and single quote are escaped).
  const sqlString = (value: string) => `'${value.replace(/\\/g, "\\\\").replace(/'/g, "\\'")}'`;

  const authHeader = `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`;

  /** POST one statement to the ClickHouse HTTP endpoint; throws `${context}: status body` on a non-2xx reply. */
  const runClickHouse = async (body: string, requestTimeoutMs: number, context: string) => {
    const response = await fetch(clickhouseConfig.clickhouse_url, {
      method: "POST",
      headers: {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": authHeader,
      },
      body,
      signal: AbortSignal.timeout(requestTimeoutMs),
    });
    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(`${context}: ${response.status} ${errorText}`);
    }
    return response;
  };

  const client = new MongoClient();
  try {
    await client.connect(mongoUrl);
    logWithTimestamp("MongoDB connected successfully");

    const db = client.database(mongoConfig.db);
    const traceCollection = db.collection<TraceRecord>("trace");
    const shortCollection = db.collection<ShortRecord>("short");

    // Only sync records with type 1.
    const query: Record<string, unknown> = { type: 1 };

    const totalRecords = await traceCollection.countDocuments(query);
    logWithTimestamp(`Found ${totalRecords} records to sync`);

    const recordsToProcess = Math.min(totalRecords, max_records);
    logWithTimestamp(`Will process ${recordsToProcess} records`);

    if (totalRecords === 0) {
      logWithTimestamp("No records to sync, task completed");
      return {
        success: true,
        records_synced: 0,
        message: "No records to sync",
      };
    }

    /** Probe ClickHouse with `SELECT 1`; returns false (and logs why) on any failure. */
    const checkClickHouseConnection = async (): Promise<boolean> => {
      if (skip_clickhouse_check) {
        logWithTimestamp("Skipping ClickHouse connection check");
        return true;
      }
      try {
        logWithTimestamp("Testing ClickHouse connection...");
        await runClickHouse("SELECT 1", 5000, "HTTP error");
        logWithTimestamp("ClickHouse connection test successful");
        return true;
      } catch (err) {
        logWithTimestamp(`ClickHouse connection test failed: ${errorMessage(err)}`);
        return false;
      }
    };

    /**
     * Return the records whose `event_id` is not yet in ClickHouse.
     * Errors are rethrown so a failed check aborts the run instead of silently dropping the batch.
     */
    const checkExistingRecords = async (records: TraceRecord[]): Promise<TraceRecord[]> => {
      if (records.length === 0) return [];

      if (skip_clickhouse_check || force_insert) {
        logWithTimestamp(`Skipping ClickHouse duplicate check, will process all ${records.length} records`);
        return records;
      }

      // Rows are inserted with event_id = Mongo _id (see toEventRow), so match on that column.
      const idList = records.map((record) => sqlString(record._id.toString())).join(", ");
      const sql = `SELECT event_id FROM shorturl_analytics.events WHERE event_id IN (${idList}) FORMAT JSON`;

      try {
        const response = await runClickHouse(sql, 10000, "ClickHouse query error");
        const result: { data: Array<Pick<ClickHouseRow, "event_id">> } = await response.json();
        const existingIds = new Set(result.data.map((row) => row.event_id));
        return records.filter((record) => !existingIds.has(record._id.toString()));
      } catch (err) {
        logWithTimestamp(`Error checking existing records: ${errorMessage(err)}`);
        throw err;
      }
    };

    /** Map one trace document (plus its short link, if found) to a `shorturl_analytics.events` row. */
    const toEventRow = (record: TraceRecord, shortLink: ShortRecord | undefined) => ({
      // Event base information
      event_id: record._id.toString(),
      event_time: formatDateTime(record.createTime),
      event_type: "click",
      event_attributes: JSON.stringify({ original_type: record.type }),

      // Link information from the short collection
      link_id: record.slugId.toString(),
      link_slug: shortLink?.slug || "",
      link_label: record.label || "",
      link_title: "",
      link_original_url: shortLink?.origin || "",
      link_attributes: JSON.stringify({ domain: shortLink?.domain || null }),
      link_created_at: shortLink?.createTime ? formatDateTime(shortLink.createTime) : formatDateTime(record.createTime),
      link_expires_at: shortLink?.expiresAt ? formatDateTime(shortLink.expiresAt) : null,
      link_tags: "[]",

      // User information
      user_id: shortLink?.user || "",
      user_name: "",
      user_email: "",
      user_attributes: "{}",

      // Team information
      team_id: shortLink?.teamId || "",
      team_name: "",
      team_attributes: "{}",

      // Project information
      project_id: shortLink?.projectId || "",
      project_name: "",
      project_attributes: "{}",

      // QR code information
      qr_code_id: "",
      qr_code_name: "",
      qr_code_attributes: "{}",

      // Visitor information
      visitor_id: "",
      session_id: `${record.slugId.toString()}-${record.createTime}`,
      ip_address: record.ip || "",
      country: "",
      city: "",
      device_type: record.platform || "",
      browser: record.browser || "",
      os: record.platformOS || "",
      user_agent: `${record.browser || ""} ${record.browserVersion || ""}`.trim(),

      // Source information
      referrer: record.url || "",
      utm_source: "",
      utm_medium: "",
      utm_campaign: "",

      // Interaction information
      time_spent_sec: 0,
      is_bounce: true,
      is_qr_scan: false,
      conversion_type: "visit",
      conversion_value: 0,
    });

    /** De-duplicate, enrich and insert one batch; returns the number of rows inserted. */
    const processRecords = async (records: TraceRecord[]): Promise<number> => {
      if (records.length === 0) return 0;

      const newRecords = await checkExistingRecords(records);
      if (newRecords.length === 0) {
        logWithTimestamp("All records already exist, skipping");
        return 0;
      }

      // Fetch all referenced short links in one query and index them by id.
      const shortLinks = await shortCollection.find({
        _id: { $in: newRecords.map((record) => record.slugId) },
      }).toArray();
      const shortLinksMap = new Map(shortLinks.map((link) => [link._id.toString(), link]));

      // JSON.stringify already escapes quotes and backslashes inside the nested JSON
      // strings; escaping them again would store doubled backslashes in ClickHouse.
      const rows = newRecords
        .map((record) => JSON.stringify(toEventRow(record, shortLinksMap.get(record.slugId.toString()))))
        .join("\n");
      const insertSQL = `INSERT INTO shorturl_analytics.events FORMAT JSONEachRow\n${rows}`;

      try {
        await runClickHouse(insertSQL, 20000, "ClickHouse insert error");
      } catch (err) {
        logWithTimestamp(`Failed to insert data to ClickHouse: ${errorMessage(err)}`);
        throw err;
      }

      logWithTimestamp(`Successfully inserted ${newRecords.length} records to ClickHouse`);
      return newRecords.length;
    };

    if (!(await checkClickHouseConnection())) {
      throw new Error("ClickHouse connection failed, cannot continue sync");
    }

    let processedRecords = 0;
    let insertedRecords = 0;
    let timedOut = false;
    const percent = () => Math.round((processedRecords / recordsToProcess) * 100);

    for (let batch = 1; processedRecords < recordsToProcess; batch++) {
      if (checkTimeout()) {
        timedOut = true;
        logWithTimestamp(`Processed ${processedRecords}/${recordsToProcess} records, stopping due to timeout`);
        break;
      }

      logWithTimestamp(`Processing batch ${batch}, completed ${processedRecords}/${recordsToProcess} records (${percent()}%)`);

      const records = await traceCollection.find(query, {
        allowDiskUse: true,
        // _id breaks createTime ties so skip-based paging returns a stable order across queries.
        sort: { createTime: 1, _id: 1 },
        skip: processedRecords,
        // Shrink the final batch so no more than max_records documents are read.
        limit: Math.min(batch_size, recordsToProcess - processedRecords),
      }).toArray();

      if (records.length === 0) {
        logWithTimestamp("No more records found, sync complete");
        break;
      }

      insertedRecords += await processRecords(records);
      processedRecords += records.length;

      logWithTimestamp(`Batch ${batch} complete. Processed ${processedRecords}/${recordsToProcess} records, inserted ${insertedRecords} (${percent()}%)`);
    }

    return {
      success: true,
      records_processed: processedRecords,
      records_synced: insertedRecords,
      timed_out: timedOut,
      message: timedOut ? "Data sync stopped due to timeout" : "Data sync completed",
    };
  } catch (err) {
    console.error("Error during sync:", err);
    return {
      success: false,
      error: err instanceof Error ? err.message : String(err),
      stack: err instanceof Error ? err.stack : undefined,
    };
  } finally {
    await client.close();
    logWithTimestamp("MongoDB connection closed");
  }
}
|