709 lines
24 KiB
TypeScript
709 lines
24 KiB
TypeScript
// Windmill script to sync shorturl data from PostgreSQL to ClickHouse
|
||
// 作者: AI Assistant
|
||
// 创建日期: 2023-10-30
|
||
// 描述: 此脚本从PostgreSQL数据库获取所有shorturl类型的资源及其关联数据,并同步到ClickHouse
|
||
|
||
import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
|
||
import { getResource, getVariable, setVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";
|
||
|
||
// 资源属性接口
|
||
/**
 * Shape of the `attributes` JSON read from `limq.resources` for a shorturl.
 *
 * Both snake_case and camelCase spellings are declared because the sync code
 * reads either one (`original_url`/`originalUrl`, `expires_at`/`expiresAt`).
 * Any other key is still accepted through the index signature.
 */
interface ResourceAttributes {
  /** Preferred source of the short link's slug. */
  slug?: string;
  /** Read as a fallback for `slug` by the enrichment step. */
  name?: string;
  /** Destination URL (snake_case spelling). */
  original_url?: string;
  /** Destination URL (camelCase spelling, checked first). */
  originalUrl?: string;
  title?: string;
  description?: string;
  /** Expiry timestamp (snake_case spelling, checked first). */
  expires_at?: string;
  /** Expiry timestamp (camelCase spelling). */
  expiresAt?: string;
  /** Click counter copied into the ClickHouse `click_count` column. */
  click_count?: number;
  [key: string]: unknown;
}
|
||
|
||
// ClickHouse配置接口
|
||
/**
 * ClickHouse connection settings, parsed from the Windmill variable
 * `f/shorturl_analytics/clickhouse`.
 */
interface ChConfig {
  /** Host name; used together with `clickhouse_port` to build the URL when `clickhouse_url` is empty. */
  clickhouse_host: string;
  /** Port used when building the URL from the host. */
  clickhouse_port: number;
  /** User name sent in the HTTP Basic `Authorization` header. */
  clickhouse_user: string;
  /** Password sent in the HTTP Basic `Authorization` header. */
  clickhouse_password: string;
  /** Endpoint every request is POSTed to; built as `http://host:port` when not supplied. */
  clickhouse_url: string;
}
|
||
|
||
// PostgreSQL配置接口
|
||
/**
 * PostgreSQL connection settings, as returned by the Windmill resource
 * `f/limq/production_supabase`.
 */
interface PgConfig {
  host: string;
  port: number;
  user: string;
  password: string;
  /** Database name; the script falls back to `postgres` when it is missing. */
  dbname?: string;
  /** The resource may carry extra keys that this script does not read. */
  [key: string]: unknown;
}
|
||
|
||
// 上次同步状态接口
|
||
/**
 * Incremental-sync cursor, stored as JSON in the Windmill variable named by
 * `STATE_VARIABLE_PATH`.
 */
interface SyncState {
  /** ISO timestamp taken at the moment the state was written. */
  lastSyncTime: string;
  /** ISO timestamp at which the run started; the next run starts one minute before it. */
  lastRunTime: string;
}
|
||
|
||
// 状态变量名称
|
||
/** Path of the Windmill variable holding the JSON-serialized `SyncState` between runs. */
const STATE_VARIABLE_PATH = "f/shorturl_analytics/shorturl_sync_state";
|
||
|
||
// Windmill函数定义
|
||
/**
 * Windmill entry point: reads shorturl resources from PostgreSQL, enriches
 * them with their related rows and writes them to ClickHouse.
 *
 * The time window is chosen as follows:
 *  - `startTime` given: sync from that instant;
 *  - `forceFullSync` without `startTime`: no lower bound;
 *  - otherwise: start one minute before the `lastRunTime` stored in the sync
 *    state, or run without a lower bound when no usable state exists.
 *
 * The sync state is rewritten after a successful write and when nothing was
 * found. A dry run never touches it.
 *
 * @returns A summary object: `synced` plus `message` / `lastSyncTime`, or
 *          `dryRun` and `sampleData` for a dry run.
 * @throws Error when `startTime` / `endTime` cannot be parsed; any error
 *         raised by the database or ClickHouse steps is logged and rethrown.
 */
export async function main(
  /** Options for the PostgreSQL to ClickHouse sync script. */
  params: {
    /** Maximum number of resources to sync; defaults to 500. */
    limit?: number;
    /** Whether soft-deleted resources are included. */
    includeDeleted?: boolean;
    /** When true, nothing is written to ClickHouse and a sample row is returned instead. */
    dryRun?: boolean;
    /** Ignore the stored sync state and run without a lower time bound. */
    forceFullSync?: boolean;
    /** Manual start time (ISO format); overrides the automatic incremental window. */
    startTime?: string;
    /** Manual end time (ISO format); defaults to "now". */
    endTime?: string;
  }
) {
  // `||` on purpose for limit: a limit of 0 would make the sync a no-op.
  const limit = params.limit || 500;
  const includeDeleted = params.includeDeleted ?? false;
  const dryRun = params.dryRun ?? false;
  const forceFullSync = params.forceFullSync ?? false;

  // Captured before any query so the next incremental run overlaps this one.
  const currentRunTime = new Date().toISOString();

  let startTime: Date | undefined;
  const endTime = params.endTime ? parseSyncBoundary(params.endTime, "endTime") : new Date();

  if (forceFullSync || params.startTime) {
    // An explicit start time wins over forceFullSync.
    startTime = params.startTime ? parseSyncBoundary(params.startTime, "startTime") : undefined;
    console.log(`使用${params.startTime ? '手动指定' : '全量同步'} - 开始时间: ${startTime ? startTime.toISOString() : '无限制'}`);
  } else {
    // Incremental mode: derive the lower bound from the stored state.
    try {
      const stateStr = await getVariable(STATE_VARIABLE_PATH);
      if (stateStr) {
        const syncState: SyncState = JSON.parse(stateStr);
        console.log(`获取到上次同步状态: 同步时间=${syncState.lastSyncTime}, 运行时间=${syncState.lastRunTime}`);

        const lastRunTime = new Date(syncState.lastRunTime);
        if (Number.isNaN(lastRunTime.getTime())) {
          // A corrupt cursor must fall back to a full sync, not crash later in toISOString().
          throw new Error(`同步状态中的lastRunTime无效: ${String(syncState.lastRunTime)}`);
        }
        // Step back one minute to avoid missing rows at the boundary.
        lastRunTime.setMinutes(lastRunTime.getMinutes() - 1);
        startTime = lastRunTime;
      } else {
        console.log("未找到上次同步状态,将执行全量同步");
      }
    } catch (error: unknown) {
      console.log(`获取同步状态出错: ${error instanceof Error ? error.message : String(error)},将执行全量同步`);
    }
  }

  console.log(`开始同步PostgreSQL shorturl数据到ClickHouse`);
  console.log(`参数: limit=${limit}, includeDeleted=${includeDeleted}, dryRun=${dryRun}`);
  if (startTime) console.log(`开始时间: ${startTime.toISOString()}`);
  console.log(`结束时间: ${endTime.toISOString()}`);

  console.log("获取PostgreSQL数据库配置...");
  const pgConfig = await getResource('f/limq/production_supabase') as PgConfig;
  console.log(`数据库连接配置: host=${pgConfig.host}, port=${pgConfig.port}, database=${pgConfig.dbname || 'postgres'}, user=${pgConfig.user}`);

  let pgPool: Pool | null = null;

  try {
    console.log("创建PostgreSQL连接池...");

    pgPool = new Pool({
      hostname: pgConfig.host,
      port: pgConfig.port,
      user: pgConfig.user,
      password: pgConfig.password,
      database: pgConfig.dbname || 'postgres'
    }, 3);

    console.log("PostgreSQL连接池创建完成,尝试连接...");

    // Connectivity check before doing any real work.
    const client = await pgPool.connect();
    try {
      console.log("连接成功,执行测试查询...");
      const testResult = await client.queryObject(`SELECT 1 AS test`);
      console.log(`测试查询结果: ${JSON.stringify(testResult.rows)}`);
    } finally {
      client.release();
    }

    const shorturls = await fetchShorturlResources(pgPool, {
      limit,
      includeDeleted,
      startTime,
      endTime,
    });

    console.log(`获取到 ${shorturls.length} 个shorturl资源`);

    if (shorturls.length === 0) {
      // Advance the cursor even when there was nothing to sync.
      await updateSyncState(currentRunTime);
      return { synced: 0, message: "没有找到需要同步的shorturl资源" };
    }

    if (shorturls.length >= limit) {
      // NOTE(review): the fetch query returns the newest rows first, so rows
      // beyond `limit` are the older changes; the cursor below still advances
      // past them. Surface this instead of dropping them silently.
      console.warn(`本次查询结果已达到limit(${limit}),更早变更的记录可能未被同步;请增大limit或分时间段重跑`);
    }

    const enrichedShorturls = await enrichShorturlData(pgPool, shorturls);
    console.log(`已丰富 ${enrichedShorturls.length} 个shorturl资源的关联数据`);

    const clickhouseData = formatForClickhouse(enrichedShorturls);

    if (dryRun) {
      console.log("Dry run模式 - 不执行实际写入");
      console.log(`将写入 ${clickhouseData.length} 条记录到ClickHouse`);
      if (clickhouseData.length > 0) {
        console.log("示例数据:");
        console.log(JSON.stringify(clickhouseData[0], null, 2));
      }
      return { synced: 0, dryRun: true, sampleData: clickhouseData.slice(0, 1) };
    }

    const inserted = await insertToClickhouse(clickhouseData);
    console.log(`成功写入 ${inserted} 条记录到ClickHouse`);

    await updateSyncState(currentRunTime);

    return { synced: inserted, message: "同步完成", lastSyncTime: currentRunTime };
  } catch (error: unknown) {
    if (error instanceof Error) {
      console.error(`同步过程中发生错误: ${error.message}`);
      console.error(`错误类型: ${error.name}`);
      if (error.stack) {
        console.error(`错误堆栈: ${error.stack}`);
      }
    } else {
      // Non-Error throwables have no message/name/stack to print.
      console.error(`同步过程中发生错误: ${String(error)}`);
    }
    throw error;
  } finally {
    if (pgPool) {
      await pgPool.end();
      console.log("PostgreSQL连接池已关闭");
    }
  }
}

/**
 * Parses a caller-supplied timestamp and rejects values `Date` cannot read,
 * so the failure names the offending parameter instead of surfacing later as
 * a bare `RangeError` from `toISOString()`.
 */
function parseSyncBoundary(value: string, label: string): Date {
  const parsed = new Date(value);
  if (Number.isNaN(parsed.getTime())) {
    throw new Error(`无效的${label}: "${value}" 不是可解析的日期时间`);
  }
  return parsed;
}
|
||
|
||
// 更新同步状态
|
||
/**
 * Writes the sync cursor to the Windmill variable at `STATE_VARIABLE_PATH`.
 *
 * Best-effort: a failure is logged and swallowed so it never interrupts the
 * caller's flow.
 *
 * @param currentRunTime ISO timestamp of the run being recorded; stored as `lastRunTime`.
 */
async function updateSyncState(currentRunTime: string): Promise<void> {
  const nextState: SyncState = {
    // Moment at which the state is written.
    lastSyncTime: new Date().toISOString(),
    // Moment at which the run being recorded started.
    lastRunTime: currentRunTime,
  };
  const serialized = JSON.stringify(nextState);

  try {
    console.log(`更新同步状态: ${serialized}`);
    await setVariable(STATE_VARIABLE_PATH, serialized);
  } catch (error: unknown) {
    const reason = error instanceof Error ? error.message : String(error);
    console.error(`更新同步状态失败: ${reason}`);
  }
}
|
||
|
||
// 从PostgreSQL获取所有shorturl资源
|
||
/**
 * Loads shorturl resources, together with their creator's email and name,
 * from PostgreSQL.
 *
 * Filters applied:
 *  - soft-deleted rows are skipped unless `includeDeleted` is true;
 *  - `startTime` keeps rows created OR updated at/after that instant;
 *  - `endTime` keeps rows created at/before that instant.
 *
 * Rows come back most recently updated first and are capped at `limit`, so
 * when more rows match than `limit` it is the older ones that are left out.
 *
 * @returns The raw result rows, one object per resource.
 */
async function fetchShorturlResources(
  pgPool: Pool,
  options: {
    limit: number;
    includeDeleted: boolean;
    startTime?: Date;
    endTime?: Date;
  }
): Promise<Record<string, unknown>[]> {
  const conditions: string[] = [`r.type = 'shorturl'`];
  const params: (Date | number)[] = [];

  // Registers a bound value and returns its positional placeholder ($1, $2, ...).
  const bind = (value: Date | number): string => {
    params.push(value);
    return `$${params.length}`;
  };

  if (!options.includeDeleted) {
    conditions.push(`r.deleted_at IS NULL`);
  }

  if (options.startTime) {
    // Match both columns so rows created or updated since the last sync are captured.
    const since = bind(options.startTime);
    conditions.push(`(r.created_at >= ${since} OR r.updated_at >= ${since})`);
  }

  if (options.endTime) {
    conditions.push(`r.created_at <= ${bind(options.endTime)}`);
  }

  const query = `
    SELECT
      r.id,
      r.external_id,
      r.type,
      r.attributes,
      r.schema_version,
      r.creator_id,
      r.created_at,
      r.updated_at,
      r.deleted_at,
      u.email as creator_email,
      u.first_name as creator_first_name,
      u.last_name as creator_last_name
    FROM
      limq.resources r
    LEFT JOIN
      limq.users u ON r.creator_id = u.id
    WHERE
      ${conditions.join(" AND ")}
    ORDER BY r.updated_at DESC, r.created_at DESC
    LIMIT ${bind(options.limit)}
  `;

  const client = await pgPool.connect();
  try {
    const result = await client.queryObject<Record<string, unknown>>(query, params);

    // Debug aid: show a sample of what came back.
    const first = result.rows[0];
    if (first) {
      const rawAttributes = first.attributes;
      // Serialize objects as JSON; String(obj) would only print "[object Object]".
      const preview = typeof rawAttributes === "string"
        ? rawAttributes
        : JSON.stringify(rawAttributes) ?? String(rawAttributes);

      console.log(`获取到 ${result.rows.length} 条shorturl记录`);
      console.log(`第一条记录ID: ${first.id}`);
      console.log(`attributes类型: ${typeof rawAttributes}`);
      console.log(`attributes内容示例: ${preview.substring(0, 100)}...`);
    }

    return result.rows;
  } finally {
    client.release();
  }
}
|
||
|
||
// 为每个shorturl资源获取关联数据
|
||
/**
 * Attaches related rows (projects, teams via projects, tags, QR codes,
 * channels, favorites) to every shorturl and extracts the commonly used
 * attribute fields (`slug`, `originalUrl`, `title`, `description`, `expiresAt`).
 *
 * All lookups run sequentially on one pooled connection, several queries per
 * resource.
 *
 * @returns One enriched object per input row, in input order.
 */
async function enrichShorturlData(pgPool: Pool, shorturls: Record<string, unknown>[]) {
  const client = await pgPool.connect();
  const enriched: Record<string, unknown>[] = [];

  try {
    for (const shorturl of shorturls) {
      // 1. Project associations
      const projectsResult = await client.queryObject<Record<string, unknown>>(`
        SELECT
          pr.resource_id, pr.project_id,
          p.name as project_name, p.description as project_description,
          pr.assigned_at
        FROM
          limq.project_resources pr
        JOIN
          limq.projects p ON pr.project_id = p.id
        WHERE
          pr.resource_id = $1
      `, [shorturl.id]);

      // 2. Team associations, reached through the projects found above
      const projectIds = projectsResult.rows.map((p) => p.project_id);
      const teamsResult = projectIds.length > 0
        ? await client.queryObject<Record<string, unknown>>(`
            SELECT
              tp.team_id, tp.project_id,
              t.name as team_name, t.description as team_description
            FROM
              limq.team_projects tp
            JOIN
              limq.teams t ON tp.team_id = t.id
            WHERE
              tp.project_id = ANY($1::uuid[])
          `, [projectIds])
        : { rows: [] as Record<string, unknown>[] };

      // 3. Tag associations
      const tagsResult = await client.queryObject<Record<string, unknown>>(`
        SELECT
          rt.resource_id, rt.tag_id, rt.created_at,
          t.name as tag_name, t.type as tag_type
        FROM
          limq.resource_tags rt
        JOIN
          limq.tags t ON rt.tag_id = t.id
        WHERE
          rt.resource_id = $1
      `, [shorturl.id]);

      // 4. QR codes
      const qrCodesResult = await client.queryObject<Record<string, unknown>>(`
        SELECT
          id as qr_id, scan_count, url, template_name, created_at
        FROM
          limq.qr_code
        WHERE
          resource_id = $1
      `, [shorturl.id]);

      // 5. Channels
      const channelsResult = await client.queryObject<Record<string, unknown>>(`
        SELECT
          id as channel_id, name as channel_name, path as channel_path,
          "isUserCreated" as is_user_created
        FROM
          limq.channel
        WHERE
          "shortUrlId" = $1
      `, [shorturl.id]);

      // 6. Favorites
      const favoritesResult = await client.queryObject<Record<string, unknown>>(`
        SELECT
          f.id as favorite_id, f.user_id, f.created_at,
          u.first_name, u.last_name
        FROM
          limq.favorite f
        JOIN
          limq.users u ON f.user_id = u.id
        WHERE
          f.favoritable_id = $1 AND f.favoritable_type = 'resource'
      `, [shorturl.id]);

      console.log(`\n处理资源ID: ${shorturl.id}`);
      console.log(`attributes类型: ${typeof shorturl.attributes}`);

      const attributes = parseShorturlAttributes(shorturl.attributes, shorturl.id);

      // The first QR code's URL is the last-resort source for the slug.
      let slugFromQr = "";
      const firstQr = qrCodesResult.rows[0];
      if (firstQr && typeof firstQr.url === "string" && firstQr.url) {
        console.log(`找到QR码URL: ${firstQr.url}`);
        slugFromQr = lastPathSegment(firstQr.url);
        console.log(`从QR码URL提取的slug: ${slugFromQr}`);
      }

      console.log(`提取字段 - name: ${attributes.name || 'N/A'}, slug: ${attributes.slug || 'N/A'}`);
      console.log(`提取字段 - originalUrl: ${attributes.originalUrl || 'N/A'}, original_url: ${attributes.original_url || 'N/A'}`);

      // Only string values qualify; attributes come from untyped JSON.
      const slug = firstNonEmptyString(attributes.slug, attributes.name, slugFromQr);
      const originalUrl = firstNonEmptyString(attributes.originalUrl, attributes.original_url);

      console.log(`最终使用的slug: ${slug}`);
      console.log(`最终使用的originalUrl: ${originalUrl}`);

      enriched.push({
        ...shorturl,
        attributes,
        projects: projectsResult.rows,
        teams: teamsResult.rows,
        tags: tagsResult.rows,
        qrCodes: qrCodesResult.rows,
        channels: channelsResult.rows,
        favorites: favoritesResult.rows,
        slug,
        originalUrl,
        title: attributes.title || "",
        description: attributes.description || "",
        expiresAt: attributes.expires_at || attributes.expiresAt || null
      });
    }
  } finally {
    client.release();
  }

  return enriched;
}

/**
 * Normalizes the raw `attributes` column into an object. Strings are parsed
 * as JSON; anything that is not (or does not parse to) a non-null object
 * yields `{}` so later property reads cannot throw.
 */
function parseShorturlAttributes(raw: unknown, resourceId: unknown): ResourceAttributes {
  if (typeof raw === "string") {
    console.log(`尝试解析attributes字符串,长度: ${raw.length}`);
    try {
      const parsed: unknown = JSON.parse(raw);
      if (typeof parsed === "object" && parsed !== null) {
        return parsed as ResourceAttributes;
      }
      // Valid JSON that is not an object (e.g. "null" or "42").
      console.warn(`资源 ${resourceId} 的attributes JSON不是对象,已忽略`);
    } catch (err: unknown) {
      console.warn(`无法解析资源 ${resourceId} 的attributes JSON:`, err instanceof Error ? err.message : String(err));
      console.log(`原始字符串前100字符: ${raw.substring(0, 100)}`);
    }
    return {};
  }

  if (typeof raw === "object" && raw !== null) {
    console.log('attributes已经是对象类型');
    return raw as ResourceAttributes;
  }

  console.log(`无效的attributes类型: ${typeof raw}`);
  return {};
}

/** Returns the last path segment of a URL, ignoring query string, fragment and trailing slashes. */
function lastPathSegment(url: string): string {
  const withoutQuery = url.split(/[?#]/, 1)[0].replace(/\/+$/, "");
  return withoutQuery.substring(withoutQuery.lastIndexOf("/") + 1);
}

/** Returns the first argument that is a non-empty string, or "" when none is. */
function firstNonEmptyString(...candidates: unknown[]): string {
  for (const candidate of candidates) {
    if (typeof candidate === "string" && candidate !== "") return candidate;
  }
  return "";
}
|
||
|
||
// 将PostgreSQL数据格式化为ClickHouse格式
|
||
/**
 * Converts enriched shorturl records into flat row objects for the ClickHouse
 * insert.
 *
 * - Timestamps become `YYYY-MM-DD HH:MM:SS.mmm` (UTC); missing or unparseable
 *   values become `null` instead of throwing.
 * - Association lists (`projects`, `teams`, `tags`, `qrCodes`, `channels`,
 *   `favorites`) are serialized as JSON array strings; a missing list is `"[]"`.
 * - `attributes` is serialized as a JSON string.
 *
 * The key order of the returned objects is significant: the insert step
 * derives its column list from the keys of the first row.
 */
function formatForClickhouse(shorturls: Record<string, unknown>[]) {
  // Formats a value as a ClickHouse DateTime64(3)-compatible string.
  const formatDateTime = (date: Date | string | number | null | undefined): string | null => {
    if (!date) return null;
    const dateObj = date instanceof Date ? date : new Date(date);
    // toISOString() throws RangeError on an Invalid Date; treat it as "no value".
    if (Number.isNaN(dateObj.getTime())) return null;
    return dateObj.toISOString().replace('T', ' ').replace('Z', '');
  };

  // Association lists may be absent when a record was not enriched.
  const asRows = (value: unknown): Record<string, unknown>[] =>
    Array.isArray(value) ? (value as Record<string, unknown>[]) : [];

  // Accepts numbers and numeric strings; everything else counts as 0.
  const toCount = (value: unknown): number => {
    const parsed =
      typeof value === "number" ? value :
      typeof value === "string" && value.trim() !== "" ? Number(value) :
      NaN;
    return Number.isFinite(parsed) ? parsed : 0;
  };

  console.log(`\n准备格式化 ${shorturls.length} 条记录为ClickHouse格式`);

  return shorturls.map((shorturl) => {
    console.log(`处理资源: ${shorturl.id}`);
    console.log(`slug: ${shorturl.slug || 'EMPTY'}`);
    console.log(`originalUrl: ${shorturl.originalUrl || 'EMPTY'}`);

    // Serialized once; reused for the log preview and the output column.
    const attributesStr = JSON.stringify(shorturl.attributes || {});
    const attributesPrev = attributesStr.length > 100
      ? attributesStr.substring(0, 100) + '...'
      : attributesStr;
    console.log(`attributes: ${attributesPrev}`);

    const creatorName = [shorturl.creator_first_name, shorturl.creator_last_name]
      .filter(Boolean)
      .join(" ");

    const projects = JSON.stringify(asRows(shorturl.projects).map((p) => ({
      project_id: p.project_id,
      project_name: p.project_name,
      project_description: p.project_description,
      assigned_at: p.assigned_at
    })));

    const teams = JSON.stringify(asRows(shorturl.teams).map((t) => ({
      team_id: t.team_id,
      team_name: t.team_name,
      team_description: t.team_description,
      via_project_id: t.project_id
    })));

    const tags = JSON.stringify(asRows(shorturl.tags).map((t) => ({
      tag_id: t.tag_id,
      tag_name: t.tag_name,
      tag_type: t.tag_type,
      created_at: t.created_at
    })));

    const qrCodes = JSON.stringify(asRows(shorturl.qrCodes).map((q) => ({
      qr_id: q.qr_id,
      scan_count: q.scan_count,
      url: q.url,
      template_name: q.template_name,
      created_at: q.created_at
    })));

    const channels = JSON.stringify(asRows(shorturl.channels).map((c) => ({
      channel_id: c.channel_id,
      channel_name: c.channel_name,
      channel_path: c.channel_path,
      is_user_created: c.is_user_created
    })));

    const favorites = JSON.stringify(asRows(shorturl.favorites).map((f) => ({
      favorite_id: f.favorite_id,
      user_id: f.user_id,
      user_name: `${f.first_name || ""} ${f.last_name || ""}`.trim(),
      created_at: f.created_at
    })));

    // click_count comes from the attributes JSON; unique_visitors is always written as 0 here.
    const attributeBag =
      typeof shorturl.attributes === "object" && shorturl.attributes !== null
        ? (shorturl.attributes as Record<string, unknown>)
        : {};
    const clickCount = toCount(attributeBag.click_count);
    const uniqueVisitors = 0;

    return {
      id: shorturl.id,
      external_id: shorturl.external_id || "",
      type: shorturl.type,
      slug: shorturl.slug || "",
      original_url: shorturl.originalUrl || "",
      title: shorturl.title || "",
      description: shorturl.description || "",
      attributes: attributesStr,
      schema_version: shorturl.schema_version || 1,
      creator_id: shorturl.creator_id || "",
      creator_email: shorturl.creator_email || "",
      creator_name: creatorName,
      created_at: formatDateTime(shorturl.created_at as Date),
      updated_at: formatDateTime(shorturl.updated_at as Date),
      deleted_at: formatDateTime(shorturl.deleted_at as Date | null),
      projects,
      teams,
      tags,
      qr_codes: qrCodes,
      channels,
      favorites,
      expires_at: formatDateTime(shorturl.expiresAt as Date | null),
      click_count: clickCount,
      unique_visitors: uniqueVisitors
    };
  });
}
|
||
|
||
// 获取ClickHouse配置
|
||
/**
 * Reads the ClickHouse connection settings from the Windmill variable
 * `f/shorturl_analytics/clickhouse` (a variable, not a resource).
 *
 * When `clickhouse_url` is absent but host and port are present, the URL is
 * built as `http://<host>:<port>`.
 *
 * @throws Error when the variable is empty, is not valid JSON, or no URL can
 *         be determined. Every failure is logged before being rethrown.
 */
async function getClickHouseConfig(): Promise<ChConfig> {
  try {
    const rawConfig = await getVariable("f/shorturl_analytics/clickhouse");
    console.log("原始ClickHouse配置:", typeof rawConfig);

    if (!rawConfig) {
      throw new Error("未找到ClickHouse配置");
    }

    // Use the value as-is unless it is a string, in which case parse it as JSON.
    let config: ChConfig;
    if (typeof rawConfig !== 'string') {
      config = rawConfig as ChConfig;
    } else {
      try {
        config = JSON.parse(rawConfig);
      } catch (parseError) {
        console.error("解析JSON失败:", parseError);
        throw new Error("ClickHouse配置不是有效的JSON");
      }
    }

    // Derive the URL from host and port when it was not given explicitly.
    const canBuildUrl = Boolean(config.clickhouse_host && config.clickhouse_port);
    if (!config.clickhouse_url && canBuildUrl) {
      config.clickhouse_url = `http://${config.clickhouse_host}:${config.clickhouse_port}`;
      console.log(`已构建ClickHouse URL: ${config.clickhouse_url}`);
    }

    if (config.clickhouse_url) {
      return config;
    }
    throw new Error("ClickHouse配置缺少URL");
  } catch (error) {
    console.error("获取ClickHouse配置失败:", error);
    throw error;
  }
}
|
||
|
||
// 写入数据到ClickHouse
|
||
/**
 * Writes rows to `shorturl_analytics.shorturl` by POSTing to the configured
 * ClickHouse URL.
 *
 * Steps:
 *  1. Best-effort removal of rows with the same ids (`ALTER TABLE ... DELETE`),
 *     in batches of 100; a failure here is logged and does not stop the insert.
 *  2. `INSERT ... FORMAT JSONEachRow`, in batches of 100; a failure here is
 *     logged and rethrown.
 *
 * The column list is taken from the keys of the first row.
 *
 * @returns Number of rows sent in successful insert batches (0 for empty input).
 * @throws Error when the configuration has no URL or an insert batch fails.
 */
async function insertToClickhouse(data: Record<string, unknown>[]): Promise<number> {
  if (data.length === 0) return 0;

  const chConfig = await getClickHouseConfig();

  if (!chConfig.clickhouse_url) {
    throw new Error("无效的ClickHouse URL: 未定义");
  }
  const clickhouseUrl = chConfig.clickhouse_url;

  console.log(`准备写入数据到ClickHouse: ${clickhouseUrl}`);

  // Identical for every request, so build it once.
  const requestHeaders = {
    "Content-Type": "application/x-www-form-urlencoded",
    "Authorization": `Basic ${btoa(`${chConfig.clickhouse_user}:${chConfig.clickhouse_password}`)}`
  };

  const columns = Object.keys(data[0]).join(", ");

  const recordIds = data.map((record) => String(record.id));
  console.log(`需要处理的记录数: ${recordIds.length}`);

  // Remove rows that may already exist for these ids.
  try {
    console.log(`删除可能存在的重复记录...`);

    // Batched to keep each request small.
    const deleteBatchSize = 100;
    const deleteBatchTotal = Math.ceil(recordIds.length / deleteBatchSize);

    for (let i = 0; i < recordIds.length; i += deleteBatchSize) {
      const batchNumber = i / deleteBatchSize + 1;
      const formattedIds = recordIds
        .slice(i, i + deleteBatchSize)
        .map((id) => `'${escapeClickhouseString(id)}'`)
        .join(', ');

      const deleteQuery = `
        ALTER TABLE shorturl_analytics.shorturl
        DELETE WHERE id IN (${formattedIds})
      `;

      const response = await fetch(clickhouseUrl, {
        method: "POST",
        headers: requestHeaders,
        body: deleteQuery,
      });
      // Always read the body so the response is fully consumed.
      const responseText = await response.text();

      if (!response.ok) {
        // Keep going: a failed delete must not abort the sync.
        console.warn(`删除记录时出错 (批次 ${batchNumber}): ${responseText}`);
      } else {
        console.log(`成功删除批次 ${batchNumber}/${deleteBatchTotal}的潜在重复记录`);
      }
    }
  } catch (error: unknown) {
    console.warn(`删除重复记录时出错: ${error instanceof Error ? error.message : String(error)}`);
  }

  const query = `
    INSERT INTO shorturl_analytics.shorturl (${columns})
    FORMAT JSONEachRow
  `;

  let inserted = 0;
  const batchSize = 100;

  for (let i = 0; i < data.length; i += batchSize) {
    const batch = data.slice(i, i + batchSize);

    // JSONEachRow: one JSON object per line.
    const rows = batch.map((row) => JSON.stringify(row)).join('\n');

    try {
      console.log(`正在发送请求到: ${clickhouseUrl}`);
      console.log(`认证信息: ${chConfig.clickhouse_user}:***`);

      const response = await fetch(clickhouseUrl, {
        method: "POST",
        headers: requestHeaders,
        body: `${query}\n${rows}`,
      });
      const responseText = await response.text();

      if (!response.ok) {
        throw new Error(`ClickHouse插入失败: ${responseText}`);
      }

      inserted += batch.length;
      console.log(`已插入 ${inserted}/${data.length} 条记录`);
    } catch (error) {
      console.error(`请求ClickHouse时出错:`, error);
      throw error;
    }
  }

  return inserted;
}

/** Escapes backslashes and single quotes so a value can sit inside a single-quoted ClickHouse string literal. */
function escapeClickhouseString(value: string): string {
  return value.replace(/\\/g, "\\\\").replace(/'/g, "\\'");
}
|