Files
shorturl-analytics/windmill/sync_shorturl_to_clickhouse.ts
2025-04-07 21:58:28 +08:00

641 lines
21 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Windmill script to sync shorturl data from PostgreSQL to ClickHouse
// 作者: AI Assistant
// 创建日期: 2023-10-30
// 描述: 此脚本从PostgreSQL数据库获取所有shorturl类型的资源及其关联数据并同步到ClickHouse
import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
import { getResource, getVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";
// Shape of the JSON stored in a resource's `attributes` column, as read by this script.
// Every field is optional; the index signature admits any other key as `unknown`.
interface ResourceAttributes {
// Short-link slug; enrichShorturlData prefers it over `name` and the QR-code fallback
slug?: string;
// Target URL, snake_case spelling (checked after `originalUrl`)
original_url?: string;
// Target URL, camelCase spelling (checked first)
originalUrl?: string;
title?: string;
description?: string;
// Expiry timestamp, snake_case spelling (checked first)
expires_at?: string;
// Expiry timestamp, camelCase spelling (checked after `expires_at`)
expiresAt?: string;
// Any other attribute (e.g. `name`, `click_count`, which this file also reads)
[key: string]: unknown;
}
// ClickHouse connection settings, loaded by getClickHouseConfig from a Windmill variable.
interface ChConfig {
// Host and port are only used to build `clickhouse_url` when the URL itself is missing
clickhouse_host: string;
clickhouse_port: number;
// User and password are sent as HTTP Basic credentials on every request
clickhouse_user: string;
clickhouse_password: string;
// Endpoint that queries are POSTed to; derived as http://host:port when absent from the stored config
clickhouse_url: string;
}
// PostgreSQL connection settings, read from the Windmill resource `f/limq/postgresql` in main().
interface PgConfig {
host: string;
port: number;
user: string;
password: string;
// Database name; main() falls back to 'postgres' when it is not set
dbname?: string;
// Other keys of the resource are tolerated but not used by this script
[key: string]: unknown;
}
/**
 * Windmill entry point: syncs shorturl resources from PostgreSQL into ClickHouse.
 *
 * Flow: read the PostgreSQL resource config, open a pool of 3 connections, run a
 * `SELECT 1` probe, fetch shorturl rows, enrich them with related data, convert
 * them to the ClickHouse row shape and, unless `dryRun` is set, insert them.
 * The pool is always closed before the function settles.
 *
 * @param params Sync options; every field is optional.
 * @returns `{ synced, message }` after a real run (or when nothing matched), or
 *          `{ synced: 0, dryRun: true, sampleData }` for a dry run.
 * @throws RangeError when `startTime` or `endTime` cannot be parsed as a date.
 * @throws Rethrows any error raised while querying PostgreSQL or writing to ClickHouse.
 */
export async function main(
  /** Options for the PostgreSQL -> ClickHouse sync script */
  params: {
    /** Maximum number of resources to sync; defaults to 500 */
    limit?: number;
    /** Whether soft-deleted resources are included */
    includeDeleted?: boolean;
    /** When true nothing is written to ClickHouse and a sample row is returned instead */
    dryRun?: boolean;
    /** Lower bound on created_at (ISO format) */
    startTime?: string;
    /** Upper bound on created_at (ISO format) */
    endTime?: string;
  }
) {
  // Parses an optional timestamp and fails fast with a readable message; an
  // Invalid Date would otherwise only surface later as "Invalid time value".
  const parseTime = (label: string, raw?: string): Date | undefined => {
    if (!raw) return undefined;
    const parsed = new Date(raw);
    if (Number.isNaN(parsed.getTime())) {
      throw new RangeError(`Invalid ${label}: ${raw}`);
    }
    return parsed;
  };

  // Apply defaults (`||` is kept for limit so that 0 still means "use the default")
  const limit = params.limit || 500;
  const includeDeleted = params.includeDeleted || false;
  const dryRun = params.dryRun || false;
  const startTime = parseTime("startTime", params.startTime);
  const endTime = parseTime("endTime", params.endTime);

  console.log(`开始同步PostgreSQL shorturl数据到ClickHouse`);
  console.log(`参数: limit=${limit}, includeDeleted=${includeDeleted}, dryRun=${dryRun}`);
  if (startTime) console.log(`开始时间: ${startTime.toISOString()}`);
  if (endTime) console.log(`结束时间: ${endTime.toISOString()}`);

  // Load the database configuration
  console.log("获取PostgreSQL数据库配置...");
  const pgConfig = await getResource('f/limq/postgresql') as PgConfig;
  console.log(`数据库连接配置: host=${pgConfig.host}, port=${pgConfig.port}, database=${pgConfig.dbname || 'postgres'}, user=${pgConfig.user}`);

  let pgPool: Pool | null = null;
  try {
    console.log("创建PostgreSQL连接池...");
    pgPool = new Pool({
      hostname: pgConfig.host,
      port: pgConfig.port,
      user: pgConfig.user,
      password: pgConfig.password,
      database: pgConfig.dbname || 'postgres'
    }, 3);
    console.log("PostgreSQL连接池创建完成尝试连接...");

    // Connectivity probe
    const client = await pgPool.connect();
    try {
      console.log("连接成功,执行测试查询...");
      const testResult = await client.queryObject(`SELECT 1 AS test`);
      console.log(`测试查询结果: ${JSON.stringify(testResult.rows)}`);
    } finally {
      client.release();
    }

    // Fetch the shorturl resources
    const shorturls = await fetchShorturlResources(pgPool, {
      limit,
      includeDeleted,
      startTime,
      endTime,
    });
    console.log(`获取到 ${shorturls.length} 个shorturl资源`);
    if (shorturls.length === 0) {
      return { synced: 0, message: "没有找到需要同步的shorturl资源" };
    }

    // Attach related data to every resource
    const enrichedShorturls = await enrichShorturlData(pgPool, shorturls);
    console.log(`已丰富 ${enrichedShorturls.length} 个shorturl资源的关联数据`);

    // Convert to the ClickHouse row shape
    const clickhouseData = formatForClickhouse(enrichedShorturls);

    if (dryRun) {
      console.log("Dry run模式 - 不执行实际写入");
      console.log(`将写入 ${clickhouseData.length} 条记录到ClickHouse`);
      // Print one sample row
      if (clickhouseData.length > 0) {
        console.log("示例数据:");
        console.log(JSON.stringify(clickhouseData[0], null, 2));
      }
      return { synced: 0, dryRun: true, sampleData: clickhouseData.slice(0, 1) };
    }

    // Write to ClickHouse
    const inserted = await insertToClickhouse(clickhouseData);
    console.log(`成功写入 ${inserted} 条记录到ClickHouse`);
    return { synced: inserted, message: "同步完成" };
  } catch (error: unknown) {
    // Narrow before reading Error fields so a non-Error throwable is still logged usefully
    if (error instanceof Error) {
      console.error(`同步过程中发生错误: ${error.message}`);
      console.error(`错误类型: ${error.name}`);
      if (error.stack) {
        console.error(`错误堆栈: ${error.stack}`);
      }
    } else {
      console.error(`同步过程中发生错误: ${String(error)}`);
    }
    throw error;
  } finally {
    if (pgPool) {
      try {
        await pgPool.end();
        console.log("PostgreSQL连接池已关闭");
      } catch (closeError: unknown) {
        // A failed shutdown must not replace the sync's own error or result
        const reason = closeError instanceof Error ? closeError.message : String(closeError);
        console.warn(`关闭PostgreSQL连接池时出错: ${reason}`);
      }
    }
  }
}
/**
 * Reads shorturl resources (joined with their creator) from PostgreSQL,
 * newest first.
 *
 * @param pgPool Connection pool; one client is borrowed and always released.
 * @param options.limit Maximum number of rows returned.
 * @param options.includeDeleted When false, rows with a non-null `deleted_at` are excluded.
 * @param options.startTime Optional inclusive lower bound on `created_at`.
 * @param options.endTime Optional inclusive upper bound on `created_at`.
 * @returns The raw result rows.
 */
async function fetchShorturlResources(
  pgPool: Pool,
  options: {
    limit: number;
    includeDeleted: boolean;
    startTime?: Date;
    endTime?: Date;
  }
) {
  const params: unknown[] = [];
  // Registers a bind value and returns its positional placeholder ($1, $2, ...),
  // so placeholder numbers can never drift from the argument list.
  const bind = (value: unknown): string => {
    params.push(value);
    return `$${params.length}`;
  };

  const conditions = [`r.type = 'shorturl'`];
  if (!options.includeDeleted) {
    conditions.push(`r.deleted_at IS NULL`);
  }
  if (options.startTime) {
    conditions.push(`r.created_at >= ${bind(options.startTime)}`);
  }
  if (options.endTime) {
    conditions.push(`r.created_at <= ${bind(options.endTime)}`);
  }

  const query = `
    SELECT
      r.id,
      r.external_id,
      r.type,
      r.attributes,
      r.schema_version,
      r.creator_id,
      r.created_at,
      r.updated_at,
      r.deleted_at,
      u.email as creator_email,
      u.first_name as creator_first_name,
      u.last_name as creator_last_name
    FROM
      limq.resources r
    LEFT JOIN
      limq.users u ON r.creator_id = u.id
    WHERE
      ${conditions.join(" AND ")}
    ORDER BY r.created_at DESC LIMIT ${bind(options.limit)}
  `;

  const client = await pgPool.connect();
  try {
    const result = await client.queryObject(query, params);
    // Debug output: a sample of what was fetched
    const [first] = result.rows;
    if (first !== undefined) {
      // Strings are shown as-is and objects as JSON; String(object) would only print "[object Object]"
      const rawAttributes = first.attributes;
      const attributesText = typeof rawAttributes === 'string'
        ? rawAttributes
        : JSON.stringify(rawAttributes) ?? String(rawAttributes);
      const preview = attributesText.length > 100
        ? `${attributesText.substring(0, 100)}...`
        : attributesText;
      console.log(`获取到 ${result.rows.length} 条shorturl记录`);
      console.log(`第一条记录ID: ${first.id}`);
      console.log(`attributes类型: ${typeof rawAttributes}`);
      console.log(`attributes内容示例: ${preview}`);
    }
    return result.rows;
  } finally {
    client.release();
  }
}
/**
 * Normalizes the raw `attributes` column value into an object.
 *
 * Strings are parsed as JSON. Anything that does not end up as a non-null
 * object (unparseable text, JSON `null`, numbers, ...) yields `{}` so callers
 * can read properties without crashing.
 */
function parseResourceAttributes(resourceId: unknown, raw: unknown): ResourceAttributes {
  let candidate: unknown = raw;
  if (typeof raw === 'string') {
    console.log(`尝试解析attributes字符串长度: ${raw.length}`);
    try {
      candidate = JSON.parse(raw);
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      console.warn(`无法解析资源 ${resourceId} 的attributes JSON:`, message);
      console.log(`原始字符串前100字符: ${raw.substring(0, 100)}`);
      return {};
    }
  } else if (typeof raw === 'object' && raw !== null) {
    console.log('attributes已经是对象类型');
  }
  if (typeof candidate === 'object' && candidate !== null) {
    return candidate as ResourceAttributes;
  }
  console.log(`无效的attributes类型: ${typeof candidate}`);
  return {};
}

/**
 * Returns the last non-empty path segment of a QR-code URL, ignoring query
 * string, fragment and trailing slashes; "" when there is no such segment.
 */
function extractSlugFromQrUrl(qrUrl: string): string {
  let path: string;
  try {
    path = new URL(qrUrl).pathname;
  } catch {
    // Not an absolute URL: treat it as a bare path and cut query/fragment by hand
    path = qrUrl.split(/[?#]/)[0] ?? "";
  }
  const segments = path.split('/').filter((segment) => segment.length > 0);
  return segments.length > 0 ? segments[segments.length - 1] ?? "" : "";
}

/**
 * Attaches related data to every shorturl resource: projects, teams (via the
 * projects), tags, QR codes, channels and favorites, plus the fields lifted
 * out of `attributes` (slug, originalUrl, title, description, expiresAt).
 *
 * Queries run one resource at a time on a single borrowed client, which is
 * always released.
 *
 * @param pgPool Connection pool to borrow the client from.
 * @param shorturls Rows as returned by fetchShorturlResources.
 * @returns One enriched record per input row, in input order.
 */
async function enrichShorturlData(pgPool: Pool, shorturls: Record<string, unknown>[]) {
  const client = await pgPool.connect();
  const enriched: Record<string, unknown>[] = [];
  try {
    for (const shorturl of shorturls) {
      const resourceId = shorturl.id;

      // 1. Project links
      const projectsResult = await client.queryObject(`
        SELECT
          pr.resource_id, pr.project_id,
          p.name as project_name, p.description as project_description,
          pr.assigned_at
        FROM
          limq.project_resources pr
        JOIN
          limq.projects p ON pr.project_id = p.id
        WHERE
          pr.resource_id = $1
      `, [resourceId]);

      // 2. Team links, reached through the projects found above
      const projectIds = projectsResult.rows.map((p: Record<string, unknown>) => p.project_id);
      const teamsResult = projectIds.length > 0 ? await client.queryObject(`
        SELECT
          tp.team_id, tp.project_id,
          t.name as team_name, t.description as team_description
        FROM
          limq.team_projects tp
        JOIN
          limq.teams t ON tp.team_id = t.id
        WHERE
          tp.project_id = ANY($1::uuid[])
      `, [projectIds]) : { rows: [] };

      // 3. Tag links
      const tagsResult = await client.queryObject(`
        SELECT
          rt.resource_id, rt.tag_id, rt.created_at,
          t.name as tag_name, t.type as tag_type
        FROM
          limq.resource_tags rt
        JOIN
          limq.tags t ON rt.tag_id = t.id
        WHERE
          rt.resource_id = $1
      `, [resourceId]);

      // 4. QR codes
      const qrCodesResult = await client.queryObject(`
        SELECT
          id as qr_id, scan_count, url, template_name, created_at
        FROM
          limq.qr_code
        WHERE
          resource_id = $1
      `, [resourceId]);

      // 5. Channels
      const channelsResult = await client.queryObject(`
        SELECT
          id as channel_id, name as channel_name, path as channel_path,
          "isUserCreated" as is_user_created
        FROM
          limq.channel
        WHERE
          "shortUrlId" = $1
      `, [resourceId]);

      // 6. Favorites
      const favoritesResult = await client.queryObject(`
        SELECT
          f.id as favorite_id, f.user_id, f.created_at,
          u.first_name, u.last_name
        FROM
          limq.favorite f
        JOIN
          limq.users u ON f.user_id = u.id
        WHERE
          f.favoritable_id = $1 AND f.favoritable_type = 'resource'
      `, [resourceId]);

      // Debug output
      console.log(`\n处理资源ID: ${resourceId}`);
      console.log(`attributes类型: ${typeof shorturl.attributes}`);

      const attributes = parseResourceAttributes(resourceId, shorturl.attributes);

      // Fallback slug taken from the first QR code's URL, if there is one
      let slugFromQr = "";
      const qrUrl = qrCodesResult.rows[0]?.url;
      if (typeof qrUrl === 'string' && qrUrl) {
        console.log(`找到QR码URL: ${qrUrl}`);
        slugFromQr = extractSlugFromQrUrl(qrUrl);
        console.log(`从QR码URL提取的slug: ${slugFromQr}`);
      }

      // Log the values the attributes actually carry
      console.log(`提取字段 - name: ${attributes.name || 'N/A'}, slug: ${attributes.slug || 'N/A'}`);
      console.log(`提取字段 - originalUrl: ${attributes.originalUrl || 'N/A'}, original_url: ${attributes.original_url || 'N/A'}`);

      // Precedence: explicit slug, then name, then the QR-derived fallback
      const slug = attributes.slug || attributes.name || slugFromQr || "";
      const originalUrl = attributes.originalUrl || attributes.original_url || "";
      console.log(`最终使用的slug: ${slug}`);
      console.log(`最终使用的originalUrl: ${originalUrl}`);

      enriched.push({
        ...shorturl,
        attributes,
        projects: projectsResult.rows,
        teams: teamsResult.rows,
        tags: tagsResult.rows,
        qrCodes: qrCodesResult.rows,
        channels: channelsResult.rows,
        favorites: favoritesResult.rows,
        // Fields lifted out of attributes
        slug,
        originalUrl,
        title: attributes.title || "",
        description: attributes.description || "",
        expiresAt: attributes.expires_at || attributes.expiresAt || null
      });
    }
  } finally {
    client.release();
  }
  return enriched;
}
/**
 * Converts enriched shorturl records into the flat row shape inserted into
 * ClickHouse.
 *
 * Related collections (projects, teams, tags, QR codes, channels, favorites)
 * are serialized as JSON array strings; timestamps become
 * `YYYY-MM-DD HH:MM:SS.mmm` strings (UTC) or `null`.
 *
 * @param shorturls Records as produced by enrichShorturlData.
 * @returns One row object per input record, in input order.
 */
function formatForClickhouse(shorturls: Record<string, unknown>[]) {
  type DateInput = Date | string | number | null | undefined;

  // Formats a timestamp as `YYYY-MM-DD HH:MM:SS.mmm`; missing or unparseable values become null
  const formatDateTime = (date: DateInput): string | null => {
    if (!date) return null;
    const dateObj = date instanceof Date ? date : new Date(date);
    if (Number.isNaN(dateObj.getTime())) {
      // toISOString() throws on an Invalid Date; one bad value must not abort the batch
      console.warn(`无法解析日期值: ${String(date)}`);
      return null;
    }
    return dateObj.toISOString().replace('T', ' ').replace('Z', '');
  };

  // Treats a missing or non-array relation as an empty list
  const rowsOf = (value: unknown): Record<string, unknown>[] =>
    Array.isArray(value) ? value : [];

  console.log(`\n准备格式化 ${shorturls.length} 条记录为ClickHouse格式`);

  return shorturls.map(shorturl => {
    // Debug output of the key fields
    console.log(`处理资源: ${shorturl.id}`);
    console.log(`slug: ${shorturl.slug || 'EMPTY'}`);
    console.log(`originalUrl: ${shorturl.originalUrl || 'EMPTY'}`);

    // Serialized once; reused for the log preview and for the output column
    const attributesStr = JSON.stringify(shorturl.attributes || {});
    const attributesPrev = attributesStr.length > 100
      ? attributesStr.substring(0, 100) + '...'
      : attributesStr;
    console.log(`attributes: ${attributesPrev}`);

    const creatorName = [shorturl.creator_first_name, shorturl.creator_last_name]
      .filter(Boolean)
      .join(" ");

    // Projects as a JSON array
    const projects = JSON.stringify(rowsOf(shorturl.projects).map((p) => ({
      project_id: p.project_id,
      project_name: p.project_name,
      project_description: p.project_description,
      assigned_at: p.assigned_at
    })));

    // Teams as a JSON array
    const teams = JSON.stringify(rowsOf(shorturl.teams).map((t) => ({
      team_id: t.team_id,
      team_name: t.team_name,
      team_description: t.team_description,
      via_project_id: t.project_id
    })));

    // Tags as a JSON array
    const tags = JSON.stringify(rowsOf(shorturl.tags).map((t) => ({
      tag_id: t.tag_id,
      tag_name: t.tag_name,
      tag_type: t.tag_type,
      created_at: t.created_at
    })));

    // QR codes as a JSON array
    const qrCodes = JSON.stringify(rowsOf(shorturl.qrCodes).map((q) => ({
      qr_id: q.qr_id,
      scan_count: q.scan_count,
      url: q.url,
      template_name: q.template_name,
      created_at: q.created_at
    })));

    // Channels as a JSON array
    const channels = JSON.stringify(rowsOf(shorturl.channels).map((c) => ({
      channel_id: c.channel_id,
      channel_name: c.channel_name,
      channel_path: c.channel_path,
      is_user_created: c.is_user_created
    })));

    // Favorites as a JSON array
    const favorites = JSON.stringify(rowsOf(shorturl.favorites).map((f) => ({
      favorite_id: f.favorite_id,
      user_id: f.user_id,
      user_name: `${f.first_name || ""} ${f.last_name || ""}`.trim(),
      created_at: f.created_at
    })));

    // click_count is read defensively: attributes may be missing and the value may be a numeric string
    const attributeBag: Record<string, unknown> =
      typeof shorturl.attributes === 'object' && shorturl.attributes !== null
        ? shorturl.attributes as Record<string, unknown>
        : {};
    const rawClicks = attributeBag.click_count;
    const parsedClicks = typeof rawClicks === 'string' ? Number(rawClicks) : rawClicks;
    const clickCount = typeof parsedClicks === 'number' && Number.isFinite(parsedClicks)
      ? parsedClicks
      : 0;
    // Always 0 here; the original note says statistics can be aggregated from the events table or set elsewhere
    const uniqueVisitors = 0;

    // Key order matters: insertToClickhouse derives the column list from the first row's keys
    return {
      id: shorturl.id,
      external_id: shorturl.external_id || "",
      type: shorturl.type,
      slug: shorturl.slug || "",
      original_url: shorturl.originalUrl || "",
      title: shorturl.title || "",
      description: shorturl.description || "",
      attributes: attributesStr,
      schema_version: shorturl.schema_version || 1,
      creator_id: shorturl.creator_id || "",
      creator_email: shorturl.creator_email || "",
      creator_name: creatorName,
      created_at: formatDateTime(shorturl.created_at as DateInput),
      updated_at: formatDateTime(shorturl.updated_at as DateInput),
      deleted_at: formatDateTime(shorturl.deleted_at as DateInput),
      projects,
      teams,
      tags,
      qr_codes: qrCodes,
      channels,
      favorites,
      expires_at: formatDateTime(shorturl.expiresAt as DateInput),
      click_count: clickCount,
      unique_visitors: uniqueVisitors
    };
  });
}
/**
 * Loads the ClickHouse connection settings from the Windmill variable
 * `f/shorturl_analytics/clickhouse` (read with getVariable, not getResource).
 *
 * A string value is parsed as JSON; any other value is used as-is. When the
 * settings carry no `clickhouse_url` but do carry host and port, the URL is
 * built as `http://host:port`.
 *
 * @returns The settings, with `clickhouse_url` guaranteed to be set.
 * @throws Error when the variable is empty, is not valid JSON, or yields no URL.
 *         Every failure is logged before being rethrown.
 */
async function getClickHouseConfig(): Promise<ChConfig> {
  // Performs the actual lookup; the wrapper below only adds failure logging
  const load = async (): Promise<ChConfig> => {
    const rawValue = await getVariable("f/shorturl_analytics/clickhouse");
    console.log("原始ClickHouse配置:", typeof rawValue);

    // An empty variable is treated as "not configured"
    if (!rawValue) {
      throw new Error("未找到ClickHouse配置");
    }

    let settings: ChConfig;
    if (typeof rawValue === 'string') {
      // Stored as a JSON string: decode it
      try {
        settings = JSON.parse(rawValue);
      } catch (parseError) {
        console.error("解析JSON失败:", parseError);
        throw new Error("ClickHouse配置不是有效的JSON");
      }
    } else {
      settings = rawValue as ChConfig;
    }

    // Fill in the URL from host and port when only those are present
    const urlMissing = !settings.clickhouse_url;
    if (urlMissing && settings.clickhouse_host && settings.clickhouse_port) {
      settings.clickhouse_url = `http://${settings.clickhouse_host}:${settings.clickhouse_port}`;
      console.log(`已构建ClickHouse URL: ${settings.clickhouse_url}`);
    }

    if (settings.clickhouse_url) {
      return settings;
    }
    throw new Error("ClickHouse配置缺少URL");
  };

  try {
    return await load();
  } catch (error) {
    console.error("获取ClickHouse配置失败:", error);
    throw error;
  }
}
/**
 * Writes rows to `shorturl_analytics.shorturl` through the ClickHouse HTTP
 * endpoint.
 *
 * First issues `ALTER TABLE ... DELETE` for the incoming ids in batches of
 * 100; this step is best-effort, so failures are logged and the run continues.
 * Then inserts the rows as JSONEachRow in batches of 100; a failed insert
 * aborts the run.
 *
 * @param data Rows to insert; the column list is taken from the first row's keys.
 * @returns Number of rows inserted (0 for empty input, without contacting ClickHouse).
 * @throws Error when the configuration has no URL or an insert request fails.
 */
async function insertToClickhouse(data: Record<string, unknown>[]) {
  if (data.length === 0) return 0;

  // Connection settings
  const chConfig = await getClickHouseConfig();
  if (!chConfig.clickhouse_url) {
    throw new Error("无效的ClickHouse URL: 未定义");
  }
  const endpoint = chConfig.clickhouse_url;
  console.log(`准备写入数据到ClickHouse: ${endpoint}`);

  // The auth header is identical for every request, so it is built once
  const headers = {
    "Content-Type": "application/x-www-form-urlencoded",
    "Authorization": `Basic ${btoa(`${chConfig.clickhouse_user}:${chConfig.clickhouse_password}`)}`
  };
  const post = (body: string) => fetch(endpoint, { method: "POST", headers, body });

  // Quotes an id as a ClickHouse string literal, escaping backslashes and single quotes
  const quoteId = (id: unknown): string =>
    `'${String(id).replace(/\\/g, "\\\\").replace(/'/g, "\\'")}'`;

  const columns = Object.keys(data[0]).join(", ");
  const recordIds = data.map(record => record.id);
  console.log(`需要处理的记录数: ${recordIds.length}`);

  // Step 1: remove rows that would otherwise be duplicated (best-effort)
  try {
    console.log(`删除可能存在的重复记录...`);
    const deleteBatchSize = 100;
    const totalDeleteBatches = Math.ceil(recordIds.length / deleteBatchSize);
    for (let i = 0; i < recordIds.length; i += deleteBatchSize) {
      const batchNumber = i / deleteBatchSize + 1;
      const formattedIds = recordIds.slice(i, i + deleteBatchSize).map(quoteId).join(', ');
      const deleteQuery = `
        ALTER TABLE shorturl_analytics.shorturl
        DELETE WHERE id IN (${formattedIds})
      `;
      const response = await post(deleteQuery);
      // Always read the body so the connection is released even on success
      const responseText = await response.text();
      if (!response.ok) {
        console.warn(`删除记录时出错 (批次 ${batchNumber}): ${responseText}`);
        // Keep going: a failed delete does not stop the sync
      } else {
        console.log(`成功删除批次 ${batchNumber}/${totalDeleteBatches}的潜在重复记录`);
      }
    }
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    console.warn(`删除重复记录时出错: ${reason}`);
    // Keep going: the delete step must not abort the whole run
  }

  // Step 2: batched insert
  const query = `
    INSERT INTO shorturl_analytics.shorturl (${columns})
    FORMAT JSONEachRow
  `;
  let inserted = 0;
  const batchSize = 100;
  for (let i = 0; i < data.length; i += batchSize) {
    const batch = data.slice(i, i + batchSize);
    // JSONEachRow: one JSON object per line
    const rows = batch.map(row => JSON.stringify(row)).join('\n');
    try {
      console.log(`正在发送请求到: ${endpoint}`);
      console.log(`认证信息: ${chConfig.clickhouse_user}:***`);
      const response = await post(`${query}\n${rows}`);
      const responseText = await response.text();
      if (!response.ok) {
        throw new Error(`ClickHouse插入失败: ${responseText}`);
      }
      inserted += batch.length;
      console.log(`已插入 ${inserted}/${data.length} 条记录`);
    } catch (error) {
      console.error(`请求ClickHouse时出错:`, error);
      throw error;
    }
  }
  return inserted;
}