// File: sync_shorturl_schema_to_clickhouse.ts
// Description: Syncs rows of the PostgreSQL short_url.shorturl table into ClickHouse.
// Created: 2023-11-21
|
||
import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
|
||
import { getResource, getVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";
|
||
|
||
/**
 * Connection settings for the PostgreSQL source database, as loaded from the
 * Windmill resource in `main`.
 */
interface PgConfig {
  /** Server host; handed to the connection pool as `hostname`. */
  host: string;
  /** Server TCP port. */
  port: number;
  /** Login user name. */
  user: string;
  /** Login password. */
  password: string;
  /** Database name; `main` falls back to 'postgres' when this is absent. */
  dbname?: string;
  /** Any further keys carried by the resource are accepted but not read by this script. */
  [key: string]: unknown;
}
|
||
|
||
/**
 * Connection settings for the ClickHouse HTTP interface, as loaded by
 * `getClickHouseConfig`.
 */
interface ChConfig {
  /** Server host; used to derive `clickhouse_url` when no URL is configured. */
  clickhouse_host: string;
  /** HTTP port; used together with `clickhouse_host` to derive the URL. */
  clickhouse_port: number;
  /** User name sent via HTTP Basic authentication. */
  clickhouse_user: string;
  /** Password sent via HTTP Basic authentication. */
  clickhouse_password: string;
  /** Full endpoint URL; when missing it is built as `http://<host>:<port>`. */
  clickhouse_url?: string;
}
|
||
|
||
/**
 * One row selected from `short_url.shorturl` by `getShortUrlData`.
 *
 * Optional columns are typed as `string | null` because a SQL NULL reaches the
 * script as `null`, not as a missing property; every consumer in this file
 * either checks truthiness or accepts `null` explicitly.
 */
interface ShortUrlData {
  /** Row identifier; also written to ClickHouse as `external_id`. */
  id: string;
  /** Short path segment of the link. */
  slug: string;
  /** Target URL; stored in ClickHouse as `original_url`. */
  origin: string;
  title?: string | null;
  description?: string | null;
  created_at?: string | null;
  updated_at?: string | null;
  deleted_at?: string | null;
  /** Selected from the PostgreSQL column `expired_at` under the alias `expires_at`. */
  expires_at?: string | null;
  /** Domain the short link is served from. */
  domain?: string | null;
}
|
||
|
||
/**
 * Synchronizes rows of the PostgreSQL `short_url.shorturl` table into ClickHouse.
 *
 * Rows created or updated inside the [start_time, end_time] window are read from
 * PostgreSQL and handed to `updateClickHouseShortUrl`. Errors raised while syncing
 * are reported through the returned object (`success: false`) rather than thrown.
 *
 * @param params Optional overrides for the sync window, dry-run mode and log verbosity.
 * @returns A summary object: the processed row count on success, the row count plus
 *          one sample row in dry-run mode, or an error message on failure.
 */
export async function main(
  params: {
    /** Start of the sync window (ISO 8601). Defaults to one hour ago. */
    start_time?: string;
    /** End of the sync window (ISO 8601). Defaults to the current time. */
    end_time?: string;
    /** Dry-run mode: read the data but do not write to ClickHouse. */
    dry_run?: boolean;
    /** Emit verbose log lines as well. */
    verbose?: boolean;
  }
) {
  // The runtime may pass null/undefined when no arguments were supplied at all;
  // treat that like an empty options object instead of crashing on property access.
  const options: typeof params = params ?? {};

  // `||` is deliberate: an empty string should also fall back to the default window.
  const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000).toISOString();
  const start_time = options.start_time || oneHourAgo;
  const end_time = options.end_time || new Date().toISOString();
  const dry_run = options.dry_run || false;
  const verbose = options.verbose || false;

  // Verbose messages are only printed when the caller asked for them.
  const log = (message: string, isVerbose = false) => {
    if (!isVerbose || verbose) {
      console.log(message);
    }
  };

  log(`开始同步shorturl表数据: ${start_time} 至 ${end_time}`);

  let pgPool: Pool | null = null;

  try {
    // 1. Load the PostgreSQL connection settings.
    log("获取PostgreSQL数据库配置...", true);
    const pgConfig = await getResource('f/limq/production_supabase') as PgConfig;

    // 2. Open a small connection pool (at most 3 connections).
    pgPool = new Pool({
      hostname: pgConfig.host,
      port: pgConfig.port,
      user: pgConfig.user,
      password: pgConfig.password,
      database: pgConfig.dbname || 'postgres'
    }, 3);

    // 3. Fetch the rows that fall into the sync window.
    const shorturlData = await getShortUrlData(pgPool, start_time, end_time, log);
    log(`成功获取 ${shorturlData.length} 条shorturl数据`);

    if (shorturlData.length === 0) {
      return { success: true, message: "没有找到需要更新的数据", updated: 0 };
    }

    // 4. Load the ClickHouse settings (also done in dry-run mode, which validates them).
    const chConfig = await getClickHouseConfig();

    // 5. Write to ClickHouse unless this is a dry run.
    if (dry_run) {
      log("测试模式: 不执行实际更新");
      return {
        success: true,
        dry_run: true,
        shorturl_count: shorturlData.length,
        shorturl_sample: shorturlData.slice(0, 1)
      };
    }

    const shorturlUpdated = await updateClickHouseShortUrl(shorturlData, chConfig, log);
    return {
      success: true,
      message: "shorturl表数据同步完成",
      shorturl_updated: shorturlUpdated
    };
  } catch (error) {
    // Anything can be thrown in JavaScript; only Error instances carry message/stack.
    const detail = error instanceof Error ? error.message : String(error);
    const errorMessage = `同步过程中发生错误: ${detail}`;
    log(errorMessage);
    if (error instanceof Error && error.stack) {
      log(`错误堆栈: ${error.stack}`, true);
    }
    return { success: false, message: errorMessage };
  } finally {
    if (pgPool) {
      // A failure while closing the pool must not replace the result computed above:
      // an exception escaping `finally` would override the pending return value.
      try {
        await pgPool.end();
        log("PostgreSQL连接池已关闭", true);
      } catch (closeError) {
        const detail = closeError instanceof Error ? closeError.message : String(closeError);
        log(`关闭PostgreSQL连接池失败: ${detail}`);
      }
    }
  }
}
|
||
|
||
/**
 * Reads shorturl rows from PostgreSQL.
 *
 * Selects every row of `short_url.shorturl` whose `created_at` or `updated_at`
 * lies inside the inclusive [startTime, endTime] window. The borrowed connection
 * is always released, whether or not the query succeeds.
 *
 * @param pgPool Pool to borrow a connection from.
 * @param startTime Lower bound of the window, bound as query parameter $1.
 * @param endTime Upper bound of the window, bound as query parameter $2.
 * @param log Logger; the window is reported as a verbose message.
 * @returns The selected rows.
 */
async function getShortUrlData(
  pgPool: Pool,
  startTime: string,
  endTime: string,
  log: (message: string, isVerbose?: boolean) => void
): Promise<ShortUrlData[]> {
  const connection = await pgPool.connect();

  try {
    log(`获取shorturl数据 (${startTime} 至 ${endTime})`, true);

    // `expired_at` is renamed so the row matches the ShortUrlData field name.
    const sql = `
      SELECT
        id,
        slug,
        origin,
        title,
        description,
        domain,
        created_at,
        updated_at,
        deleted_at,
        expired_at as expires_at
      FROM
        short_url.shorturl
      WHERE
        (created_at >= $1 AND created_at <= $2)
        OR (updated_at >= $1 AND updated_at <= $2)
    `;

    const { rows } = await connection.queryObject(sql, [startTime, endTime]);
    return rows as ShortUrlData[];
  } finally {
    connection.release();
  }
}
|
||
|
||
/**
 * Turns a date value into a SQL expression ClickHouse can evaluate.
 *
 * @param dateStr Date text to convert; empty, null or undefined means "no value".
 * @returns `parseDateTimeBestEffort('<ISO timestamp>')` for a parseable date,
 *          otherwise the SQL keyword `NULL`.
 */
function formatDateTime(dateStr: string | null | undefined): string {
  const SQL_NULL = 'NULL';

  if (!dateStr) {
    return SQL_NULL;
  }

  try {
    const parsed = new Date(dateStr);
    // An unparseable input produces an invalid Date whose timestamp is NaN.
    const isValid = !isNaN(parsed.getTime());

    // The ISO 8601 form is wrapped in parseDateTimeBestEffort for ClickHouse to parse.
    return isValid ? `parseDateTimeBestEffort('${parsed.toISOString()}')` : SQL_NULL;
  } catch (error) {
    console.error(`日期格式化错误: ${error}`);
    return SQL_NULL;
  }
}
|
||
|
||
/**
 * Renders a textual progress bar such as `[=====               ] 25% (5/20)`.
 *
 * The percentage is clamped to 0..100 so that an empty workload (`total` of 0)
 * or an overshooting `current` can neither produce `NaN%` nor make
 * `String.prototype.repeat` throw on a negative count.
 *
 * @param current Number of items processed so far.
 * @param total Total number of items.
 * @returns The bar (20 cells, one per 5%), the percentage and the raw counters.
 */
function formatProgress(current: number, total: number): string {
  const BAR_WIDTH = 20;

  // Guard the division: total <= 0 would give NaN or Infinity.
  const rawPercent = total > 0 ? Math.round((current / total) * 100) : 0;
  const percent = Number.isFinite(rawPercent)
    ? Math.min(100, Math.max(0, rawPercent))
    : 0;

  const filledCells = Math.floor(percent / 5);
  const progressBar = '[' + '='.repeat(filledCells) + ' '.repeat(BAR_WIDTH - filledCells) + ']';
  return `${progressBar} ${percent}% (${current}/${total})`;
}
|
||
|
||
/** Columns written by every INSERT into the ClickHouse shorturl table, in value order. */
const SHORTURL_INSERT_COLUMNS: readonly string[] = [
  'id', 'external_id', 'type', 'slug', 'original_url',
  'title', 'description', 'attributes', 'schema_version',
  'creator_id', 'creator_email', 'creator_name',
  'created_at', 'updated_at', 'deleted_at',
  'projects', 'teams', 'tags', 'qr_codes', 'channels', 'favorites',
  'expires_at', 'click_count', 'unique_visitors', 'domain',
];

/** Renders one SQL expression per entry of SHORTURL_INSERT_COLUMNS, in the same order. */
function buildShortUrlSqlValues(shorturl: ShortUrlData): string[] {
  const quote = (value: string): string => `'${escapeString(value)}'`;
  const quoteOrNull = (value: string | null | undefined): string =>
    value ? quote(value) : 'NULL';

  return [
    quote(shorturl.id),                    // id
    quote(shorturl.id),                    // external_id mirrors the PostgreSQL id
    "'shorturl'",                          // type
    quote(shorturl.slug),                  // slug
    quote(shorturl.origin),                // original_url
    quoteOrNull(shorturl.title),           // title
    quoteOrNull(shorturl.description),     // description
    "'{}'",                                // attributes
    '1',                                   // schema_version
    "''",                                  // creator_id
    "''",                                  // creator_email
    "''",                                  // creator_name
    formatDateTime(shorturl.created_at),   // created_at
    formatDateTime(shorturl.updated_at),   // updated_at
    formatDateTime(shorturl.deleted_at),   // deleted_at
    "'[]'",                                // projects
    "'[]'",                                // teams
    "'[]'",                                // tags
    "'[]'",                                // qr_codes
    "'[]'",                                // channels
    "'[]'",                                // favorites
    formatDateTime(shorturl.expires_at),   // expires_at
    '0',                                   // click_count
    '0',                                   // unique_visitors
    quoteOrNull(shorturl.domain),          // domain
  ];
}

/** Extracts a printable message from whatever value was thrown. */
function describeSyncError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Inserts shorturl rows into `shorturl_analytics.shorturl` in ClickHouse.
 *
 * Each row is written with its own `INSERT ... SELECT ... WHERE NOT EXISTS`
 * statement, so rows whose id is already present are left untouched. When that
 * statement fails, a fallback checks for the id with `SELECT count()` and, if it
 * is absent, issues a plain `INSERT ... VALUES`. Both statements share one column
 * list, so neither depends on the physical column order of the table.
 *
 * @param shorturls Rows to write.
 * @param chConfig ClickHouse connection settings.
 * @param log Logger; per-row details are emitted as verbose messages.
 * @returns Number of rows whose statement completed without error. On the primary
 *          path this includes rows that already existed, since the statement
 *          succeeds either way; 0 is returned when the input is empty or the
 *          target table is missing.
 */
async function updateClickHouseShortUrl(
  shorturls: ShortUrlData[],
  chConfig: ChConfig,
  log: (message: string, isVerbose?: boolean) => void
): Promise<number> {
  if (shorturls.length === 0) {
    log('没有找到shorturl数据,跳过shorturl表更新');
    return 0;
  }

  log(`准备更新 ${shorturls.length} 条shorturl数据`);

  // Refuse to continue when the target table does not exist.
  const tableExists = await checkClickHouseTable(chConfig, 'shorturl_analytics.shorturl');
  if (!tableExists) {
    log('ClickHouse中未找到shorturl表,请先创建表');
    return 0;
  }

  const columnList = SHORTURL_INSERT_COLUMNS.join(', ');
  let updatedCount = 0;
  const startTime = Date.now();

  // Rows are grouped into batches for progress reporting only; every row is
  // still sent as its own statement.
  const batchSize = 50;
  const totalBatches = Math.ceil(shorturls.length / batchSize);

  for (let i = 0; i < shorturls.length; i += batchSize) {
    const batch = shorturls.slice(i, i + batchSize);
    const batchNumber = Math.floor(i / batchSize) + 1;
    let successCount = 0;

    log(`处理批次 ${batchNumber}/${totalBatches}: ${formatProgress(i, shorturls.length)}`);

    for (let j = 0; j < batch.length; j++) {
      const shorturl = batch[j];

      // Report overall progress every 10 rows and on the final row.
      const overallProgress = i + j + 1;
      if (overallProgress % 10 === 0 || overallProgress === shorturls.length) {
        const elapsedSeconds = (Date.now() - startTime) / 1000;
        // Guard both divisions: with zero elapsed time the rate is unknown.
        const recordsPerSecond = elapsedSeconds > 0 ? overallProgress / elapsedSeconds : 0;
        const remainingRecords = shorturls.length - overallProgress;
        const estimatedSecondsRemaining =
          recordsPerSecond > 0 ? remainingRecords / recordsPerSecond : 0;

        log(`总进度: ${formatProgress(overallProgress, shorturls.length)} - 速率: ${recordsPerSecond.toFixed(1)}条/秒 - 预计剩余时间: ${formatTime(estimatedSecondsRemaining)}`);
      }

      try {
        // Built inside the try so that a malformed row only fails this row.
        const selectList = buildShortUrlSqlValues(shorturl)
          .map((expression, index) => `${expression} AS ${SHORTURL_INSERT_COLUMNS[index]}`)
          .join(',\n            ');

        const insertQuery = `
          INSERT INTO shorturl_analytics.shorturl (${columnList})
          SELECT
            ${selectList}
          WHERE NOT EXISTS (
            SELECT 1 FROM shorturl_analytics.shorturl WHERE id = '${escapeString(shorturl.id)}'
          )
        `;

        await executeClickHouseQuery(chConfig, insertQuery);
        successCount++;
        log(`成功处理shorturl: ${shorturl.id}`, true);
      } catch (error) {
        log(`处理shorturl ${shorturl.id} 失败: ${describeSyncError(error)}`);

        // Fallback: explicit existence check followed by a plain INSERT ... VALUES.
        try {
          log(`尝试替代方法更新: ${shorturl.id}`, true);

          const checkQuery = `SELECT count() FROM shorturl_analytics.shorturl WHERE id = '${escapeString(shorturl.id)}'`;
          const existsResult = await executeClickHouseQuery(chConfig, checkQuery);
          const exists = parseInt(existsResult.trim(), 10) > 0;

          if (exists) {
            log(`记录已存在,跳过: ${shorturl.id}`, true);
          } else {
            const valueList = buildShortUrlSqlValues(shorturl).join(',\n              ');
            const fallbackQuery = `
              INSERT INTO shorturl_analytics.shorturl (${columnList})
              VALUES (
                ${valueList}
              )
            `;

            await executeClickHouseQuery(chConfig, fallbackQuery);
            successCount++;
            log(`备选方式插入成功: ${shorturl.id}`, true);
          }
        } catch (fallbackError) {
          log(`备选方式失败 ${shorturl.id}: ${describeSyncError(fallbackError)}`);
        }
      }
    }

    updatedCount += successCount;
    log(`批次 ${batchNumber}/${totalBatches} 完成: ${successCount}/${batch.length} 条成功 (总计: ${updatedCount}/${shorturls.length})`);
  }

  const totalTime = (Date.now() - startTime) / 1000;
  const averageRate = totalTime > 0 ? updatedCount / totalTime : 0;
  log(`同步完成! 总计处理: ${updatedCount}/${shorturls.length} 条记录, 耗时: ${formatTime(totalTime)}, 平均速率: ${averageRate.toFixed(1)}条/秒`);

  return updatedCount;
}
|
||
|
||
/**
 * Loads the ClickHouse connection settings from the Windmill variable
 * `f/shorturl_analytics/clickhouse`.
 *
 * A string value is parsed as JSON; any other value is used as-is. When no
 * `clickhouse_url` is present, one is derived from host and port.
 *
 * @returns The settings, always carrying a non-empty `clickhouse_url`.
 * @throws Error prefixed with "获取ClickHouse配置失败" when the variable is missing,
 *         is not valid JSON, or yields no URL.
 */
async function getClickHouseConfig(): Promise<ChConfig> {
  try {
    const rawVariable = await getVariable("f/shorturl_analytics/clickhouse");

    // An empty or missing variable cannot be used.
    if (!rawVariable) {
      throw new Error("未找到ClickHouse配置");
    }

    // Strings hold JSON text; anything else is taken to be the settings object already.
    const readSettings = (): ChConfig => {
      if (typeof rawVariable !== 'string') {
        return rawVariable as ChConfig;
      }
      try {
        return JSON.parse(rawVariable);
      } catch {
        throw new Error("ClickHouse配置不是有效的JSON");
      }
    };
    const settings = readSettings();

    // Without an explicit URL, host and port must both be present to build one.
    if (!settings.clickhouse_url) {
      if (settings.clickhouse_host && settings.clickhouse_port) {
        settings.clickhouse_url = `http://${settings.clickhouse_host}:${settings.clickhouse_port}`;
      } else {
        throw new Error("ClickHouse配置缺少URL");
      }
    }

    return settings;
  } catch (error) {
    throw new Error(`获取ClickHouse配置失败: ${(error as Error).message}`);
  }
}
|
||
|
||
/**
 * Asks ClickHouse whether a table exists.
 *
 * @param chConfig ClickHouse connection settings.
 * @param tableName Table name, interpolated verbatim into `EXISTS TABLE <name>`.
 * @returns `true` only when the server answers `1`; `false` for any other answer
 *          and also when the query itself fails (the failure is written to stderr).
 */
async function checkClickHouseTable(chConfig: ChConfig, tableName: string): Promise<boolean> {
  try {
    const answer = (await executeClickHouseQuery(chConfig, `EXISTS TABLE ${tableName}`)).trim();
    return answer === '1';
  } catch (error) {
    // A failed check is reported as "table missing" rather than propagated.
    console.error(`检查表 ${tableName} 失败:`, error);
    return false;
  }
}
|
||
|
||
/**
 * Sends one statement to ClickHouse over HTTP and returns the raw response text.
 *
 * The statement is POSTed as the request body to `chConfig.clickhouse_url`, with
 * the configured user and password sent as HTTP Basic credentials.
 *
 * @param chConfig ClickHouse connection settings; `clickhouse_url` must be set.
 * @param query Statement text to execute.
 * @returns The response body as text.
 * @throws Error when the URL is missing, the request fails, or the server
 *         answers with a non-2xx status (status and body are included).
 */
async function executeClickHouseQuery(chConfig: ChConfig, query: string): Promise<string> {
  const endpoint = chConfig.clickhouse_url;
  if (!endpoint) {
    throw new Error("无效的ClickHouse URL: 未定义");
  }

  try {
    const credentials = btoa(`${chConfig.clickhouse_user}:${chConfig.clickhouse_password}`);
    const response = await fetch(endpoint, {
      method: "POST",
      headers: {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": `Basic ${credentials}`
      },
      body: query,
    });

    if (response.ok) {
      return await response.text();
    }

    // Surface the server's own error text together with the HTTP status.
    const errorText = await response.text();
    throw new Error(`ClickHouse查询失败 (${response.status}): ${errorText}`);
  } catch (error) {
    // Network failures and the status error above both get the same prefix.
    throw new Error(`执行ClickHouse查询失败: ${(error as Error).message}`);
  }
}
|
||
|
||
/**
 * Escapes a value for use inside a single-quoted ClickHouse string literal.
 *
 * ClickHouse treats the backslash as an escape character inside string literals,
 * so backslashes are doubled first; otherwise a value ending in `\` would escape
 * the closing quote and break out of the literal. Single quotes are then doubled.
 *
 * @param str Raw text; an empty or missing value yields the empty string.
 * @returns The escaped text, without surrounding quotes.
 */
function escapeString(str: string): string {
  if (!str) return '';
  // Order matters: backslashes must be handled before quotes are rewritten.
  return str.replace(/\\/g, '\\\\').replace(/'/g, "''");
}
|
||
|
||
/**
 * Formats a duration given in seconds as human-readable text.
 *
 * @param seconds Duration in seconds; fractions are rounded down.
 * @returns `<s>秒` when the duration is under one minute, otherwise `<m>分<s>秒`.
 */
function formatTime(seconds: number): string {
  const wholeMinutes = Math.floor(seconds / 60);
  const leftoverSeconds = Math.floor(seconds % 60);
  const secondsPart = `${leftoverSeconds}秒`;

  // The minutes part is omitted entirely when it would be zero.
  return wholeMinutes === 0 ? secondsPart : `${wholeMinutes}分${secondsPart}`;
}
|