sync & read me

This commit is contained in:
2025-04-09 19:20:40 +08:00
parent ede83068af
commit 8b407975e5
4 changed files with 1111 additions and 520 deletions

View File

@@ -0,0 +1,175 @@
# 事件跟踪接口说明
## 概述
该接口用于跟踪用户交互事件并将数据存储到 ClickHouse 数据库中。支持记录各种类型的事件,并可包含与链接、用户、团队、项目等相关的详细信息。
## 接口信息
- **URL**: `/api/events/track`
- **方法**: `POST`
- **Content-Type**: `application/json`
## 请求参数
### 必填字段
| 参数 | 类型 | 描述 |
|------|------|------|
| `event_type` | string | 事件类型,如 'click', 'view', 'conversion' |
### 核心事件字段
| 参数 | 类型 | 必填 | 描述 |
|------|------|------|------|
| `event_id` | string | 否 | 事件唯一标识符不提供时自动生成UUID |
| `event_time` | string/Date | 否 | 事件发生时间格式为ISO日期字符串默认为当前时间 |
| `event_attributes` | object/string | 否 | 事件相关的其他属性可以是JSON对象或JSON字符串 |
### 链接信息
| 参数 | 类型 | 必填 | 描述 |
|------|------|------|------|
| `link_id` | string | 否 | 短链接的唯一ID |
| `link_slug` | string | 否 | 短链接的slug部分 |
| `link_label` | string | 否 | 短链接的显示名称 |
| `link_title` | string | 否 | 短链接的标题 |
| `link_original_url` | string | 否 | 原始目标URL |
| `link_attributes` | object/string | 否 | 链接相关的额外属性 |
| `link_created_at` | string/Date | 否 | 链接创建时间 |
| `link_expires_at` | string/Date | 否 | 链接过期时间 |
| `link_tags` | array/string | 否 | 链接标签可以是数组或JSON字符串 |
### 用户信息
| 参数 | 类型 | 必填 | 描述 |
|------|------|------|------|
| `user_id` | string | 否 | 用户ID |
| `user_name` | string | 否 | 用户名称 |
| `user_email` | string | 否 | 用户邮箱 |
| `user_attributes` | object/string | 否 | 用户相关的其他属性 |
### 团队和项目信息
| 参数 | 类型 | 必填 | 描述 |
|------|------|------|------|
| `team_id` | string | 否 | 团队ID |
| `team_name` | string | 否 | 团队名称 |
| `team_attributes` | object/string | 否 | 团队相关的其他属性 |
| `project_id` | string | 否 | 项目ID |
| `project_name` | string | 否 | 项目名称 |
| `project_attributes` | object/string | 否 | 项目相关的其他属性 |
### 二维码信息
| 参数 | 类型 | 必填 | 描述 |
|------|------|------|------|
| `qr_code_id` | string | 否 | 二维码ID |
| `qr_code_name` | string | 否 | 二维码名称 |
| `qr_code_attributes` | object/string | 否 | 二维码相关的其他属性 |
### 访问者信息
| 参数 | 类型 | 必填 | 描述 |
|------|------|------|------|
| `visitor_id` | string | 否 | 访问者唯一标识符,不提供时自动生成 |
| `session_id` | string | 否 | 会话ID不提供时自动生成 |
| `ip_address` | string | 否 | 访问者IP地址默认从请求头获取 |
| `country` | string | 否 | 访问者所在国家 |
| `city` | string | 否 | 访问者所在城市 |
| `device_type` | string | 否 | 设备类型 (如 desktop, mobile, tablet) |
| `browser` | string | 否 | 浏览器名称 |
| `os` | string | 否 | 操作系统 |
| `user_agent` | string | 否 | 用户代理字符串,默认从请求头获取 |
### 引荐来源信息
| 参数 | 类型 | 必填 | 描述 |
|------|------|------|------|
| `referrer` | string | 否 | 引荐URL默认从请求头获取 |
| `utm_source` | string | 否 | UTM来源参数 |
| `utm_medium` | string | 否 | UTM媒介参数 |
| `utm_campaign` | string | 否 | UTM活动参数 |
| `utm_term` | string | 否 | UTM术语参数 |
| `utm_content` | string | 否 | UTM内容参数 |
### 交互信息
| 参数 | 类型 | 必填 | 描述 |
|------|------|------|------|
| `time_spent_sec` | number | 否 | 用户在页面上停留的时间默认0 |
| `is_bounce` | boolean | 否 | 是否是跳出只访问一个页面默认true |
| `is_qr_scan` | boolean | 否 | 是否来自二维码扫描默认false |
| `conversion_type` | string | 否 | 转化类型 |
| `conversion_value` | number | 否 | 转化价值默认0 |
## 响应格式
### 成功响应 (201 Created)
```json
{
"success": true,
"message": "Event tracked successfully",
"event_id": "uuid-of-tracked-event"
}
```
### 错误响应
#### 缺少必填字段 (400 Bad Request)
```json
{
"error": "Missing required field: event_type"
}
```
#### 服务器错误 (500 Internal Server Error)
```json
{
"error": "Failed to track event",
"details": "具体错误信息"
}
```
## 使用示例
### 基本事件跟踪请求
```javascript
fetch('/api/events/track', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
event_type: 'click',
link_id: 'abc123',
link_slug: 'promo-summer',
link_original_url: 'https://example.com/summer-promotion'
})
})
```
### 详细事件跟踪请求
```javascript
fetch('/api/events/track', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
event_type: 'conversion',
link_id: 'abc123',
link_slug: 'promo-summer',
link_original_url: 'https://example.com/summer-promotion',
event_attributes: {
page: '/checkout',
product_id: 'xyz789'
},
user_id: 'user123',
team_id: 'team456',
project_id: 'proj789',
visitor_id: 'vis987',
is_bounce: false,
time_spent_sec: 120,
conversion_type: 'purchase',
conversion_value: 99.99,
utm_source: 'email',
utm_campaign: 'summer_sale'
})
})
```
## 注意事项
- 所有对象类型的字段(如 `event_attributes`可以作为对象或预先格式化的JSON字符串传递
- 如果不提供 `event_id``visitor_id``session_id`,系统将自动生成
- 时间戳字段接受ISO格式的日期字符串并会被转换为ClickHouse兼容的格式

View File

@@ -1,364 +0,0 @@
// Sync data from MongoDB trace table to ClickHouse events table
import { getVariable } from "npm:windmill-client@1";
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
interface MongoConfig {
host: string;
port: string;
db: string;
username: string;
password: string;
}
interface ClickHouseConfig {
clickhouse_host: string;
clickhouse_port: number;
clickhouse_user: string;
clickhouse_password: string;
clickhouse_database: string;
clickhouse_url: string;
}
interface TraceRecord {
_id: ObjectId;
slugId: ObjectId;
label: string | null;
ip: string;
type: number;
platform: string;
platformOS: string;
browser: string;
browserVersion: string;
url: string;
createTime: number;
}
export async function main(
batch_size = 1000,
max_records = 9999999,
timeout_minutes = 60,
skip_clickhouse_check = false,
force_insert = false
) {
const logWithTimestamp = (message: string) => {
const now = new Date();
console.log(`[${now.toISOString()}] ${message}`);
};
logWithTimestamp("Starting sync from MongoDB to ClickHouse events table");
logWithTimestamp(`Batch size: ${batch_size}, Max records: ${max_records}, Timeout: ${timeout_minutes} minutes`);
// Set timeout
const startTime = Date.now();
const timeoutMs = timeout_minutes * 60 * 1000;
const checkTimeout = () => {
if (Date.now() - startTime > timeoutMs) {
console.log(`Execution time exceeded ${timeout_minutes} minutes, stopping`);
return true;
}
return false;
};
// Get MongoDB and ClickHouse connection info
let mongoConfig: MongoConfig;
let clickhouseConfig: ClickHouseConfig;
try {
const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
mongoConfig = typeof rawMongoConfig === "string" ? JSON.parse(rawMongoConfig) : rawMongoConfig;
const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse");
clickhouseConfig = typeof rawClickhouseConfig === "string" ? JSON.parse(rawClickhouseConfig) : rawClickhouseConfig;
} catch (error) {
console.error("Failed to get config:", error);
throw error;
}
// Build MongoDB connection URL
let mongoUrl = "mongodb://";
if (mongoConfig.username && mongoConfig.password) {
mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
}
mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
// Connect to MongoDB
const client = new MongoClient();
try {
await client.connect(mongoUrl);
console.log("MongoDB connected successfully");
const db = client.database(mongoConfig.db);
const traceCollection = db.collection<TraceRecord>("trace");
// Build query conditions
const query: Record<string, unknown> = {
type: 1 // Only sync records with type 1
};
// Count total records
const totalRecords = await traceCollection.countDocuments(query);
console.log(`Found ${totalRecords} records to sync`);
const recordsToProcess = Math.min(totalRecords, max_records);
console.log(`Will process ${recordsToProcess} records`);
if (totalRecords === 0) {
console.log("No records to sync, task completed");
return {
success: true,
records_synced: 0,
message: "No records to sync"
};
}
// Check ClickHouse connection
const checkClickHouseConnection = async (): Promise<boolean> => {
if (skip_clickhouse_check) {
logWithTimestamp("Skipping ClickHouse connection check");
return true;
}
try {
logWithTimestamp("Testing ClickHouse connection...");
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`,
},
body: "SELECT 1",
signal: AbortSignal.timeout(5000)
});
if (response.ok) {
logWithTimestamp("ClickHouse connection test successful");
return true;
} else {
const errorText = await response.text();
logWithTimestamp(`ClickHouse connection test failed: ${response.status} ${errorText}`);
return false;
}
} catch (err) {
logWithTimestamp(`ClickHouse connection test failed: ${(err as Error).message}`);
return false;
}
};
// Check if records exist in ClickHouse
const checkExistingRecords = async (records: TraceRecord[]): Promise<TraceRecord[]> => {
if (records.length === 0) return [];
if (skip_clickhouse_check || force_insert) {
logWithTimestamp(`Skipping ClickHouse duplicate check, will process all ${records.length} records`);
return records;
}
try {
const recordIds = records.map(record => record._id.toString());
const query = `
SELECT event_id
FROM ${clickhouseConfig.clickhouse_database}.events
WHERE event_attributes LIKE '%"mongo_id":"%'
AND event_attributes LIKE ANY ('%${recordIds.join("%' OR '%")}%')
FORMAT JSON
`;
const response = await fetch(clickhouseConfig.clickhouse_url, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: query,
signal: AbortSignal.timeout(10000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse query error: ${response.status} ${errorText}`);
}
const result = await response.json();
const existingIds = new Set(result.data.map((row: any) => {
const matches = row.event_attributes.match(/"mongo_id":"([^"]+)"/);
return matches ? matches[1] : null;
}).filter(Boolean));
return records.filter(record => !existingIds.has(record._id.toString()));
} catch (err) {
logWithTimestamp(`Error checking existing records: ${(err as Error).message}`);
return skip_clickhouse_check ? records : [];
}
};
// Process records function
const processRecords = async (records: TraceRecord[]) => {
if (records.length === 0) return 0;
const newRecords = await checkExistingRecords(records);
if (newRecords.length === 0) {
logWithTimestamp("All records already exist, skipping");
return 0;
}
// Prepare ClickHouse insert data
const clickhouseData = newRecords.map(record => {
const eventTime = new Date(record.createTime).toISOString();
return {
event_time: eventTime,
event_type: "click",
event_attributes: JSON.stringify({
mongo_id: record._id.toString(),
original_type: record.type
}),
// Link information
link_id: record.slugId.toString(),
link_slug: "",
link_label: record.label || "",
link_title: "",
link_original_url: record.url || "",
link_attributes: "{}",
link_created_at: eventTime,
link_expires_at: null,
link_tags: "[]",
// User information (empty as not available in trace)
user_id: "",
user_name: "",
user_email: "",
user_attributes: "{}",
// Team information (empty as not available in trace)
team_id: "",
team_name: "",
team_attributes: "{}",
// Project information (empty as not available in trace)
project_id: "",
project_name: "",
project_attributes: "{}",
// QR code information (empty as not available in trace)
qr_code_id: "",
qr_code_name: "",
qr_code_attributes: "{}",
// Visitor information
visitor_id: record._id.toString(),
session_id: `${record._id.toString()}-${record.createTime}`,
ip_address: record.ip || "",
country: "",
city: "",
device_type: record.platform || "unknown",
browser: record.browser || "",
os: record.platformOS || "",
user_agent: `${record.browser || ""} ${record.browserVersion || ""}`.trim(),
// Source information
referrer: record.url || "",
utm_source: "",
utm_medium: "",
utm_campaign: "",
// Interaction information
time_spent_sec: 0,
is_bounce: true,
is_qr_scan: false,
conversion_type: "visit",
conversion_value: 0
};
});
// Generate ClickHouse insert SQL
const insertSQL = `
INSERT INTO ${clickhouseConfig.clickhouse_database}.events
FORMAT JSONEachRow
${JSON.stringify(clickhouseData)}
`;
try {
const response = await fetch(clickhouseConfig.clickhouse_url, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: insertSQL,
signal: AbortSignal.timeout(20000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse insert error: ${response.status} ${errorText}`);
}
logWithTimestamp(`Successfully inserted ${newRecords.length} records to ClickHouse`);
return newRecords.length;
} catch (err) {
logWithTimestamp(`Failed to insert data to ClickHouse: ${(err as Error).message}`);
throw err;
}
};
// Check ClickHouse connection before processing
const clickhouseConnected = await checkClickHouseConnection();
if (!clickhouseConnected && !skip_clickhouse_check) {
throw new Error("ClickHouse connection failed, cannot continue sync");
}
// Process records in batches
let processedRecords = 0;
let totalBatchRecords = 0;
for (let page = 0; processedRecords < recordsToProcess; page++) {
if (checkTimeout()) {
logWithTimestamp(`Processed ${processedRecords}/${recordsToProcess} records, stopping due to timeout`);
break;
}
logWithTimestamp(`Processing batch ${page+1}, completed ${processedRecords}/${recordsToProcess} records (${Math.round(processedRecords/recordsToProcess*100)}%)`);
const records = await traceCollection.find(
query,
{
allowDiskUse: true,
sort: { createTime: 1 },
skip: page * batch_size,
limit: batch_size
}
).toArray();
if (records.length === 0) {
logWithTimestamp("No more records found, sync complete");
break;
}
const batchSize = await processRecords(records);
processedRecords += records.length;
totalBatchRecords += batchSize;
logWithTimestamp(`Batch ${page+1} complete. Processed ${processedRecords}/${recordsToProcess} records, inserted ${totalBatchRecords} (${Math.round(processedRecords/recordsToProcess*100)}%)`);
}
return {
success: true,
records_processed: processedRecords,
records_synced: totalBatchRecords,
message: "Data sync completed"
};
} catch (err) {
console.error("Error during sync:", err);
return {
success: false,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined
};
} finally {
await client.close();
console.log("MongoDB connection closed");
}
}

View File

@@ -1,5 +1,5 @@
// Sync data from MongoDB trace table to ClickHouse events table
import { getVariable } from "npm:windmill-client@1";
// MongoDBtrace表同步数据到ClickHouseevents
import { getVariable, setVariable } from "npm:windmill-client@1";
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
interface MongoConfig {
@@ -15,6 +15,7 @@ interface ClickHouseConfig {
clickhouse_port: number;
clickhouse_user: string;
clickhouse_password: string;
clickhouse_database: string;
clickhouse_url: string;
}
@@ -32,25 +33,10 @@ interface TraceRecord {
createTime: number;
}
interface ShortRecord {
_id: ObjectId;
slug: string; // 短链接的slug部分
origin: string; // 原始URL
domain?: string; // 域名
createTime: number; // 创建时间戳
user?: string; // 创建用户
title?: string; // 标题
description?: string; // 描述
tags?: string[]; // 标签
active?: boolean; // 是否活跃
expiresAt?: number; // 过期时间戳
teamId?: string; // 团队ID
projectId?: string; // 项目ID
}
interface ClickHouseRow {
event_id: string;
event_attributes: string;
interface SyncState {
last_sync_time: number;
records_synced: number;
last_sync_id?: string;
}
export async function main(
@@ -58,90 +44,138 @@ export async function main(
max_records = 9999999,
timeout_minutes = 60,
skip_clickhouse_check = false,
force_insert = false
force_insert = false,
database_override = "shorturl_analytics" // 添加数据库名称参数默认为shorturl_analytics
) {
const logWithTimestamp = (message: string) => {
const now = new Date();
console.log(`[${now.toISOString()}] ${message}`);
};
logWithTimestamp("Starting sync from MongoDB to ClickHouse events table");
logWithTimestamp(`Batch size: ${batch_size}, Max records: ${max_records}, Timeout: ${timeout_minutes} minutes`);
logWithTimestamp("开始执行MongoDBClickHouse的同步任务");
logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`);
if (skip_clickhouse_check) {
logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式不会检查记录是否已存在");
}
if (force_insert) {
logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
}
// Set timeout
// 设置超时
const startTime = Date.now();
const timeoutMs = timeout_minutes * 60 * 1000;
// 检查是否超时
const checkTimeout = () => {
if (Date.now() - startTime > timeoutMs) {
console.log(`Execution time exceeded ${timeout_minutes} minutes, stopping`);
console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`);
return true;
}
return false;
};
// Get MongoDB and ClickHouse connection info
// 获取MongoDBClickHouse的连接信息
let mongoConfig: MongoConfig;
let clickhouseConfig: ClickHouseConfig;
try {
const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
mongoConfig = typeof rawMongoConfig === "string" ? JSON.parse(rawMongoConfig) : rawMongoConfig;
console.log("原始MongoDB配置:", JSON.stringify(rawMongoConfig));
// 尝试解析配置,如果是字符串形式
if (typeof rawMongoConfig === "string") {
try {
mongoConfig = JSON.parse(rawMongoConfig);
} catch (e) {
console.error("MongoDB配置解析失败:", e);
throw e;
}
} else {
mongoConfig = rawMongoConfig as MongoConfig;
}
const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse");
clickhouseConfig = typeof rawClickhouseConfig === "string" ? JSON.parse(rawClickhouseConfig) : rawClickhouseConfig;
console.log("原始ClickHouse配置:", JSON.stringify(rawClickhouseConfig));
// 尝试解析配置,如果是字符串形式
if (typeof rawClickhouseConfig === "string") {
try {
clickhouseConfig = JSON.parse(rawClickhouseConfig);
} catch (e) {
console.error("ClickHouse配置解析失败:", e);
throw e;
}
} else {
clickhouseConfig = rawClickhouseConfig as ClickHouseConfig;
}
// 检查并修复数据库配置
if (!clickhouseConfig.clickhouse_database || clickhouseConfig.clickhouse_database === "undefined") {
logWithTimestamp(`⚠️ 警告: 数据库名称未定义或为'undefined',使用提供的默认值: ${database_override}`);
clickhouseConfig.clickhouse_database = database_override;
}
console.log("MongoDB配置解析为:", JSON.stringify(mongoConfig));
console.log("ClickHouse配置解析为:", JSON.stringify({
...clickhouseConfig,
clickhouse_password: "****" // 隐藏密码
}));
logWithTimestamp(`将使用ClickHouse数据库: ${clickhouseConfig.clickhouse_database}`);
} catch (error) {
console.error("Failed to get config:", error);
console.error("获取配置失败:", error);
throw error;
}
// Build MongoDB connection URL
// 构建MongoDB连接URL
let mongoUrl = "mongodb://";
if (mongoConfig.username && mongoConfig.password) {
mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
}
mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
// Connect to MongoDB
console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);
// 连接MongoDB
const client = new MongoClient();
try {
await client.connect(mongoUrl);
console.log("MongoDB connected successfully");
console.log("MongoDB连接成功");
const db = client.database(mongoConfig.db);
const traceCollection = db.collection<TraceRecord>("trace");
const shortCollection = db.collection<ShortRecord>("short");
// Build query conditions
// 构建查询条件,获取所有记录
const query: Record<string, unknown> = {
type: 1 // Only sync records with type 1
type: 1 // 只同步type为1的记录
};
// Count total records
// 计算总记录数
const totalRecords = await traceCollection.countDocuments(query);
console.log(`Found ${totalRecords} records to sync`);
console.log(`找到 ${totalRecords} 条记录需要同步`);
// 限制此次处理的记录数量
const recordsToProcess = Math.min(totalRecords, max_records);
console.log(`Will process ${recordsToProcess} records`);
console.log(`本次将处理 ${recordsToProcess} 条记录`);
if (totalRecords === 0) {
console.log("No records to sync, task completed");
console.log("没有记录需要同步,任务完成");
return {
success: true,
records_synced: 0,
message: "No records to sync"
message: "没有记录需要同步"
};
}
// Check ClickHouse connection
// 检查ClickHouse连接状态
const checkClickHouseConnection = async (): Promise<boolean> => {
if (skip_clickhouse_check) {
logWithTimestamp("Skipping ClickHouse connection check");
logWithTimestamp("已启用跳过ClickHouse检查,不测试连接");
return true;
}
try {
logWithTimestamp("Testing ClickHouse connection...");
logWithTimestamp("测试ClickHouse连接...");
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
@@ -149,45 +183,61 @@ export async function main(
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`,
},
body: "SELECT 1",
body: `SELECT 1 FROM ${clickhouseConfig.clickhouse_database}.events LIMIT 1`,
// 设置5秒超时
signal: AbortSignal.timeout(5000)
});
if (response.ok) {
logWithTimestamp("ClickHouse connection test successful");
logWithTimestamp("ClickHouse连接测试成功");
return true;
} else {
const errorText = await response.text();
logWithTimestamp(`ClickHouse connection test failed: ${response.status} ${errorText}`);
logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`);
return false;
}
} catch (err) {
logWithTimestamp(`ClickHouse connection test failed: ${(err as Error).message}`);
const error = err as Error;
logWithTimestamp(`ClickHouse连接测试失败: ${error.message}`);
return false;
}
};
// Check if records exist in ClickHouse
// 检查记录是否已经存在于ClickHouse
const checkExistingRecords = async (records: TraceRecord[]): Promise<TraceRecord[]> => {
if (records.length === 0) return [];
// 如果跳过ClickHouse检查或强制插入则直接返回所有记录
if (skip_clickhouse_check || force_insert) {
logWithTimestamp(`Skipping ClickHouse duplicate check, will process all ${records.length} records`);
logWithTimestamp(`已跳过ClickHouse重复检查,准备处理所有 ${records.length} 条记录`);
return records;
}
try {
const recordIds = records.map(record => record._id.toString());
logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于ClickHouse中...`);
try {
// 验证数据库名称
if (!clickhouseConfig.clickhouse_database || clickhouseConfig.clickhouse_database === "undefined") {
throw new Error("数据库名称未定义或无效,请检查配置");
}
// 提取所有记录的ID
const recordIds = records.map(record => record.slugId.toString()); // 使用slugId作为link_id查询
logWithTimestamp(`待检查的记录ID: ${recordIds.join(', ')}`);
// 构建查询SQL检查记录是否已存在确保添加FORMAT JSON来获取正确的JSON格式响应
const query = `
SELECT event_id
FROM shorturl_analytics.events
WHERE event_attributes LIKE '%"mongo_id":"%'
AND event_attributes LIKE ANY ('%${recordIds.join("%' OR '%")}%')
SELECT link_id, visitor_id
FROM ${clickhouseConfig.clickhouse_database}.events
WHERE link_id IN ('${recordIds.join("','")}')
FORMAT JSON
`;
const response = await fetch(clickhouseConfig.clickhouse_url, {
logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`);
// 发送请求到ClickHouse添加10秒超时
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
@@ -199,134 +249,195 @@ export async function main(
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse query error: ${response.status} ${errorText}`);
throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`);
}
const result = await response.json();
const existingIds = new Set(result.data.map((row: ClickHouseRow) => {
const matches = row.event_attributes.match(/"mongo_id":"([^"]+)"/);
return matches ? matches[1] : null;
}).filter(Boolean));
// 获取响应文本以便记录
const responseText = await response.text();
logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`);
return records.filter(record => !existingIds.has(record._id.toString()));
if (!responseText.trim()) {
logWithTimestamp("ClickHouse返回空响应假定没有记录存在");
return records; // 如果响应为空,假设没有记录
}
// 解析结果
let result;
try {
result = JSON.parse(responseText);
} catch (err) {
logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`);
throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`);
}
// 确保result有正确的结构
if (!result.data) {
logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`);
return records; // 如果没有data字段假设没有记录
}
// 提取已存在的记录ID
const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id));
logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`);
if (existingIds.size > 0) {
logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`);
}
// 过滤出不存在的记录
const newRecords = records.filter(record => !existingIds.has(record.slugId.toString())); // 使用slugId匹配link_id
logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
return newRecords;
} catch (err) {
logWithTimestamp(`Error checking existing records: ${(err as Error).message}`);
return skip_clickhouse_check ? records : [];
const error = err as Error;
logWithTimestamp(`ClickHouse查询出错: ${error.message}`);
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查将继续处理所有记录");
return records;
} else {
throw error; // 如果没有启用跳过检查,则抛出错误
}
}
};
// Process records function
// 在处理记录前先检查ClickHouse连接
const clickhouseConnected = await checkClickHouseConnection();
if (!clickhouseConnected && !skip_clickhouse_check) {
logWithTimestamp("⚠️ ClickHouse连接测试失败请启用skip_clickhouse_check=true参数来跳过连接检查");
throw new Error("ClickHouse连接失败无法继续同步");
}
// 处理记录的函数
const processRecords = async (records: TraceRecord[]) => {
if (records.length === 0) return 0;
const newRecords = await checkExistingRecords(records);
logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`);
// 检查记录是否已存在
let newRecords;
try {
newRecords = await checkExistingRecords(records);
} catch (err) {
const error = err as Error;
logWithTimestamp(`检查记录是否存在时出错: ${error.message}`);
if (!skip_clickhouse_check && !force_insert) {
throw error;
}
// 如果跳过检查或强制插入,则使用所有记录
logWithTimestamp("将使用所有记录进行处理");
newRecords = records;
}
if (newRecords.length === 0) {
logWithTimestamp("All records already exist, skipping");
logWithTimestamp("所有记录都已存在,跳过处理");
return 0;
}
// Get link information for all records
const slugIds = newRecords.map(record => record.slugId);
const shortLinks = await shortCollection.find({
_id: { $in: slugIds }
}).toArray();
logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`);
// Create a map for quick lookup
const shortLinksMap = new Map(shortLinks.map(link => [link._id.toString(), link]));
// Prepare ClickHouse insert data
// 准备ClickHouse插入数据
const clickhouseData = newRecords.map(record => {
const shortLink = shortLinksMap.get(record.slugId.toString());
// 将毫秒时间戳转换为 DateTime64(3) 格式
const formatDateTime = (timestamp: number) => {
return new Date(timestamp).toISOString().replace('T', ' ').replace('Z', '');
};
const eventTime = new Date(record.createTime);
// 转换MongoDB记录为ClickHouse格式匹配ClickHouse表结构
return {
// Event base information
event_id: record._id.toString(),
event_time: formatDateTime(record.createTime),
event_type: "click",
event_attributes: JSON.stringify({
original_type: record.type
}),
// Link information from short collection
// UUID将由ClickHouse自动生成 (event_id)
event_time: eventTime.toISOString().replace('T', ' ').replace('Z', ''),
event_type: record.type === 1 ? "visit" : "custom",
event_attributes: `{"mongo_id":"${record._id.toString()}"}`,
link_id: record.slugId.toString(),
link_slug: shortLink?.slug || "",
link_slug: "", // 这些字段可能需要从其他表获取
link_label: record.label || "",
link_title: "",
link_original_url: shortLink?.origin || "",
link_attributes: JSON.stringify({
domain: shortLink?.domain || null
}),
link_created_at: shortLink?.createTime ? formatDateTime(shortLink.createTime) : formatDateTime(record.createTime),
link_expires_at: shortLink?.expiresAt ? formatDateTime(shortLink.expiresAt) : null,
link_tags: "[]", // Empty array as default
// User information
user_id: shortLink?.user || "",
link_original_url: "",
link_attributes: "{}",
link_created_at: eventTime.toISOString().replace('T', ' ').replace('Z', ''), // 暂用访问时间代替,可能需要从其他表获取
link_expires_at: null,
link_tags: "[]",
user_id: "",
user_name: "",
user_email: "",
user_attributes: "{}",
// Team information
team_id: shortLink?.teamId || "",
team_id: "",
team_name: "",
team_attributes: "{}",
// Project information
project_id: shortLink?.projectId || "",
project_id: "",
project_name: "",
project_attributes: "{}",
// QR code information
qr_code_id: "",
qr_code_name: "",
qr_code_attributes: "{}",
// Visitor information
visitor_id: "", // Empty string as default
session_id: `${record.slugId.toString()}-${record.createTime}`,
ip_address: record.ip || "",
country: "",
visitor_id: record._id.toString(), // 使用MongoDB ID作为访客ID
session_id: record._id.toString() + "-" + record.createTime, // 创建一个唯一会话ID
ip_address: record.ip,
country: "", // 这些字段在MongoDB中不存在使用默认值
city: "",
device_type: record.platform || "",
device_type: record.platform || "unknown",
browser: record.browser || "",
os: record.platformOS || "",
user_agent: `${record.browser || ""} ${record.browserVersion || ""}`.trim(),
// Source information
user_agent: record.browser + " " + record.browserVersion,
referrer: record.url || "",
utm_source: "",
utm_medium: "",
utm_campaign: "",
// Interaction information
utm_term: "",
utm_content: "",
time_spent_sec: 0,
is_bounce: true,
is_qr_scan: false,
conversion_type: "visit",
conversion_value: 0
conversion_value: 0,
req_full_path: record.url || ""
};
});
// Generate ClickHouse insert SQL
const rows = clickhouseData.map(row => {
// 只需要处理JSON字符串的转义
const formattedRow = {
...row,
event_attributes: row.event_attributes.replace(/\\/g, '\\\\'),
link_attributes: row.link_attributes.replace(/\\/g, '\\\\')
};
return JSON.stringify(formattedRow);
}).join('\n');
// 生成ClickHouse插入SQL
const insertSQL = `
INSERT INTO ${clickhouseConfig.clickhouse_database}.events
(event_time, event_type, event_attributes, link_id, link_slug, link_label, link_title,
link_original_url, link_attributes, link_created_at, link_expires_at, link_tags,
user_id, user_name, user_email, user_attributes, team_id, team_name, team_attributes,
project_id, project_name, project_attributes, qr_code_id, qr_code_name, qr_code_attributes,
visitor_id, session_id, ip_address, country, city, device_type, browser, os, user_agent,
referrer, utm_source, utm_medium, utm_campaign, utm_term, utm_content, time_spent_sec,
is_bounce, is_qr_scan, conversion_type, conversion_value, req_full_path)
VALUES ${clickhouseData.map(record => {
// 确保所有字符串值都是字符串类型,并安全处理替换
const safeReplace = (val: unknown): string => {
// 确保值是字符串如果是null或undefined则使用空字符串
const str = val === null || val === undefined ? "" : String(val);
// 安全替换单引号
return str.replace(/'/g, "''");
};
const insertSQL = `INSERT INTO shorturl_analytics.events FORMAT JSONEachRow\n${rows}`;
return `('${record.event_time}', '${safeReplace(record.event_type)}', '${safeReplace(record.event_attributes)}',
'${record.link_id}', '${safeReplace(record.link_slug)}', '${safeReplace(record.link_label)}', '${safeReplace(record.link_title)}',
'${safeReplace(record.link_original_url)}', '${safeReplace(record.link_attributes)}', '${record.link_created_at}',
${record.link_expires_at === null ? 'NULL' : `'${record.link_expires_at}'`}, '${safeReplace(record.link_tags)}',
'${safeReplace(record.user_id)}', '${safeReplace(record.user_name)}', '${safeReplace(record.user_email)}',
'${safeReplace(record.user_attributes)}', '${safeReplace(record.team_id)}', '${safeReplace(record.team_name)}',
'${safeReplace(record.team_attributes)}', '${safeReplace(record.project_id)}', '${safeReplace(record.project_name)}',
'${safeReplace(record.project_attributes)}', '${safeReplace(record.qr_code_id)}', '${safeReplace(record.qr_code_name)}',
'${safeReplace(record.qr_code_attributes)}', '${safeReplace(record.visitor_id)}', '${safeReplace(record.session_id)}',
'${safeReplace(record.ip_address)}', '${safeReplace(record.country)}', '${safeReplace(record.city)}',
'${safeReplace(record.device_type)}', '${safeReplace(record.browser)}', '${safeReplace(record.os)}',
'${safeReplace(record.user_agent)}', '${safeReplace(record.referrer)}', '${safeReplace(record.utm_source)}',
'${safeReplace(record.utm_medium)}', '${safeReplace(record.utm_campaign)}', '${safeReplace(record.utm_term)}',
'${safeReplace(record.utm_content)}', ${record.time_spent_sec}, ${record.is_bounce}, ${record.is_qr_scan},
'${safeReplace(record.conversion_type)}', ${record.conversion_value}, '${safeReplace(record.req_full_path)}')`;
}).join(", ")}
`;
if (insertSQL.length === 0) {
console.log("没有新记录需要插入");
return 0;
}
// 发送请求到ClickHouse添加20秒超时
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
try {
const response = await fetch(clickhouseConfig.clickhouse_url, {
logWithTimestamp("发送插入请求到ClickHouse...");
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
@@ -338,35 +449,33 @@ export async function main(
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse insert error: ${response.status} ${errorText}`);
throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`);
}
logWithTimestamp(`Successfully inserted ${newRecords.length} records to ClickHouse`);
logWithTimestamp(`成功插入 ${newRecords.length} 条记录到ClickHouse`);
return newRecords.length;
} catch (err) {
logWithTimestamp(`Failed to insert data to ClickHouse: ${(err as Error).message}`);
throw err;
const error = err as Error;
logWithTimestamp(`向ClickHouse插入数据失败: ${error.message}`);
throw error;
}
};
// Check ClickHouse connection before processing
const clickhouseConnected = await checkClickHouseConnection();
if (!clickhouseConnected && !skip_clickhouse_check) {
throw new Error("ClickHouse connection failed, cannot continue sync");
}
// Process records in batches
// 批量处理记录
let processedRecords = 0;
let totalBatchRecords = 0;
for (let page = 0; processedRecords < recordsToProcess; page++) {
// 检查超时
if (checkTimeout()) {
logWithTimestamp(`Processed ${processedRecords}/${recordsToProcess} records, stopping due to timeout`);
logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`);
break;
}
logWithTimestamp(`Processing batch ${page+1}, completed ${processedRecords}/${recordsToProcess} records (${Math.round(processedRecords/recordsToProcess*100)}%)`);
// 每批次都输出进度
logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`);
const records = await traceCollection.find(
query,
{
@@ -378,32 +487,43 @@ export async function main(
).toArray();
if (records.length === 0) {
logWithTimestamp("No more records found, sync complete");
logWithTimestamp("没有找到更多数据,同步结束");
break;
}
// 找到数据,开始处理
logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);
// 输出当前批次的部分数据信息
if (records.length > 0) {
logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, 时间=${new Date(records[0].createTime).toISOString()}`);
if (records.length > 1) {
logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`);
}
}
const batchSize = await processRecords(records);
processedRecords += records.length;
totalBatchRecords += batchSize;
logWithTimestamp(`Batch ${page+1} complete. Processed ${processedRecords}/${recordsToProcess} records, inserted ${totalBatchRecords} (${Math.round(processedRecords/recordsToProcess*100)}%)`);
logWithTimestamp(` ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} (${Math.round(processedRecords/recordsToProcess*100)}%)`);
}
return {
success: true,
records_processed: processedRecords,
records_synced: totalBatchRecords,
message: "Data sync completed"
message: "数据同步完成"
};
} catch (err) {
console.error("Error during sync:", err);
console.error("同步过程中发生错误:", err);
return {
success: false,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined
};
} finally {
// 关闭MongoDB连接
await client.close();
console.log("MongoDB connection closed");
console.log("MongoDB连接已关闭");
}
}

View File

@@ -0,0 +1,660 @@
// 文件名: sync_resource_relations.ts
// 描述: 此脚本用于同步PostgreSQL中资源关联数据到ClickHouse
// 作者: AI Assistant
// 创建日期: 2023-10-31
import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
import { getResource, getVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";
// PostgreSQL配置接口
interface PgConfig {
host: string;
port: number;
user: string;
password: string;
dbname?: string;
[key: string]: unknown;
}
// ClickHouse配置接口
interface ChConfig {
clickhouse_host: string;
clickhouse_port: number;
clickhouse_user: string;
clickhouse_password: string;
clickhouse_url?: string;
}
// 资源相关接口定义
interface TeamData {
team_id: string;
team_name: string;
team_description?: string;
project_id?: string;
}
interface ProjectData {
project_id: string;
project_name: string;
project_description?: string;
assigned_at?: string;
resource_id?: string;
}
interface TagData {
tag_id: string;
tag_name: string;
tag_type?: string;
created_at?: string;
resource_id?: string;
}
interface FavoriteData {
favorite_id: string;
user_id: string;
first_name?: string;
last_name?: string;
email?: string;
created_at?: string;
}
// 资源关联数据接口
interface ResourceRelations {
resource_id: string;
teams?: TeamData[];
projects?: ProjectData[];
tags?: TagData[];
favorites?: FavoriteData[];
external_id?: string;
type?: string;
attributes?: Record<string, unknown>;
}
/**
* 同步PostgreSQL资源关联数据到ClickHouse
*/
export async function main(
params: {
/** 要同步的资源ID列表 */
resource_ids: string[];
/** 是否同步teams数据 */
sync_teams?: boolean;
/** 是否同步projects数据 */
sync_projects?: boolean;
/** 是否同步tags数据 */
sync_tags?: boolean;
/** 是否同步favorites数据 */
sync_favorites?: boolean;
/** 是否为测试模式(不执行实际更新) */
dry_run?: boolean;
/** 是否显示详细日志 */
verbose?: boolean;
}
) {
// 设置默认参数
const resource_ids = params.resource_ids || [];
const sync_teams = params.sync_teams !== false;
const sync_projects = params.sync_projects !== false;
const sync_tags = params.sync_tags !== false;
const sync_favorites = params.sync_favorites !== false;
const dry_run = params.dry_run || false;
const verbose = params.verbose || false;
if (resource_ids.length === 0) {
return { success: false, message: "至少需要提供一个资源ID" };
}
// 初始化日志函数
const log = (message: string, isVerbose = false) => {
if (!isVerbose || verbose) {
console.log(message);
}
};
log(`开始同步资源关联数据: ${resource_ids.join(", ")}`);
log(`同步选项: teams=${sync_teams}, projects=${sync_projects}, tags=${sync_tags}, favorites=${sync_favorites}`, true);
let pgPool: Pool | null = null;
try {
// 1. 获取数据库配置
log("获取PostgreSQL数据库配置...", true);
const pgConfig = await getResource('f/limq/postgresql') as PgConfig;
// 2. 创建PostgreSQL连接池
pgPool = new Pool({
hostname: pgConfig.host,
port: pgConfig.port,
user: pgConfig.user,
password: pgConfig.password,
database: pgConfig.dbname || 'postgres'
}, 3);
// 3. 获取需要更新的资源完整数据
const resourcesData = await getResourcesWithRelations(pgPool, resource_ids, {
sync_teams,
sync_projects,
sync_tags,
sync_favorites
}, log);
log(`成功获取 ${resourcesData.length} 个资源的关联数据`);
if (resourcesData.length === 0) {
return { success: true, message: "没有找到需要更新的资源数据", updated: 0 };
}
// 4. 获取ClickHouse配置
const chConfig = await getClickHouseConfig();
// 5. 对每个资源执行更新
if (!dry_run) {
// 5a. 更新shorturl表数据
const shorturlUpdated = await updateClickHouseShorturl(resourcesData, chConfig, log);
// 5b. 更新events表数据
const eventsUpdated = await updateClickHouseEvents(resourcesData, chConfig, log);
return {
success: true,
message: "资源关联数据同步完成",
shorturl_updated: shorturlUpdated,
events_updated: eventsUpdated,
total_updated: shorturlUpdated + eventsUpdated
};
} else {
log("测试模式: 不执行实际更新");
if (resourcesData.length > 0) {
log("示例数据:");
log(JSON.stringify(resourcesData[0], null, 2));
}
return { success: true, dry_run: true, resources: resourcesData };
}
} catch (error) {
const errorMessage = `同步过程中发生错误: ${(error as Error).message}`;
log(errorMessage);
if ((error as Error).stack) {
log(`错误堆栈: ${(error as Error).stack}`, true);
}
return { success: false, message: errorMessage };
} finally {
if (pgPool) {
await pgPool.end();
log("PostgreSQL连接池已关闭", true);
}
}
}
/**
* 从PostgreSQL获取资源及其关联数据
*/
async function getResourcesWithRelations(
pgPool: Pool,
resourceIds: string[],
options: {
sync_teams: boolean;
sync_projects: boolean;
sync_tags: boolean;
sync_favorites: boolean;
},
log: (message: string, isVerbose?: boolean) => void
): Promise<ResourceRelations[]> {
const client = await pgPool.connect();
try {
// 准备资源IDs参数
const resourceIdsParam = resourceIds.map(id => `'${id}'`).join(',');
// 1. 获取基本资源信息
log(`获取资源基本信息: ${resourceIdsParam}`, true);
const resourcesQuery = `
SELECT
r.id,
r.external_id,
r.type,
r.attributes,
r.schema_version,
r.created_at,
r.updated_at
FROM
limq.resources r
WHERE
r.id IN (${resourceIdsParam})
AND r.deleted_at IS NULL
`;
const resourcesResult = await client.queryObject(resourcesQuery);
if (resourcesResult.rows.length === 0) {
log(`未找到有效的资源数据`, true);
return [];
}
// 处理每个资源
const enrichedResources: ResourceRelations[] = [];
for (const resource of resourcesResult.rows) {
const resourceId = resource.id as string;
log(`处理资源ID: ${resourceId}`, true);
// 初始化关联数据对象
const relationData: ResourceRelations = {
resource_id: resourceId,
external_id: resource.external_id as string,
type: resource.type as string,
attributes: parseJsonField(resource.attributes)
};
// 2. 获取项目关联
if (options.sync_projects) {
const projectsQuery = `
SELECT
pr.resource_id, pr.project_id,
p.name as project_name, p.description as project_description,
pr.assigned_at
FROM
limq.project_resources pr
JOIN
limq.projects p ON pr.project_id = p.id
WHERE
pr.resource_id = $1
AND p.deleted_at IS NULL
`;
const projectsResult = await client.queryObject(projectsQuery, [resourceId]);
relationData.projects = projectsResult.rows as ProjectData[];
log(`找到 ${projectsResult.rows.length} 个关联项目`, true);
}
// 3. 获取标签关联
if (options.sync_tags) {
const tagsQuery = `
SELECT
rt.resource_id, rt.tag_id, rt.created_at,
t.name as tag_name, t.type as tag_type
FROM
limq.resource_tags rt
JOIN
limq.tags t ON rt.tag_id = t.id
WHERE
rt.resource_id = $1
AND t.deleted_at IS NULL
`;
const tagsResult = await client.queryObject(tagsQuery, [resourceId]);
relationData.tags = tagsResult.rows as TagData[];
log(`找到 ${tagsResult.rows.length} 个关联标签`, true);
}
// 4. 获取团队关联(通过项目)
if (options.sync_teams && relationData.projects && relationData.projects.length > 0) {
const projectIds = relationData.projects.map((p: ProjectData) => p.project_id);
if (projectIds.length > 0) {
const teamsQuery = `
SELECT
tp.team_id, tp.project_id,
t.name as team_name, t.description as team_description
FROM
limq.team_projects tp
JOIN
limq.teams t ON tp.team_id = t.id
WHERE
tp.project_id = ANY($1::uuid[])
AND t.deleted_at IS NULL
`;
const teamsResult = await client.queryObject(teamsQuery, [projectIds]);
relationData.teams = teamsResult.rows as TeamData[];
log(`找到 ${teamsResult.rows.length} 个关联团队`, true);
}
}
// 5. 获取收藏关联
if (options.sync_favorites) {
const favoritesQuery = `
SELECT
f.id as favorite_id, f.user_id, f.created_at,
u.first_name, u.last_name, u.email
FROM
limq.favorite f
JOIN
limq.users u ON f.user_id = u.id
WHERE
f.favoritable_id = $1
AND f.favoritable_type = 'resource'
AND f.deleted_at IS NULL
`;
const favoritesResult = await client.queryObject(favoritesQuery, [resourceId]);
relationData.favorites = favoritesResult.rows as FavoriteData[];
log(`找到 ${favoritesResult.rows.length} 个收藏记录`, true);
}
// 添加到结果集
enrichedResources.push(relationData);
}
return enrichedResources;
} finally {
client.release();
}
}
/**
* 更新ClickHouse中的shorturl表数据
*/
async function updateClickHouseShorturl(
resources: ResourceRelations[],
chConfig: ChConfig,
log: (message: string, isVerbose?: boolean) => void
): Promise<number> {
// 只处理类型为shorturl的资源
const shorturls = resources.filter(r => r.type === 'shorturl');
if (shorturls.length === 0) {
log('没有找到shorturl类型的资源跳过shorturl表更新');
return 0;
}
log(`准备更新 ${shorturls.length} 个shorturl资源`);
let updatedCount = 0;
// 检查ClickHouse中是否存在shorturl表
const tableExists = await checkClickHouseTable(chConfig, 'shorturl_analytics.shorturl');
if (!tableExists) {
log('ClickHouse中未找到shorturl表请先创建表');
return 0;
}
// 对每个资源执行更新
for (const shorturl of shorturls) {
try {
// 格式化团队数据
const teams = JSON.stringify(shorturl.teams || []);
// 格式化项目数据
const projects = JSON.stringify(shorturl.projects || []);
// 格式化标签数据
const tags = JSON.stringify((shorturl.tags || []).map((t: TagData) => ({
tag_id: t.tag_id,
tag_name: t.tag_name,
tag_type: t.tag_type,
created_at: t.created_at
})));
// 格式化收藏数据
const favorites = JSON.stringify((shorturl.favorites || []).map((f: FavoriteData) => ({
favorite_id: f.favorite_id,
user_id: f.user_id,
user_name: `${f.first_name || ""} ${f.last_name || ""}`.trim(),
created_at: f.created_at
})));
// 尝试更新ClickHouse数据
const updateQuery = `
ALTER TABLE shorturl_analytics.shorturl
UPDATE
teams = '${escapeString(teams)}',
projects = '${escapeString(projects)}',
tags = '${escapeString(tags)}',
favorites = '${escapeString(favorites)}'
WHERE id = '${shorturl.resource_id}'
`;
await executeClickHouseQuery(chConfig, updateQuery);
log(`更新shorturl完成: ${shorturl.resource_id}`, true);
updatedCount++;
} catch (error) {
log(`更新shorturl ${shorturl.resource_id} 失败: ${(error as Error).message}`);
}
}
return updatedCount;
}
/**
* 更新ClickHouse中的events表数据
*/
async function updateClickHouseEvents(
resources: ResourceRelations[],
chConfig: ChConfig,
log: (message: string, isVerbose?: boolean) => void
): Promise<number> {
// 过滤出有external_id的资源
const resourcesWithExternalId = resources.filter(r => r.external_id && r.external_id.trim() !== '');
if (resourcesWithExternalId.length === 0) {
log('没有找到具有external_id的资源跳过events表更新');
return 0;
}
log(`准备更新events表中与 ${resourcesWithExternalId.length} 个外部ID相关的记录`);
// 检查ClickHouse中是否存在events表
const tableExists = await checkClickHouseTable(chConfig, 'shorturl_analytics.events');
if (!tableExists) {
log('ClickHouse中未找到events表请先创建表');
return 0;
}
// 提取所有的external_id
const externalIds = resourcesWithExternalId.map(r => r.external_id).filter(Boolean) as string[];
// 构建资源数据映射使用external_id作为键
const resourceMapByExternalId = resourcesWithExternalId.reduce((map, resource) => {
if (resource.external_id) {
map[resource.external_id] = resource;
}
return map;
}, {} as Record<string, ResourceRelations>);
// 获取ClickHouse中相关资源的事件记录数量
let updatedCount = 0;
try {
// 格式化外部ID列表
const formattedExternalIds = externalIds.map(id => `'${id}'`).join(', ');
// 先查询是否有相关事件
const countQuery = `
SELECT COUNT(*) as count
FROM shorturl_analytics.events
WHERE event_id IN (${formattedExternalIds})
`;
const countResult = await executeClickHouseQuery(chConfig, countQuery);
const eventCount = parseInt(countResult.trim(), 10);
if (eventCount === 0) {
// 尝试另一种查询方式
const alternateCountQuery = `
SELECT COUNT(*) as count
FROM shorturl_analytics.events
WHERE link_id IN (${formattedExternalIds})
`;
const alternateCountResult = await executeClickHouseQuery(chConfig, alternateCountQuery);
const alternateEventCount = parseInt(alternateCountResult.trim(), 10);
if (alternateEventCount === 0) {
log('没有找到相关事件记录跳过events表更新');
log(`已尝试的匹配字段: event_idlink_id`, true);
return 0;
} else {
log(`找到 ${alternateEventCount} 条以link_id匹配的事件记录需要更新`);
}
} else {
log(`找到 ${eventCount} 条以event_id匹配的事件记录需要更新`);
}
// 批量更新每个资源相关的事件记录
for (const externalId of externalIds) {
const resource = resourceMapByExternalId[externalId];
if (!resource) continue;
// 获取关联数据
const tags = resource.tags ? JSON.stringify(resource.tags) : null;
if (tags) {
// 尝试通过event_id更新事件标签
const updateTagsQueryByEventId = `
ALTER TABLE shorturl_analytics.events
UPDATE link_tags = '${escapeString(tags)}'
WHERE event_id = '${externalId}'
`;
await executeClickHouseQuery(chConfig, updateTagsQueryByEventId);
log(`尝试通过event_id更新事件标签: ${externalId}`, true);
// 尝试通过link_id更新事件标签
const updateTagsQueryByLinkId = `
ALTER TABLE shorturl_analytics.events
UPDATE link_tags = '${escapeString(tags)}'
WHERE link_id = '${externalId}'
`;
await executeClickHouseQuery(chConfig, updateTagsQueryByLinkId);
log(`尝试通过link_id更新事件标签: ${externalId}`, true);
}
// 如果资源有resource_id也尝试使用它来更新
if (resource.resource_id) {
const updateByResourceId = `
ALTER TABLE shorturl_analytics.events
UPDATE link_tags = '${escapeString(tags || '[]')}'
WHERE link_id = '${resource.resource_id}'
`;
await executeClickHouseQuery(chConfig, updateByResourceId);
log(`尝试通过resource_id更新事件标签: ${resource.resource_id}`, true);
}
updatedCount++;
}
log(`已尝试更新 ${updatedCount} 个资源的事件记录`);
} catch (error) {
log(`更新events表失败: ${(error as Error).message}`);
}
return updatedCount;
}
/**
* 获取ClickHouse配置
*/
async function getClickHouseConfig(): Promise<ChConfig> {
try {
const chConfigJson = await getVariable("f/shorturl_analytics/clickhouse");
// 确保配置不为空
if (!chConfigJson) {
throw new Error("未找到ClickHouse配置");
}
// 解析JSON字符串为对象
let chConfig: ChConfig;
if (typeof chConfigJson === 'string') {
try {
chConfig = JSON.parse(chConfigJson);
} catch (_) {
throw new Error("ClickHouse配置不是有效的JSON");
}
} else {
chConfig = chConfigJson as ChConfig;
}
// 验证并构建URL
if (!chConfig.clickhouse_url && chConfig.clickhouse_host && chConfig.clickhouse_port) {
chConfig.clickhouse_url = `http://${chConfig.clickhouse_host}:${chConfig.clickhouse_port}`;
}
if (!chConfig.clickhouse_url) {
throw new Error("ClickHouse配置缺少URL");
}
return chConfig;
} catch (error) {
throw new Error(`获取ClickHouse配置失败: ${(error as Error).message}`);
}
}
/**
* 检查ClickHouse中是否存在指定表
*/
async function checkClickHouseTable(chConfig: ChConfig, tableName: string): Promise<boolean> {
try {
const query = `EXISTS TABLE ${tableName}`;
const result = await executeClickHouseQuery(chConfig, query);
return result.trim() === '1';
} catch (error) {
console.error(`检查表 ${tableName} 失败:`, error);
return false;
}
}
/**
* 执行ClickHouse查询
*/
async function executeClickHouseQuery(chConfig: ChConfig, query: string): Promise<string> {
// 确保URL有效
if (!chConfig.clickhouse_url) {
throw new Error("无效的ClickHouse URL: 未定义");
}
// 执行HTTP请求
try {
const response = await fetch(chConfig.clickhouse_url, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${chConfig.clickhouse_user}:${chConfig.clickhouse_password}`)}`
},
body: query,
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse查询失败 (${response.status}): ${errorText}`);
}
return await response.text();
} catch (error) {
throw new Error(`执行ClickHouse查询失败: ${(error as Error).message}`);
}
}
/**
* 解析JSON字段
*/
function parseJsonField(field: unknown): Record<string, unknown> {
if (!field) return {};
try {
if (typeof field === 'string') {
return JSON.parse(field);
} else if (typeof field === 'object') {
return field as Record<string, unknown>;
}
} catch (error) {
console.warn(`无法解析JSON字段:`, error);
}
return {};
}
/**
* 转义字符串避免SQL注入
*/
function escapeString(str: string): string {
if (!str) return '';
return str.replace(/'/g, "''");
}