sync trace & short to clickhouse events

This commit is contained in:
2025-03-25 14:35:01 +08:00
parent 231cf556b0
commit 1e9e5928d7
5 changed files with 779 additions and 1009 deletions

View File

@@ -10,8 +10,9 @@ DROP TABLE IF EXISTS shorturl_analytics.events;
-- 创建新表 -- 创建新表
CREATE TABLE IF NOT EXISTS shorturl_analytics.events ( CREATE TABLE IF NOT EXISTS shorturl_analytics.events (
-- 事件基础信息 -- 事件基础信息
event_id UUID DEFAULT generateUUIDv4(), event_id String,
event_time DateTime64(3) DEFAULT now64(), event_time DateTime64(3),
-- 精确到毫秒的时间戳
event_type String, event_type String,
-- click, redirect, conversion, error -- click, redirect, conversion, error
event_attributes String DEFAULT '{}', event_attributes String DEFAULT '{}',
@@ -25,7 +26,9 @@ CREATE TABLE IF NOT EXISTS shorturl_analytics.events (
link_original_url String, link_original_url String,
link_attributes String DEFAULT '{}', link_attributes String DEFAULT '{}',
link_created_at DateTime64(3), link_created_at DateTime64(3),
-- 精确到毫秒的时间戳
link_expires_at Nullable(DateTime64(3)), link_expires_at Nullable(DateTime64(3)),
-- 精确到毫秒的时间戳
link_tags String DEFAULT '[]', link_tags String DEFAULT '[]',
-- Array of {id, name, attributes} -- Array of {id, name, attributes}
-- 用户信息 -- 用户信息
@@ -68,6 +71,6 @@ CREATE TABLE IF NOT EXISTS shorturl_analytics.events (
conversion_type String, conversion_type String,
-- 改为String类型 -- 改为String类型
conversion_value Float64 DEFAULT 0 conversion_value Float64 DEFAULT 0
) ENGINE = MergeTree() PARTITION BY toYYYYMM(event_time) ) ENGINE = MergeTree() PARTITION BY toYYYYMM(event_time) -- 直接使用DateTime64进行分区
ORDER BY ORDER BY
(event_time, link_id, event_id) SETTINGS index_granularity = 8192; (event_time, link_id, event_id) SETTINGS index_granularity = 8192;

View File

@@ -0,0 +1,364 @@
// Sync data from MongoDB trace table to ClickHouse events table
import { getVariable } from "npm:windmill-client@1";
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
interface MongoConfig {
host: string;
port: string;
db: string;
username: string;
password: string;
}
interface ClickHouseConfig {
clickhouse_host: string;
clickhouse_port: number;
clickhouse_user: string;
clickhouse_password: string;
clickhouse_database: string;
clickhouse_url: string;
}
interface TraceRecord {
_id: ObjectId;
slugId: ObjectId;
label: string | null;
ip: string;
type: number;
platform: string;
platformOS: string;
browser: string;
browserVersion: string;
url: string;
createTime: number;
}
export async function main(
batch_size = 1000,
max_records = 9999999,
timeout_minutes = 60,
skip_clickhouse_check = false,
force_insert = false
) {
const logWithTimestamp = (message: string) => {
const now = new Date();
console.log(`[${now.toISOString()}] ${message}`);
};
logWithTimestamp("Starting sync from MongoDB to ClickHouse events table");
logWithTimestamp(`Batch size: ${batch_size}, Max records: ${max_records}, Timeout: ${timeout_minutes} minutes`);
// Set timeout
const startTime = Date.now();
const timeoutMs = timeout_minutes * 60 * 1000;
const checkTimeout = () => {
if (Date.now() - startTime > timeoutMs) {
console.log(`Execution time exceeded ${timeout_minutes} minutes, stopping`);
return true;
}
return false;
};
// Get MongoDB and ClickHouse connection info
let mongoConfig: MongoConfig;
let clickhouseConfig: ClickHouseConfig;
try {
const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
mongoConfig = typeof rawMongoConfig === "string" ? JSON.parse(rawMongoConfig) : rawMongoConfig;
const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse");
clickhouseConfig = typeof rawClickhouseConfig === "string" ? JSON.parse(rawClickhouseConfig) : rawClickhouseConfig;
} catch (error) {
console.error("Failed to get config:", error);
throw error;
}
// Build MongoDB connection URL
let mongoUrl = "mongodb://";
if (mongoConfig.username && mongoConfig.password) {
mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
}
mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
// Connect to MongoDB
const client = new MongoClient();
try {
await client.connect(mongoUrl);
console.log("MongoDB connected successfully");
const db = client.database(mongoConfig.db);
const traceCollection = db.collection<TraceRecord>("trace");
// Build query conditions
const query: Record<string, unknown> = {
type: 1 // Only sync records with type 1
};
// Count total records
const totalRecords = await traceCollection.countDocuments(query);
console.log(`Found ${totalRecords} records to sync`);
const recordsToProcess = Math.min(totalRecords, max_records);
console.log(`Will process ${recordsToProcess} records`);
if (totalRecords === 0) {
console.log("No records to sync, task completed");
return {
success: true,
records_synced: 0,
message: "No records to sync"
};
}
// Check ClickHouse connection
const checkClickHouseConnection = async (): Promise<boolean> => {
if (skip_clickhouse_check) {
logWithTimestamp("Skipping ClickHouse connection check");
return true;
}
try {
logWithTimestamp("Testing ClickHouse connection...");
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`,
},
body: "SELECT 1",
signal: AbortSignal.timeout(5000)
});
if (response.ok) {
logWithTimestamp("ClickHouse connection test successful");
return true;
} else {
const errorText = await response.text();
logWithTimestamp(`ClickHouse connection test failed: ${response.status} ${errorText}`);
return false;
}
} catch (err) {
logWithTimestamp(`ClickHouse connection test failed: ${(err as Error).message}`);
return false;
}
};
// Check if records exist in ClickHouse
const checkExistingRecords = async (records: TraceRecord[]): Promise<TraceRecord[]> => {
if (records.length === 0) return [];
if (skip_clickhouse_check || force_insert) {
logWithTimestamp(`Skipping ClickHouse duplicate check, will process all ${records.length} records`);
return records;
}
try {
const recordIds = records.map(record => record._id.toString());
const query = `
SELECT event_id
FROM ${clickhouseConfig.clickhouse_database}.events
WHERE event_attributes LIKE '%"mongo_id":"%'
AND event_attributes LIKE ANY ('%${recordIds.join("%' OR '%")}%')
FORMAT JSON
`;
const response = await fetch(clickhouseConfig.clickhouse_url, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: query,
signal: AbortSignal.timeout(10000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse query error: ${response.status} ${errorText}`);
}
const result = await response.json();
const existingIds = new Set(result.data.map((row: any) => {
const matches = row.event_attributes.match(/"mongo_id":"([^"]+)"/);
return matches ? matches[1] : null;
}).filter(Boolean));
return records.filter(record => !existingIds.has(record._id.toString()));
} catch (err) {
logWithTimestamp(`Error checking existing records: ${(err as Error).message}`);
return skip_clickhouse_check ? records : [];
}
};
// Process records function
const processRecords = async (records: TraceRecord[]) => {
if (records.length === 0) return 0;
const newRecords = await checkExistingRecords(records);
if (newRecords.length === 0) {
logWithTimestamp("All records already exist, skipping");
return 0;
}
// Prepare ClickHouse insert data
const clickhouseData = newRecords.map(record => {
const eventTime = new Date(record.createTime).toISOString();
return {
event_time: eventTime,
event_type: "click",
event_attributes: JSON.stringify({
mongo_id: record._id.toString(),
original_type: record.type
}),
// Link information
link_id: record.slugId.toString(),
link_slug: "",
link_label: record.label || "",
link_title: "",
link_original_url: record.url || "",
link_attributes: "{}",
link_created_at: eventTime,
link_expires_at: null,
link_tags: "[]",
// User information (empty as not available in trace)
user_id: "",
user_name: "",
user_email: "",
user_attributes: "{}",
// Team information (empty as not available in trace)
team_id: "",
team_name: "",
team_attributes: "{}",
// Project information (empty as not available in trace)
project_id: "",
project_name: "",
project_attributes: "{}",
// QR code information (empty as not available in trace)
qr_code_id: "",
qr_code_name: "",
qr_code_attributes: "{}",
// Visitor information
visitor_id: record._id.toString(),
session_id: `${record._id.toString()}-${record.createTime}`,
ip_address: record.ip || "",
country: "",
city: "",
device_type: record.platform || "unknown",
browser: record.browser || "",
os: record.platformOS || "",
user_agent: `${record.browser || ""} ${record.browserVersion || ""}`.trim(),
// Source information
referrer: record.url || "",
utm_source: "",
utm_medium: "",
utm_campaign: "",
// Interaction information
time_spent_sec: 0,
is_bounce: true,
is_qr_scan: false,
conversion_type: "visit",
conversion_value: 0
};
});
// Generate ClickHouse insert SQL
const insertSQL = `
INSERT INTO ${clickhouseConfig.clickhouse_database}.events
FORMAT JSONEachRow
${JSON.stringify(clickhouseData)}
`;
try {
const response = await fetch(clickhouseConfig.clickhouse_url, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: insertSQL,
signal: AbortSignal.timeout(20000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse insert error: ${response.status} ${errorText}`);
}
logWithTimestamp(`Successfully inserted ${newRecords.length} records to ClickHouse`);
return newRecords.length;
} catch (err) {
logWithTimestamp(`Failed to insert data to ClickHouse: ${(err as Error).message}`);
throw err;
}
};
// Check ClickHouse connection before processing
const clickhouseConnected = await checkClickHouseConnection();
if (!clickhouseConnected && !skip_clickhouse_check) {
throw new Error("ClickHouse connection failed, cannot continue sync");
}
// Process records in batches
let processedRecords = 0;
let totalBatchRecords = 0;
for (let page = 0; processedRecords < recordsToProcess; page++) {
if (checkTimeout()) {
logWithTimestamp(`Processed ${processedRecords}/${recordsToProcess} records, stopping due to timeout`);
break;
}
logWithTimestamp(`Processing batch ${page+1}, completed ${processedRecords}/${recordsToProcess} records (${Math.round(processedRecords/recordsToProcess*100)}%)`);
const records = await traceCollection.find(
query,
{
allowDiskUse: true,
sort: { createTime: 1 },
skip: page * batch_size,
limit: batch_size
}
).toArray();
if (records.length === 0) {
logWithTimestamp("No more records found, sync complete");
break;
}
const batchSize = await processRecords(records);
processedRecords += records.length;
totalBatchRecords += batchSize;
logWithTimestamp(`Batch ${page+1} complete. Processed ${processedRecords}/${recordsToProcess} records, inserted ${totalBatchRecords} (${Math.round(processedRecords/recordsToProcess*100)}%)`);
}
return {
success: true,
records_processed: processedRecords,
records_synced: totalBatchRecords,
message: "Data sync completed"
};
} catch (err) {
console.error("Error during sync:", err);
return {
success: false,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined
};
} finally {
await client.close();
console.log("MongoDB connection closed");
}
}

View File

@@ -0,0 +1,409 @@
// Sync data from MongoDB trace table to ClickHouse events table
import { getVariable } from "npm:windmill-client@1";
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
interface MongoConfig {
host: string;
port: string;
db: string;
username: string;
password: string;
}
interface ClickHouseConfig {
clickhouse_host: string;
clickhouse_port: number;
clickhouse_user: string;
clickhouse_password: string;
clickhouse_url: string;
}
interface TraceRecord {
_id: ObjectId;
slugId: ObjectId;
label: string | null;
ip: string;
type: number;
platform: string;
platformOS: string;
browser: string;
browserVersion: string;
url: string;
createTime: number;
}
interface ShortRecord {
_id: ObjectId;
slug: string; // 短链接的slug部分
origin: string; // 原始URL
domain?: string; // 域名
createTime: number; // 创建时间戳
user?: string; // 创建用户
title?: string; // 标题
description?: string; // 描述
tags?: string[]; // 标签
active?: boolean; // 是否活跃
expiresAt?: number; // 过期时间戳
teamId?: string; // 团队ID
projectId?: string; // 项目ID
}
interface ClickHouseRow {
event_id: string;
event_attributes: string;
}
export async function main(
batch_size = 1000,
max_records = 9999999,
timeout_minutes = 60,
skip_clickhouse_check = false,
force_insert = false
) {
const logWithTimestamp = (message: string) => {
const now = new Date();
console.log(`[${now.toISOString()}] ${message}`);
};
logWithTimestamp("Starting sync from MongoDB to ClickHouse events table");
logWithTimestamp(`Batch size: ${batch_size}, Max records: ${max_records}, Timeout: ${timeout_minutes} minutes`);
// Set timeout
const startTime = Date.now();
const timeoutMs = timeout_minutes * 60 * 1000;
const checkTimeout = () => {
if (Date.now() - startTime > timeoutMs) {
console.log(`Execution time exceeded ${timeout_minutes} minutes, stopping`);
return true;
}
return false;
};
// Get MongoDB and ClickHouse connection info
let mongoConfig: MongoConfig;
let clickhouseConfig: ClickHouseConfig;
try {
const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
mongoConfig = typeof rawMongoConfig === "string" ? JSON.parse(rawMongoConfig) : rawMongoConfig;
const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse");
clickhouseConfig = typeof rawClickhouseConfig === "string" ? JSON.parse(rawClickhouseConfig) : rawClickhouseConfig;
} catch (error) {
console.error("Failed to get config:", error);
throw error;
}
// Build MongoDB connection URL
let mongoUrl = "mongodb://";
if (mongoConfig.username && mongoConfig.password) {
mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
}
mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
// Connect to MongoDB
const client = new MongoClient();
try {
await client.connect(mongoUrl);
console.log("MongoDB connected successfully");
const db = client.database(mongoConfig.db);
const traceCollection = db.collection<TraceRecord>("trace");
const shortCollection = db.collection<ShortRecord>("short");
// Build query conditions
const query: Record<string, unknown> = {
type: 1 // Only sync records with type 1
};
// Count total records
const totalRecords = await traceCollection.countDocuments(query);
console.log(`Found ${totalRecords} records to sync`);
const recordsToProcess = Math.min(totalRecords, max_records);
console.log(`Will process ${recordsToProcess} records`);
if (totalRecords === 0) {
console.log("No records to sync, task completed");
return {
success: true,
records_synced: 0,
message: "No records to sync"
};
}
// Check ClickHouse connection
const checkClickHouseConnection = async (): Promise<boolean> => {
if (skip_clickhouse_check) {
logWithTimestamp("Skipping ClickHouse connection check");
return true;
}
try {
logWithTimestamp("Testing ClickHouse connection...");
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`,
},
body: "SELECT 1",
signal: AbortSignal.timeout(5000)
});
if (response.ok) {
logWithTimestamp("ClickHouse connection test successful");
return true;
} else {
const errorText = await response.text();
logWithTimestamp(`ClickHouse connection test failed: ${response.status} ${errorText}`);
return false;
}
} catch (err) {
logWithTimestamp(`ClickHouse connection test failed: ${(err as Error).message}`);
return false;
}
};
// Check if records exist in ClickHouse
const checkExistingRecords = async (records: TraceRecord[]): Promise<TraceRecord[]> => {
if (records.length === 0) return [];
if (skip_clickhouse_check || force_insert) {
logWithTimestamp(`Skipping ClickHouse duplicate check, will process all ${records.length} records`);
return records;
}
try {
const recordIds = records.map(record => record._id.toString());
const query = `
SELECT event_id
FROM shorturl_analytics.events
WHERE event_attributes LIKE '%"mongo_id":"%'
AND event_attributes LIKE ANY ('%${recordIds.join("%' OR '%")}%')
FORMAT JSON
`;
const response = await fetch(clickhouseConfig.clickhouse_url, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: query,
signal: AbortSignal.timeout(10000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse query error: ${response.status} ${errorText}`);
}
const result = await response.json();
const existingIds = new Set(result.data.map((row: ClickHouseRow) => {
const matches = row.event_attributes.match(/"mongo_id":"([^"]+)"/);
return matches ? matches[1] : null;
}).filter(Boolean));
return records.filter(record => !existingIds.has(record._id.toString()));
} catch (err) {
logWithTimestamp(`Error checking existing records: ${(err as Error).message}`);
return skip_clickhouse_check ? records : [];
}
};
// Process records function
const processRecords = async (records: TraceRecord[]) => {
if (records.length === 0) return 0;
const newRecords = await checkExistingRecords(records);
if (newRecords.length === 0) {
logWithTimestamp("All records already exist, skipping");
return 0;
}
// Get link information for all records
const slugIds = newRecords.map(record => record.slugId);
const shortLinks = await shortCollection.find({
_id: { $in: slugIds }
}).toArray();
// Create a map for quick lookup
const shortLinksMap = new Map(shortLinks.map(link => [link._id.toString(), link]));
// Prepare ClickHouse insert data
const clickhouseData = newRecords.map(record => {
const shortLink = shortLinksMap.get(record.slugId.toString());
// 将毫秒时间戳转换为 DateTime64(3) 格式
const formatDateTime = (timestamp: number) => {
return new Date(timestamp).toISOString().replace('T', ' ').replace('Z', '');
};
return {
// Event base information
event_id: record._id.toString(),
event_time: formatDateTime(record.createTime),
event_type: "click",
event_attributes: JSON.stringify({
original_type: record.type
}),
// Link information from short collection
link_id: record.slugId.toString(),
link_slug: shortLink?.slug || "",
link_label: record.label || "",
link_title: "",
link_original_url: shortLink?.origin || "",
link_attributes: JSON.stringify({
domain: shortLink?.domain || null
}),
link_created_at: shortLink?.createTime ? formatDateTime(shortLink.createTime) : formatDateTime(record.createTime),
link_expires_at: shortLink?.expiresAt ? formatDateTime(shortLink.expiresAt) : null,
link_tags: "[]", // Empty array as default
// User information
user_id: shortLink?.user || "",
user_name: "",
user_email: "",
user_attributes: "{}",
// Team information
team_id: shortLink?.teamId || "",
team_name: "",
team_attributes: "{}",
// Project information
project_id: shortLink?.projectId || "",
project_name: "",
project_attributes: "{}",
// QR code information
qr_code_id: "",
qr_code_name: "",
qr_code_attributes: "{}",
// Visitor information
visitor_id: "", // Empty string as default
session_id: `${record.slugId.toString()}-${record.createTime}`,
ip_address: record.ip || "",
country: "",
city: "",
device_type: record.platform || "",
browser: record.browser || "",
os: record.platformOS || "",
user_agent: `${record.browser || ""} ${record.browserVersion || ""}`.trim(),
// Source information
referrer: record.url || "",
utm_source: "",
utm_medium: "",
utm_campaign: "",
// Interaction information
time_spent_sec: 0,
is_bounce: true,
is_qr_scan: false,
conversion_type: "visit",
conversion_value: 0
};
});
// Generate ClickHouse insert SQL
const rows = clickhouseData.map(row => {
// 只需要处理JSON字符串的转义
const formattedRow = {
...row,
event_attributes: row.event_attributes.replace(/\\/g, '\\\\'),
link_attributes: row.link_attributes.replace(/\\/g, '\\\\')
};
return JSON.stringify(formattedRow);
}).join('\n');
const insertSQL = `INSERT INTO shorturl_analytics.events FORMAT JSONEachRow\n${rows}`;
try {
const response = await fetch(clickhouseConfig.clickhouse_url, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: insertSQL,
signal: AbortSignal.timeout(20000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse insert error: ${response.status} ${errorText}`);
}
logWithTimestamp(`Successfully inserted ${newRecords.length} records to ClickHouse`);
return newRecords.length;
} catch (err) {
logWithTimestamp(`Failed to insert data to ClickHouse: ${(err as Error).message}`);
throw err;
}
};
// Check ClickHouse connection before processing
const clickhouseConnected = await checkClickHouseConnection();
if (!clickhouseConnected && !skip_clickhouse_check) {
throw new Error("ClickHouse connection failed, cannot continue sync");
}
// Process records in batches
let processedRecords = 0;
let totalBatchRecords = 0;
for (let page = 0; processedRecords < recordsToProcess; page++) {
if (checkTimeout()) {
logWithTimestamp(`Processed ${processedRecords}/${recordsToProcess} records, stopping due to timeout`);
break;
}
logWithTimestamp(`Processing batch ${page+1}, completed ${processedRecords}/${recordsToProcess} records (${Math.round(processedRecords/recordsToProcess*100)}%)`);
const records = await traceCollection.find(
query,
{
allowDiskUse: true,
sort: { createTime: 1 },
skip: page * batch_size,
limit: batch_size
}
).toArray();
if (records.length === 0) {
logWithTimestamp("No more records found, sync complete");
break;
}
const batchSize = await processRecords(records);
processedRecords += records.length;
totalBatchRecords += batchSize;
logWithTimestamp(`Batch ${page+1} complete. Processed ${processedRecords}/${recordsToProcess} records, inserted ${totalBatchRecords} (${Math.round(processedRecords/recordsToProcess*100)}%)`);
}
return {
success: true,
records_processed: processedRecords,
records_synced: totalBatchRecords,
message: "Data sync completed"
};
} catch (err) {
console.error("Error during sync:", err);
return {
success: false,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined
};
} finally {
await client.close();
console.log("MongoDB connection closed");
}
}

View File

@@ -1,474 +0,0 @@
// 从MongoDB的trace表同步数据到ClickHouse的link_events表
import { getVariable, setVariable } from "npm:windmill-client@1";
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
interface MongoConfig {
host: string;
port: string;
db: string;
username: string;
password: string;
}
interface ClickHouseConfig {
clickhouse_host: string;
clickhouse_port: number;
clickhouse_user: string;
clickhouse_password: string;
clickhouse_database: string;
clickhouse_url: string;
}
interface TraceRecord {
_id: ObjectId;
slugId: ObjectId;
label: string | null;
ip: string;
type: number;
platform: string;
platformOS: string;
browser: string;
browserVersion: string;
url: string;
createTime: number;
}
interface SyncState {
last_sync_time: number;
records_synced: number;
last_sync_id?: string;
}
export async function main(
batch_size = 1000,
max_records = 9999999,
timeout_minutes = 60,
skip_clickhouse_check = false,
force_insert = false
) {
const logWithTimestamp = (message: string) => {
const now = new Date();
console.log(`[${now.toISOString()}] ${message}`);
};
logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务");
logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`);
if (skip_clickhouse_check) {
logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式不会检查记录是否已存在");
}
if (force_insert) {
logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
}
// 设置超时
const startTime = Date.now();
const timeoutMs = timeout_minutes * 60 * 1000;
// 检查是否超时
const checkTimeout = () => {
if (Date.now() - startTime > timeoutMs) {
console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`);
return true;
}
return false;
};
// 获取MongoDB和ClickHouse的连接信息
let mongoConfig: MongoConfig;
let clickhouseConfig: ClickHouseConfig;
try {
const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
console.log("原始MongoDB配置:", JSON.stringify(rawMongoConfig));
// 尝试解析配置,如果是字符串形式
if (typeof rawMongoConfig === "string") {
try {
mongoConfig = JSON.parse(rawMongoConfig);
} catch (e) {
console.error("MongoDB配置解析失败:", e);
throw e;
}
} else {
mongoConfig = rawMongoConfig as MongoConfig;
}
const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse");
console.log("原始ClickHouse配置:", JSON.stringify(rawClickhouseConfig));
// 尝试解析配置,如果是字符串形式
if (typeof rawClickhouseConfig === "string") {
try {
clickhouseConfig = JSON.parse(rawClickhouseConfig);
} catch (e) {
console.error("ClickHouse配置解析失败:", e);
throw e;
}
} else {
clickhouseConfig = rawClickhouseConfig as ClickHouseConfig;
}
console.log("MongoDB配置解析为:", JSON.stringify(mongoConfig));
console.log("ClickHouse配置解析为:", JSON.stringify(clickhouseConfig));
} catch (error) {
console.error("获取配置失败:", error);
throw error;
}
// 构建MongoDB连接URL
let mongoUrl = "mongodb://";
if (mongoConfig.username && mongoConfig.password) {
mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
}
mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);
// 连接MongoDB
const client = new MongoClient();
try {
await client.connect(mongoUrl);
console.log("MongoDB连接成功");
const db = client.database(mongoConfig.db);
const traceCollection = db.collection<TraceRecord>("trace");
// 构建查询条件,获取所有记录
const query: Record<string, unknown> = {
type: 1 // 只同步type为1的记录
};
// 计算总记录数
const totalRecords = await traceCollection.countDocuments(query);
console.log(`找到 ${totalRecords} 条记录需要同步`);
// 限制此次处理的记录数量
const recordsToProcess = Math.min(totalRecords, max_records);
console.log(`本次将处理 ${recordsToProcess} 条记录`);
if (totalRecords === 0) {
console.log("没有记录需要同步,任务完成");
return {
success: true,
records_synced: 0,
message: "没有记录需要同步"
};
}
// 检查ClickHouse连接状态
const checkClickHouseConnection = async (): Promise<boolean> => {
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查不测试连接");
return true;
}
try {
logWithTimestamp("测试ClickHouse连接...");
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`,
},
body: "SELECT 1",
// 设置5秒超时
signal: AbortSignal.timeout(5000)
});
if (response.ok) {
logWithTimestamp("ClickHouse连接测试成功");
return true;
} else {
const errorText = await response.text();
logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`);
return false;
}
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse连接测试失败: ${error.message}`);
return false;
}
};
// 检查记录是否已经存在于ClickHouse中
const checkExistingRecords = async (records: TraceRecord[]): Promise<TraceRecord[]> => {
if (records.length === 0) return [];
// 如果跳过ClickHouse检查或强制插入则直接返回所有记录
if (skip_clickhouse_check || force_insert) {
logWithTimestamp(`已跳过ClickHouse重复检查准备处理所有 ${records.length} 条记录`);
return records;
}
logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于ClickHouse中...`);
try {
// 提取所有记录的ID
const recordIds = records.map(record => record.slugId.toString()); // 使用slugId作为link_id查询
logWithTimestamp(`待检查的记录ID: ${recordIds.join(', ')}`);
// 构建查询SQL检查记录是否已存在确保添加FORMAT JSON来获取正确的JSON格式响应
const query = `
SELECT link_id
FROM ${clickhouseConfig.clickhouse_database}.link_events
WHERE link_id IN ('${recordIds.join("','")}')
FORMAT JSON
`;
logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`);
// 发送请求到ClickHouse添加10秒超时
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: query,
signal: AbortSignal.timeout(10000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`);
}
// 获取响应文本以便记录
const responseText = await response.text();
logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`);
if (!responseText.trim()) {
logWithTimestamp("ClickHouse返回空响应假定没有记录存在");
return records; // 如果响应为空,假设没有记录
}
// 解析结果
let result;
try {
result = JSON.parse(responseText);
} catch (err) {
logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`);
throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`);
}
// 确保result有正确的结构
if (!result.data) {
logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`);
return records; // 如果没有data字段假设没有记录
}
// 提取已存在的记录ID
const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id));
logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`);
if (existingIds.size > 0) {
logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`);
}
// 过滤出不存在的记录
const newRecords = records.filter(record => !existingIds.has(record.slugId.toString())); // 使用slugId匹配link_id
logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
return newRecords;
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse查询出错: ${error.message}`);
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查将继续处理所有记录");
return records;
} else {
throw error; // 如果没有启用跳过检查,则抛出错误
}
}
};
// 在处理记录前先检查ClickHouse连接
const clickhouseConnected = await checkClickHouseConnection();
if (!clickhouseConnected && !skip_clickhouse_check) {
logWithTimestamp("⚠️ ClickHouse连接测试失败请启用skip_clickhouse_check=true参数来跳过连接检查");
throw new Error("ClickHouse连接失败无法继续同步");
}
// 处理记录的函数
const processRecords = async (records: TraceRecord[]) => {
if (records.length === 0) return 0;
logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`);
// 检查记录是否已存在
let newRecords;
try {
newRecords = await checkExistingRecords(records);
} catch (err) {
const error = err as Error;
logWithTimestamp(`检查记录是否存在时出错: ${error.message}`);
if (!skip_clickhouse_check && !force_insert) {
throw error;
}
// 如果跳过检查或强制插入,则使用所有记录
logWithTimestamp("将使用所有记录进行处理");
newRecords = records;
}
if (newRecords.length === 0) {
logWithTimestamp("所有记录都已存在,跳过处理");
return 0;
}
logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`);
// 准备ClickHouse插入数据
const clickhouseData = newRecords.map(record => {
// 转换MongoDB记录为ClickHouse格式匹配ClickHouse表结构
return {
// UUID将由ClickHouse自动生成 (event_id)
link_id: record.slugId.toString(),
channel_id: record.label || "",
visitor_id: record._id.toString(), // 使用MongoDB ID作为访客ID
session_id: record._id.toString() + "-" + record.createTime, // 创建一个唯一会话ID
event_type: record.type <= 4 ? record.type : 1, // 确保event_type在枚举范围内
ip_address: record.ip,
country: "", // 这些字段在MongoDB中不存在使用默认值
city: "",
referrer: record.url || "",
utm_source: "",
utm_medium: "",
utm_campaign: "",
user_agent: record.browser + " " + record.browserVersion,
device_type: record.platform || "unknown",
browser: record.browser || "",
os: record.platformOS || "",
time_spent_sec: 0,
is_bounce: true,
is_qr_scan: false,
qr_code_id: "",
conversion_type: 1, // 默认为'visit'
conversion_value: 0,
custom_data: `{"mongo_id":"${record._id.toString()}"}`
};
});
// 生成ClickHouse插入SQL
const insertSQL = `
INSERT INTO ${clickhouseConfig.clickhouse_database}.link_events
(link_id, channel_id, visitor_id, session_id, event_type, ip_address, country, city,
referrer, utm_source, utm_medium, utm_campaign, user_agent, device_type, browser, os,
time_spent_sec, is_bounce, is_qr_scan, qr_code_id, conversion_type, conversion_value, custom_data)
VALUES ${clickhouseData.map(record => {
// 确保所有字符串值都是字符串类型,并安全处理替换
const safeReplace = (val: any): string => {
// 确保值是字符串如果是null或undefined则使用空字符串
const str = val === null || val === undefined ? "" : String(val);
// 安全替换单引号
return str.replace(/'/g, "''");
};
return `('${record.link_id}', '${safeReplace(record.channel_id)}', '${record.visitor_id}', '${record.session_id}',
${record.event_type}, '${safeReplace(record.ip_address)}', '', '',
'${safeReplace(record.referrer)}', '', '', '', '${safeReplace(record.user_agent)}', '${safeReplace(record.device_type)}',
'${safeReplace(record.browser)}', '${safeReplace(record.os)}',
0, true, false, '', 1, 0, '${safeReplace(record.custom_data)}')`;
}).join(", ")}
`;
if (insertSQL.length === 0) {
console.log("没有新记录需要插入");
return 0;
}
// 发送请求到ClickHouse添加20秒超时
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
try {
logWithTimestamp("发送插入请求到ClickHouse...");
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: insertSQL,
signal: AbortSignal.timeout(20000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`);
}
logWithTimestamp(`成功插入 ${newRecords.length} 条记录到ClickHouse`);
return newRecords.length;
} catch (err) {
const error = err as Error;
logWithTimestamp(`向ClickHouse插入数据失败: ${error.message}`);
throw error;
}
};
// 批量处理记录
let processedRecords = 0;
let totalBatchRecords = 0;
for (let page = 0; processedRecords < recordsToProcess; page++) {
// 检查超时
if (checkTimeout()) {
logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`);
break;
}
// 每批次都输出进度
logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`);
const records = await traceCollection.find(
query,
{
allowDiskUse: true,
sort: { createTime: 1 },
skip: page * batch_size,
limit: batch_size
}
).toArray();
if (records.length === 0) {
logWithTimestamp("没有找到更多数据,同步结束");
break;
}
// 找到数据,开始处理
logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);
// 输出当前批次的部分数据信息
if (records.length > 0) {
logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, 时间=${new Date(records[0].createTime).toISOString()}`);
if (records.length > 1) {
logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`);
}
}
const batchSize = await processRecords(records);
processedRecords += records.length;
totalBatchRecords += batchSize;
logWithTimestamp(`${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
}
return {
success: true,
records_processed: processedRecords,
records_synced: totalBatchRecords,
message: "数据同步完成"
};
} catch (err) {
console.error("同步过程中发生错误:", err);
return {
success: false,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined
};
} finally {
// 关闭MongoDB连接
await client.close();
console.log("MongoDB连接已关闭");
}
}

View File

@@ -1,532 +0,0 @@
// 从MongoDB的short表同步数据到ClickHouse的links表
import { getVariable, setVariable } from "npm:windmill-client@1";
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
interface MongoConfig {
host: string;
port: string;
db: string;
username: string;
password: string;
}
interface ClickHouseConfig {
clickhouse_host: string;
clickhouse_port: number;
clickhouse_user: string;
clickhouse_password: string;
clickhouse_database: string;
clickhouse_url: string;
}
interface ShortRecord {
_id: ObjectId;
slug: string; // 短链接的slug部分
origin: string; // 原始URL
domain?: string; // 域名
createTime: number; // 创建时间戳
user?: string; // 创建用户
title?: string; // 标题
description?: string; // 描述
tags?: string[]; // 标签
active?: boolean; // 是否活跃
expiresAt?: number; // 过期时间戳
teamId?: string; // 团队ID
projectId?: string; // 项目ID
}
interface SyncState {
last_sync_time: number;
records_synced: number;
last_sync_id?: string;
}
export async function main(
batch_size = 100,
initial_sync = false,
max_records = 999999,
timeout_minutes = 30,
skip_clickhouse_check = false,
force_insert = false
) {
const logWithTimestamp = (message: string) => {
const now = new Date();
console.log(`[${now.toISOString()}] ${message}`);
};
logWithTimestamp("开始执行MongoDB到ClickHouse的短链接同步任务...");
logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`);
if (skip_clickhouse_check) {
logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式不会检查记录是否已存在");
}
if (force_insert) {
logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
}
// 设置超时
const startTime = Date.now();
const timeoutMs = timeout_minutes * 60 * 1000;
// 检查是否超时
const checkTimeout = () => {
if (Date.now() - startTime > timeoutMs) {
console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`);
return true;
}
return false;
};
// 获取MongoDB和ClickHouse的连接信息
let mongoConfig: MongoConfig;
let clickhouseConfig: ClickHouseConfig;
try {
const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
console.log("原始MongoDB配置:", typeof rawMongoConfig === "string" ? rawMongoConfig : JSON.stringify(rawMongoConfig));
// 尝试解析配置,如果是字符串形式
if (typeof rawMongoConfig === "string") {
try {
mongoConfig = JSON.parse(rawMongoConfig);
} catch (e) {
console.error("MongoDB配置解析失败:", e);
throw e;
}
} else {
mongoConfig = rawMongoConfig as MongoConfig;
}
const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse");
console.log("原始ClickHouse配置:", typeof rawClickhouseConfig === "string" ? rawClickhouseConfig : JSON.stringify(rawClickhouseConfig));
// 尝试解析配置,如果是字符串形式
if (typeof rawClickhouseConfig === "string") {
try {
clickhouseConfig = JSON.parse(rawClickhouseConfig);
} catch (e) {
console.error("ClickHouse配置解析失败:", e);
throw e;
}
} else {
clickhouseConfig = rawClickhouseConfig as ClickHouseConfig;
}
console.log("MongoDB配置解析为:", JSON.stringify(mongoConfig));
console.log("ClickHouse配置解析为:", JSON.stringify(clickhouseConfig));
} catch (error) {
console.error("获取配置失败:", error);
throw error;
}
// 构建MongoDB连接URL
let mongoUrl = "mongodb://";
if (mongoConfig.username && mongoConfig.password) {
mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
}
mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);
// 获取上次同步的状态
let syncState: SyncState;
try {
const rawSyncState = await getVariable<string>("f/shorturl_analytics/clickhouse/shorturl_links_sync_state");
try {
syncState = JSON.parse(rawSyncState);
console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`);
} catch (parseError) {
console.error("解析同步状态失败:", parseError);
throw parseError;
}
} catch (_unused_error) {
console.log("未找到同步状态,创建初始同步状态");
syncState = {
last_sync_time: 0,
records_synced: 0,
};
}
// 如果强制从头开始同步
if (initial_sync) {
console.log("强制从头开始同步");
syncState = {
last_sync_time: 0,
records_synced: 0,
};
}
// 连接MongoDB
const client = new MongoClient();
try {
await client.connect(mongoUrl);
console.log("MongoDB连接成功");
const db = client.database(mongoConfig.db);
const shortCollection = db.collection<ShortRecord>("short");
// 构建查询条件,只查询新的记录
const query: Record<string, unknown> = {};
if (syncState.last_sync_time > 0) {
query.createTime = { $gt: syncState.last_sync_time };
}
if (syncState.last_sync_id) {
// 如果有上次同步的ID则从该ID之后开始查询
query._id = { $gt: new ObjectId(syncState.last_sync_id) };
}
// 计算总记录数
const totalRecords = await shortCollection.countDocuments(query);
console.log(`找到 ${totalRecords} 条新短链接记录需要同步`);
// 限制此次处理的记录数量
const recordsToProcess = Math.min(totalRecords, max_records);
console.log(`本次将处理 ${recordsToProcess} 条记录`);
if (totalRecords === 0) {
console.log("没有新记录需要同步,任务完成");
return {
success: true,
records_synced: 0,
total_synced: syncState.records_synced,
message: "没有新记录需要同步"
};
}
// 分批处理记录
let processedRecords = 0;
let lastId: string | undefined;
let lastCreateTime = syncState.last_sync_time;
let totalBatchRecords = 0;
// 检查ClickHouse连接状态
const checkClickHouseConnection = async (): Promise<boolean> => {
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查不测试连接");
return true;
}
try {
logWithTimestamp("测试ClickHouse连接...");
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`,
},
body: "SELECT 1 FORMAT JSON",
signal: AbortSignal.timeout(5000)
});
if (response.ok) {
logWithTimestamp("ClickHouse连接测试成功");
return true;
} else {
const errorText = await response.text();
logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`);
return false;
}
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse连接测试失败: ${error.message}`);
return false;
}
};
// 检查记录是否已经存在于ClickHouse中
const checkExistingRecords = async (records: ShortRecord[]): Promise<ShortRecord[]> => {
if (records.length === 0) return [];
// 如果跳过ClickHouse检查或强制插入则直接返回所有记录
if (skip_clickhouse_check || force_insert) {
logWithTimestamp(`已跳过ClickHouse重复检查准备处理所有 ${records.length} 条记录`);
return records;
}
logWithTimestamp(`正在检查 ${records.length} 条短链接记录是否已存在于ClickHouse中...`);
try {
// 提取所有记录的ID
const recordIds = records.map(record => record._id.toString());
logWithTimestamp(`待检查的短链接ID: ${recordIds.join(', ')}`);
// 构建查询SQL检查记录是否已存在
const query = `
SELECT link_id
FROM ${clickhouseConfig.clickhouse_database}.links
WHERE link_id IN ('${recordIds.join("','")}')
FORMAT JSON
`;
logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`);
// 发送请求到ClickHouse
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: query,
signal: AbortSignal.timeout(10000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`);
}
// 获取响应文本
const responseText = await response.text();
logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`);
if (!responseText.trim()) {
logWithTimestamp("ClickHouse返回空响应假定没有记录存在");
return records;
}
// 解析结果
let result;
try {
result = JSON.parse(responseText);
} catch (err) {
logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`);
throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`);
}
// 确保result有正确的结构
if (!result.data) {
logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`);
return records;
}
// 提取已存在的记录ID
const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id));
logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`);
if (existingIds.size > 0) {
logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`);
}
// 过滤出不存在的记录
const newRecords = records.filter(record => !existingIds.has(record._id.toString()));
logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
return newRecords;
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse查询出错: ${error.message}`);
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查将继续处理所有记录");
return records;
} else {
throw error;
}
}
};
// 在处理记录前先检查ClickHouse连接
const clickhouseConnected = await checkClickHouseConnection();
if (!clickhouseConnected && !skip_clickhouse_check) {
logWithTimestamp("⚠️ ClickHouse连接测试失败请启用skip_clickhouse_check=true参数来跳过连接检查");
throw new Error("ClickHouse连接失败无法继续同步");
}
// 处理记录的函数
const processRecords = async (records: ShortRecord[]) => {
if (records.length === 0) return 0;
logWithTimestamp(`开始处理批次数据,共 ${records.length} 条短链接记录...`);
// 检查记录是否已存在
let newRecords;
try {
newRecords = await checkExistingRecords(records);
} catch (err) {
const error = err as Error;
logWithTimestamp(`检查记录是否存在时出错: ${error.message}`);
if (!skip_clickhouse_check && !force_insert) {
throw error;
}
// 如果跳过检查或强制插入,则使用所有记录
logWithTimestamp("将使用所有记录进行处理");
newRecords = records;
}
if (newRecords.length === 0) {
logWithTimestamp("所有记录都已存在,跳过处理");
// 更新同步状态,即使没有新增记录
const lastRecord = records[records.length - 1];
lastId = lastRecord._id.toString();
lastCreateTime = lastRecord.createTime;
return 0;
}
logWithTimestamp(`准备处理 ${newRecords.length} 条新短链接记录...`);
// 准备ClickHouse插入数据
const clickhouseData = newRecords.map(record => {
// 转换MongoDB记录为ClickHouse格式匹配ClickHouse表结构
// 处理日期时间移除ISO格式中的Z以使ClickHouse正确解析
const createdAtStr = new Date(record.createTime).toISOString().replace('Z', '');
const expiresAtStr = record.expiresAt ? new Date(record.expiresAt).toISOString().replace('Z', '') : null;
return {
link_id: record._id.toString(), // 使用MongoDB的_id作为link_id
original_url: record.origin || "",
created_at: createdAtStr,
created_by: record.user || "unknown",
title: record.slug, // 使用slug作为title
description: record.description || "",
tags: record.tags || [],
is_active: record.active !== undefined ? record.active : true,
expires_at: expiresAtStr,
team_id: record.teamId || "",
project_id: record.projectId || ""
};
});
// 更新同步状态使用原始records的最后一条以确保进度正确
const lastRecord = records[records.length - 1];
lastId = lastRecord._id.toString();
lastCreateTime = lastRecord.createTime;
logWithTimestamp(`更新同步位置到: ID=${lastId}, 时间=${new Date(lastCreateTime).toISOString()}`);
// 生成ClickHouse插入SQL
// 注意Array类型需要特殊处理这里将tags作为JSON字符串处理
const insertSQL = `
INSERT INTO ${clickhouseConfig.clickhouse_database}.links
(link_id, original_url, created_at, created_by, title, description, tags, is_active, expires_at, team_id, project_id)
VALUES
${clickhouseData.map(record => {
// 处理tags数组
const tagsStr = JSON.stringify(record.tags || []);
// 处理expires_at可能为null的情况
const expiresAt = record.expires_at ? `'${record.expires_at}'` : "NULL";
// 确保所有字段在使用replace前都有默认值
const safeOriginalUrl = (record.original_url || "").replace(/'/g, "''");
const safeCreatedBy = (record.created_by || "unknown").replace(/'/g, "''");
const safeTitle = (record.title || "无标题").replace(/'/g, "''");
const safeDescription = (record.description || "").replace(/'/g, "''");
const safeTeamId = record.team_id || "";
const safeProjectId = record.project_id || "";
return `('${record.link_id}', '${safeOriginalUrl}', '${record.created_at}', '${safeCreatedBy}', '${safeTitle}', '${safeDescription}', ${tagsStr}, ${record.is_active}, ${expiresAt}, '${safeTeamId}', '${safeProjectId}')`;
}).join(", ")}
`;
if (clickhouseData.length === 0) {
console.log("没有新记录需要插入");
return 0;
}
// 发送请求到ClickHouse
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
try {
logWithTimestamp("发送插入请求到ClickHouse...");
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: insertSQL,
signal: AbortSignal.timeout(20000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`);
}
logWithTimestamp(`成功插入 ${newRecords.length} 条短链接记录到ClickHouse`);
return newRecords.length;
} catch (err) {
const error = err as Error;
logWithTimestamp(`向ClickHouse插入数据失败: ${error.message}`);
throw error;
}
};
// 批量处理记录
for (let page = 0; processedRecords < recordsToProcess; page++) {
// 检查超时
if (checkTimeout()) {
logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`);
break;
}
// 每批次都输出进度
logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`);
const records = await shortCollection.find(query)
.sort({ createTime: 1, _id: 1 })
.skip(page * batch_size)
.limit(batch_size)
.toArray();
if (records.length === 0) {
logWithTimestamp(`${page+1} 批次没有找到数据,结束处理`);
break;
}
logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);
// 输出当前批次的部分数据信息
if (records.length > 0) {
logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, Slug=${records[0].slug}, 时间=${new Date(records[0].createTime).toISOString()}`);
if (records.length > 1) {
logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, Slug=${records[records.length-1].slug}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`);
}
}
const batchSize = await processRecords(records);
processedRecords += records.length; // 总是增加处理的记录数,即使有些记录已存在
totalBatchRecords += batchSize; // 只增加实际插入的记录数
logWithTimestamp(`${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
// 更新查询条件,以便下一批次查询
query.createTime = { $gt: lastCreateTime };
if (lastId) {
query._id = { $gt: new ObjectId(lastId) };
}
logWithTimestamp(`更新查询条件: 创建时间 > ${new Date(lastCreateTime).toISOString()}, ID > ${lastId || 'none'}`);
}
// 更新同步状态
const newSyncState: SyncState = {
last_sync_time: lastCreateTime,
records_synced: syncState.records_synced + totalBatchRecords,
last_sync_id: lastId
};
await setVariable("f/shorturl_analytics/clickhouse/shorturl_links_sync_state", JSON.stringify(newSyncState));
console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`);
return {
success: true,
records_processed: processedRecords,
records_synced: totalBatchRecords,
total_synced: newSyncState.records_synced,
last_sync_time: new Date(newSyncState.last_sync_time).toISOString(),
message: "短链接数据同步完成"
};
} catch (err) {
console.error("同步过程中发生错误:", err);
return {
success: false,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined
};
} finally {
// 关闭MongoDB连接
await client.close();
console.log("MongoDB连接已关闭");
}
}