Files
shorturl-analytics/windmill/sync_shorturl_schema_to_clickhouse.ts
2025-04-17 18:28:08 +08:00

527 lines
17 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// 文件名: sync_shorturl_schema_to_clickhouse.ts
// 描述: 此脚本用于同步PostgreSQL中的short_url.shorturl表数据到ClickHouse
// 创建日期: 2023-11-21
import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
import { getResource, getVariable, setVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";
// Incremental-sync bookkeeping persisted between runs.
interface SyncState {
  // Running total of records written across all runs
  records_synced: number;
  // ISO-8601 upper bound of the most recently synced time window
  last_sync_time: string;
  // ISO-8601 timestamp of when this state was last written
  last_run: string;
}
// Windmill variable path under which the SyncState is stored
const SYNC_STATE_KEY = "f/shorturl_analytics/shorturl_to_clickhouse_state";
// Connection settings read from the Postgres resource; any extra resource fields are tolerated.
interface PgConfig {
  [key: string]: unknown;
  host: string;
  port: number;
  // Database name; main() falls back to 'postgres' when absent
  dbname?: string;
  user: string;
  password: string;
}
// ClickHouse HTTP-interface settings; clickhouse_url may be derived from host + port.
interface ChConfig {
  clickhouse_url?: string;
  clickhouse_host: string;
  clickhouse_port: number;
  clickhouse_user: string;
  clickhouse_password: string;
}
/**
 * One row of short_url.shorturl as selected from Postgres.
 * NOTE(review): the Postgres driver may return timestamp columns as Date objects and
 * NULL columns as null rather than undefined — confirm before relying on these types.
 */
interface ShortUrlData {
  id: string;
  slug: string;
  // Destination URL; written to ClickHouse as original_url
  origin: string;
  domain?: string;
  title?: string;
  description?: string;
  created_at?: string;
  updated_at?: string;
  deleted_at?: string;
  // Selected from the Postgres column expired_at, aliased to expires_at
  expires_at?: string;
}
/**
 * Incrementally syncs rows of PostgreSQL `short_url.shorturl` into ClickHouse
 * `shorturl_analytics.shorturl`.
 *
 * The window runs from the stored `last_sync_time` (Windmill variable SYNC_STATE_KEY),
 * or from `default_hours_back` hours ago when no usable state exists, up to the moment
 * this run starts. After a non-dry run the window end is stored as the next start.
 * The window is advanced even if some rows could not be written; that case is logged.
 *
 * @returns A summary object. Errors are caught and reported as `{ success: false, message }`.
 */
export async function main(
  /** Test mode: read from Postgres but write nothing to ClickHouse or the sync state */
  dry_run = false,
  /** Also print verbose-level log lines */
  verbose = false,
  /** Ignore any stored sync state and use the default look-back window */
  reset_sync_state = false,
  /** Look-back window in hours when there is no sync state (default 1 hour) */
  default_hours_back = 1
) {
  const log = (message: string, isVerbose = false) => {
    if (!isVerbose || verbose) {
      console.log(message);
    }
  };

  let syncState: SyncState | null = null;
  if (reset_sync_state) {
    log("重置同步状态,从头开始同步", true);
  } else {
    syncState = await loadSyncState(log);
  }
  const previouslySynced = syncState ? syncState.records_synced : 0;

  // Window bounds; without stored state, look back default_hours_back hours from now.
  const defaultStartTime = new Date(Date.now() - default_hours_back * 60 * 60 * 1000).toISOString();
  const start_time = syncState ? syncState.last_sync_time : defaultStartTime;
  const end_time = new Date().toISOString();
  log(`开始同步shorturl表数据: ${start_time}${end_time}`);

  let pgPool: Pool | null = null;
  try {
    log("获取PostgreSQL数据库配置...", true);
    const pgConfig = await getResource('f/limq/production_supabase') as PgConfig;
    pgPool = new Pool({
      hostname: pgConfig.host,
      port: pgConfig.port,
      user: pgConfig.user,
      password: pgConfig.password,
      database: pgConfig.dbname || 'postgres'
    }, 3);

    const shorturlData = await getShortUrlData(pgPool, start_time, end_time, log);
    log(`成功获取 ${shorturlData.length} 条shorturl数据`);

    if (shorturlData.length === 0) {
      // Advance the window even when nothing changed so the next run starts from here.
      if (!dry_run) {
        await updateSyncState(end_time, previouslySynced, log);
      }
      return { success: true, message: "没有找到需要更新的数据", updated: 0 };
    }

    // Loaded before the dry-run check so configuration problems surface in test mode too.
    const chConfig = await getClickHouseConfig();

    if (dry_run) {
      log("测试模式: 不执行实际更新");
      return {
        success: true,
        dry_run: true,
        shorturl_count: shorturlData.length,
        shorturl_sample: shorturlData.slice(0, 1)
      };
    }

    const shorturlUpdated = await updateClickHouseShortUrl(shorturlData, chConfig, log);
    if (shorturlUpdated < shorturlData.length) {
      // The window still advances below, so make the unsynced rows visible in the log.
      log(`警告: ${shorturlData.length - shorturlUpdated}/${shorturlData.length} 条shorturl未确认同步, 同步窗口仍将前移至 ${end_time}`);
    }
    const totalSynced = previouslySynced + shorturlUpdated;
    await updateSyncState(end_time, totalSynced, log);
    return {
      success: true,
      message: "shorturl表数据同步完成",
      shorturl_updated: shorturlUpdated,
      total_synced: totalSynced,
      sync_state: {
        last_sync_time: end_time,
        records_synced: totalSynced
      }
    };
  } catch (error) {
    const errorMessage = `同步过程中发生错误: ${(error as Error).message}`;
    log(errorMessage);
    if ((error as Error).stack) {
      log(`错误堆栈: ${(error as Error).stack}`, true);
    }
    return { success: false, message: errorMessage };
  } finally {
    if (pgPool) {
      await pgPool.end();
      log("PostgreSQL连接池已关闭", true);
    }
  }
}

/**
 * Reads and validates the persisted SyncState.
 *
 * The variable may hold a JSON string or an already-parsed object. Returns null when the
 * variable is missing, cannot be read or parsed, or lacks a parseable `last_sync_time`.
 * In those cases the caller falls back to the default look-back window instead of
 * querying Postgres with an undefined bound.
 */
async function loadSyncState(log: (message: string, isVerbose?: boolean) => void): Promise<SyncState | null> {
  try {
    log("获取同步状态...", true);
    const rawState: unknown = await getVariable(SYNC_STATE_KEY);
    if (!rawState) {
      return null;
    }
    const parsed: unknown = typeof rawState === "string" ? JSON.parse(rawState) : rawState;
    if (typeof parsed !== "object" || parsed === null) {
      log("同步状态格式无效, 将使用默认设置");
      return null;
    }
    const candidate = parsed as Partial<SyncState>;
    if (typeof candidate.last_sync_time !== "string" || isNaN(new Date(candidate.last_sync_time).getTime())) {
      log("同步状态格式无效, 将使用默认设置");
      return null;
    }
    const state: SyncState = {
      last_sync_time: candidate.last_sync_time,
      // A missing or non-numeric counter would turn every later total into NaN.
      records_synced: typeof candidate.records_synced === "number" && Number.isFinite(candidate.records_synced)
        ? candidate.records_synced
        : 0,
      last_run: typeof candidate.last_run === "string" ? candidate.last_run : ""
    };
    log(`找到上次同步状态: 最后同步时间 ${state.last_sync_time}, 已同步记录数 ${state.records_synced}`, true);
    return state;
  } catch (error) {
    log(`获取同步状态失败: ${error}, 将使用默认设置`, true);
    return null;
  }
}
/**
 * Persists the sync progress so the next run starts where this one ended.
 *
 * The state is written as a JSON string because Windmill variable values are strings;
 * passing the raw object to setVariable does not match its string-valued parameter.
 *
 * Failures are logged (always, not only in verbose mode) but never thrown, so a
 * state-write problem does not fail a run whose data was already written. The stored
 * state then keeps its older last_sync_time.
 */
async function updateSyncState(lastSyncTime: string, recordsSynced: number, log: (message: string, isVerbose?: boolean) => void): Promise<void> {
  const newState: SyncState = {
    last_sync_time: lastSyncTime,
    records_synced: recordsSynced,
    last_run: new Date().toISOString()
  };
  try {
    await setVariable(SYNC_STATE_KEY, JSON.stringify(newState));
    log(`同步状态已更新: 最后同步时间 ${lastSyncTime}, 累计同步记录数 ${recordsSynced}`, true);
  } catch (error) {
    // Best-effort: do not abort the sync, but make the failure visible.
    log(`更新同步状态失败: ${error}`);
  }
}
/**
 * Fetches short_url.shorturl rows created or updated within [startTime, endTime]
 * (both bounds inclusive).
 *
 * The Postgres column expired_at is aliased to expires_at to match ShortUrlData.
 * NOTE(review): a row whose only change is deleted_at (without touching updated_at)
 * is not selected by this filter — confirm soft deletes also bump updated_at.
 *
 * The pooled connection is always released, even if the query throws.
 */
async function getShortUrlData(
  pgPool: Pool,
  startTime: string,
  endTime: string,
  log: (message: string, isVerbose?: boolean) => void
): Promise<ShortUrlData[]> {
  const client = await pgPool.connect();
  try {
    log(`获取shorturl数据 (${startTime}${endTime})`, true);
    const query = `
      SELECT
        id,
        slug,
        origin,
        title,
        description,
        domain,
        created_at,
        updated_at,
        deleted_at,
        expired_at as expires_at
      FROM
        short_url.shorturl
      WHERE
        (created_at >= $1 AND created_at <= $2)
        OR (updated_at >= $1 AND updated_at <= $2)
    `;
    const result = await client.queryObject<ShortUrlData>(query, [startTime, endTime]);
    return result.rows;
  } finally {
    client.release();
  }
}
/**
 * Renders a timestamp as a ClickHouse SQL expression for use in an INSERT.
 *
 * Accepts ISO strings and Date objects. Postgres timestamp columns may arrive as Date
 * (NOTE(review): confirm against the driver). Missing or unparseable values become
 * the SQL literal NULL.
 *
 * @returns `parseDateTimeBestEffort('<ISO-8601 UTC>')`, or `NULL`.
 */
function formatDateTime(dateStr: string | Date | null | undefined): string {
  if (!dateStr) return 'NULL';
  const date = dateStr instanceof Date ? dateStr : new Date(dateStr);
  // An unparseable string yields an Invalid Date (it does not throw); toISOString() would throw on it.
  if (isNaN(date.getTime())) {
    return 'NULL';
  }
  return `parseDateTimeBestEffort('${date.toISOString()}')`;
}
/**
 * Formats a progress line such as `[=====     ] 50% (5/10)`.
 *
 * The percentage is clamped to 0..100, so `current > total` cannot make String.repeat
 * throw. A non-positive `total` is shown as 100% ("nothing left to do") instead of NaN%.
 *
 * @param width Number of cells in the bar (default 20, as before); negative values act as 0.
 */
function formatProgress(current: number, total: number, width = 20): string {
  const cells = Math.max(0, Math.floor(width));
  const ratio = total > 0 ? current / total : 1;
  const percent = Math.min(100, Math.max(0, Math.round(ratio * 100)));
  // Integer multiply first so e.g. 35% of 20 cells is exactly 7, never 6.999...
  const filled = Math.floor((percent * cells) / 100);
  const progressBar = '[' + '='.repeat(filled) + ' '.repeat(cells - filled) + ']';
  return `${progressBar} ${percent}% (${current}/${total})`;
}
/**
 * Column list of shorturl_analytics.shorturl written by updateClickHouseShortUrl.
 * Naming the columns explicitly keeps the INSERT ... SELECT independent of the
 * table's physical column order. buildShortUrlValues produces values in this order.
 */
const SHORTURL_COLUMNS = [
  'id', 'external_id', 'type', 'slug', 'original_url',
  'title', 'description', 'attributes', 'schema_version',
  'creator_id', 'creator_email', 'creator_name',
  'created_at', 'updated_at', 'deleted_at',
  'projects', 'teams', 'tags', 'qr_codes', 'channels', 'favorites',
  'expires_at', 'click_count', 'unique_visitors', 'domain',
] as const;

/**
 * Renders one ShortUrlData row as SQL value expressions, in SHORTURL_COLUMNS order.
 * Columns with no Postgres source (attributes, creator_*, collections, counters) get
 * fixed placeholder values.
 */
function buildShortUrlValues(shorturl: ShortUrlData): string[] {
  const quoted = (value: string) => `'${escapeString(value)}'`;
  const quotedOrNull = (value: string | undefined) => (value ? quoted(value) : 'NULL');
  const emptyJsonArray = "'[]'";
  return [
    quoted(shorturl.id),                  // id
    quoted(shorturl.id),                  // external_id
    "'shorturl'",                         // type
    quoted(shorturl.slug),                // slug
    quoted(shorturl.origin),              // original_url
    quotedOrNull(shorturl.title),         // title
    quotedOrNull(shorturl.description),   // description
    "'{}'",                               // attributes
    '1',                                  // schema_version
    "''",                                 // creator_id
    "''",                                 // creator_email
    "''",                                 // creator_name
    formatDateTime(shorturl.created_at),  // created_at
    formatDateTime(shorturl.updated_at),  // updated_at
    formatDateTime(shorturl.deleted_at),  // deleted_at
    emptyJsonArray,                       // projects
    emptyJsonArray,                       // teams
    emptyJsonArray,                       // tags
    emptyJsonArray,                       // qr_codes
    emptyJsonArray,                       // channels
    emptyJsonArray,                       // favorites
    formatDateTime(shorturl.expires_at),  // expires_at
    '0',                                  // click_count
    '0',                                  // unique_visitors
    quotedOrNull(shorturl.domain),        // domain
  ];
}

/**
 * Inserts one short URL into ClickHouse unless a row with the same id already exists.
 *
 * First tries a single INSERT ... SELECT ... WHERE NOT EXISTS. If that statement fails,
 * it falls back to an explicit existence check followed by a plain INSERT ... VALUES.
 *
 * @returns true when the row was inserted or an existing row with that id was found
 *   (by either path); false when the fallback also failed.
 */
async function syncOneShortUrl(
  shorturl: ShortUrlData,
  chConfig: ChConfig,
  columnList: string,
  log: (message: string, isVerbose?: boolean) => void
): Promise<boolean> {
  const values = buildShortUrlValues(shorturl);
  const idLiteral = `'${escapeString(shorturl.id)}'`;
  try {
    // Aliases mirror the column names so the SELECT output columns are distinctly named.
    const selectList = values
      .map((value, index) => `${value} AS ${SHORTURL_COLUMNS[index]}`)
      .join(',\n        ');
    const insertQuery = `
      INSERT INTO shorturl_analytics.shorturl (${columnList})
      SELECT
        ${selectList}
      WHERE NOT EXISTS (
        SELECT 1 FROM shorturl_analytics.shorturl WHERE id = ${idLiteral}
      )
    `;
    await executeClickHouseQuery(chConfig, insertQuery);
    log(`成功处理shorturl: ${shorturl.id}`, true);
    return true;
  } catch (error) {
    log(`处理shorturl ${shorturl.id} 失败: ${(error as Error).message}`);
    try {
      log(`尝试替代方法更新: ${shorturl.id}`, true);
      const checkQuery = `SELECT count() FROM shorturl_analytics.shorturl WHERE id = ${idLiteral}`;
      const existsResult = await executeClickHouseQuery(chConfig, checkQuery);
      if (parseInt(existsResult.trim(), 10) > 0) {
        // Same outcome as the primary path skipping an existing id, so it counts as synced.
        log(`记录已存在,跳过: ${shorturl.id}`, true);
        return true;
      }
      const fallbackQuery = `
        INSERT INTO shorturl_analytics.shorturl (${columnList})
        VALUES (${values.join(', ')})
      `;
      await executeClickHouseQuery(chConfig, fallbackQuery);
      log(`备选方式插入成功: ${shorturl.id}`, true);
      return true;
    } catch (fallbackError) {
      log(`备选方式失败 ${shorturl.id}: ${(fallbackError as Error).message}`);
      return false;
    }
  }
}

/**
 * Writes short URLs into ClickHouse shorturl_analytics.shorturl, one INSERT per record.
 * Records are grouped into batches of 50 only for progress reporting.
 *
 * NOTE(review): rows whose id already exists in ClickHouse are skipped, never updated.
 * Edits picked up in Postgres via updated_at (title, deleted_at, expires_at, ...) therefore
 * never reach ClickHouse — confirm whether that is intended.
 *
 * @returns Number of records confirmed present in ClickHouse (inserted or already there);
 *   0 when the input is empty or the target table does not exist.
 */
async function updateClickHouseShortUrl(
  shorturls: ShortUrlData[],
  chConfig: ChConfig,
  log: (message: string, isVerbose?: boolean) => void
): Promise<number> {
  if (shorturls.length === 0) {
    log('没有找到shorturl数据跳过shorturl表更新');
    return 0;
  }
  log(`准备更新 ${shorturls.length} 条shorturl数据`);

  const tableExists = await checkClickHouseTable(chConfig, 'shorturl_analytics.shorturl');
  if (!tableExists) {
    log('ClickHouse中未找到shorturl表请先创建表');
    return 0;
  }

  const columnList = SHORTURL_COLUMNS.join(', ');
  const total = shorturls.length;
  const batchSize = 50;
  const totalBatches = Math.ceil(total / batchSize);
  const startTime = Date.now();
  let updatedCount = 0;

  for (let i = 0; i < total; i += batchSize) {
    const batch = shorturls.slice(i, i + batchSize);
    const batchNumber = Math.floor(i / batchSize) + 1;
    let successCount = 0;
    log(`处理批次 ${batchNumber}/${totalBatches}: ${formatProgress(i, total)}`);

    for (const [j, shorturl] of batch.entries()) {
      if (await syncOneShortUrl(shorturl, chConfig, columnList, log)) {
        successCount++;
      }
      // Report after the record is processed, every 10 records and at the very end.
      const overallProgress = i + j + 1;
      if (overallProgress % 10 === 0 || overallProgress === total) {
        const elapsedSeconds = (Date.now() - startTime) / 1000;
        const recordsPerSecond = elapsedSeconds > 0 ? overallProgress / elapsedSeconds : 0;
        const estimatedSecondsRemaining = recordsPerSecond > 0 ? (total - overallProgress) / recordsPerSecond : 0;
        log(`总进度: ${formatProgress(overallProgress, total)} - 速率: ${recordsPerSecond.toFixed(1)}条/秒 - 预计剩余时间: ${formatTime(estimatedSecondsRemaining)}`);
      }
    }

    updatedCount += successCount;
    log(`批次 ${batchNumber}/${totalBatches} 完成: ${successCount}/${batch.length} 条成功 (总计: ${updatedCount}/${total})`);
  }

  const totalTime = (Date.now() - startTime) / 1000;
  const averageRate = totalTime > 0 ? updatedCount / totalTime : 0;
  log(`同步完成! 总计处理: ${updatedCount}/${total} 条记录, 耗时: ${formatTime(totalTime)}, 平均速率: ${averageRate.toFixed(1)}条/秒`);
  return updatedCount;
}
/**
 * Loads ClickHouse connection settings from the Windmill variable
 * "f/shorturl_analytics/clickhouse". The value may be a JSON string or an
 * already-parsed object. When clickhouse_url is absent it is built from host and port.
 *
 * @throws Error whose message starts with "获取ClickHouse配置失败: " when the variable is
 *   missing, is not valid JSON, or no URL can be determined.
 */
async function getClickHouseConfig(): Promise<ChConfig> {
  try {
    const raw = await getVariable("f/shorturl_analytics/clickhouse");
    if (!raw) {
      throw new Error("未找到ClickHouse配置");
    }

    let config: ChConfig;
    if (typeof raw !== 'string') {
      config = raw as ChConfig;
    } else {
      try {
        config = JSON.parse(raw);
      } catch {
        throw new Error("ClickHouse配置不是有效的JSON");
      }
    }

    const canDeriveUrl = Boolean(config.clickhouse_host && config.clickhouse_port);
    if (!config.clickhouse_url && canDeriveUrl) {
      config.clickhouse_url = `http://${config.clickhouse_host}:${config.clickhouse_port}`;
    }
    if (config.clickhouse_url) {
      return config;
    }
    throw new Error("ClickHouse配置缺少URL");
  } catch (error) {
    throw new Error(`获取ClickHouse配置失败: ${(error as Error).message}`);
  }
}
/**
 * Reports whether `tableName` exists, using ClickHouse's `EXISTS TABLE` statement.
 * Only a trimmed response of exactly "1" counts as existing. Any query failure is
 * logged to stderr and treated as "does not exist".
 */
async function checkClickHouseTable(chConfig: ChConfig, tableName: string): Promise<boolean> {
  let answer: string;
  try {
    answer = await executeClickHouseQuery(chConfig, `EXISTS TABLE ${tableName}`);
  } catch (error) {
    console.error(`检查表 ${tableName} 失败:`, error);
    return false;
  }
  return answer.trim() === '1';
}
/**
 * Sends `query` to the ClickHouse HTTP interface as a POST body with Basic auth and
 * returns the raw response text.
 *
 * The request is aborted after `timeoutMs` milliseconds (default 60000), so a stalled
 * connection cannot hang the job indefinitely.
 *
 * @throws Error "无效的ClickHouse URL: 未定义" when clickhouse_url is not set.
 * @throws Error prefixed with "执行ClickHouse查询失败: " on network errors, timeouts, or
 *   non-2xx responses (the latter include the HTTP status and the server's error text).
 */
async function executeClickHouseQuery(chConfig: ChConfig, query: string, timeoutMs = 60000): Promise<string> {
  if (!chConfig.clickhouse_url) {
    throw new Error("无效的ClickHouse URL: 未定义");
  }

  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const response = await fetch(chConfig.clickhouse_url, {
      method: "POST",
      headers: {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": `Basic ${btoa(`${chConfig.clickhouse_user}:${chConfig.clickhouse_password}`)}`
      },
      body: query,
      signal: controller.signal,
    });
    // Reading the body is covered by the same timeout.
    const text = await response.text();
    if (!response.ok) {
      throw new Error(`ClickHouse查询失败 (${response.status}): ${text}`);
    }
    return text;
  } catch (error) {
    const reason = controller.signal.aborted ? `请求超时 (${timeoutMs}ms)` : (error as Error).message;
    throw new Error(`执行ClickHouse查询失败: ${reason}`);
  } finally {
    clearTimeout(timer);
  }
}
/**
 * Escapes a value for use inside a single-quoted ClickHouse string literal.
 *
 * ClickHouse treats backslash as an escape character in string literals, so backslashes
 * are escaped before quotes. Otherwise a value ending in "\" would escape the closing
 * quote and break (or inject into) the generated SQL. Falsy input yields ''.
 * NOTE(review): non-string runtime values (e.g. a numeric id column) are coerced with
 * String() instead of failing on .replace — confirm the Postgres column types.
 */
function escapeString(str: string): string {
  if (!str) return '';
  return String(str).replace(/\\/g, '\\\\').replace(/'/g, "\\'");
}
/**
 * Formats a duration in seconds for log output, e.g. 5 -> "5秒", 90 -> "1分30秒".
 * Fractional seconds are truncated. Negative input is shown as "0秒", and a non-finite
 * value (e.g. an ETA from a zero rate) as "未知" (unknown).
 */
function formatTime(seconds: number): string {
  if (!Number.isFinite(seconds)) {
    return '未知';
  }
  const clamped = Math.max(0, seconds);
  const mins = Math.floor(clamped / 60);
  const secs = Math.floor(clamped % 60);
  // Explicit units: without them 1m30s rendered as "130", indistinguishable from 130s.
  return mins === 0 ? `${secs}秒` : `${mins}${secs}秒`;
}