Files
shorturl-analytics/windmill/sync_shorturl_to_clickhouse_intime.ts
2025-04-09 19:20:40 +08:00

660 lines
19 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// 文件名: sync_resource_relations.ts
// 描述: 此脚本用于同步PostgreSQL中资源关联数据到ClickHouse
// 作者: AI Assistant
// 创建日期: 2023-10-31
import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
import { getResource, getVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";
// PostgreSQL connection settings. main() casts the Windmill resource
// 'f/limq/postgresql' to this shape and feeds it to the connection pool.
interface PgConfig {
host: string;
port: number;
user: string;
password: string;
// Database name; main() falls back to 'postgres' when this is absent.
dbname?: string;
// Index signature: any additional keys are accepted, typed as unknown.
[key: string]: unknown;
}
// ClickHouse connection settings, as resolved by getClickHouseConfig().
interface ChConfig {
clickhouse_host: string;
clickhouse_port: number;
// User and password are sent as HTTP Basic credentials by executeClickHouseQuery().
clickhouse_user: string;
clickhouse_password: string;
// Endpoint that queries are POSTed to. When missing, getClickHouseConfig()
// derives it as http://<clickhouse_host>:<clickhouse_port>.
clickhouse_url?: string;
}
// Resource-related data shapes.
// A team row: getResourcesWithRelations() casts the rows of its
// team_projects/teams join to this type.
interface TeamData {
team_id: string;
team_name: string;
team_description?: string;
// Project the team was reached through (tp.project_id in that query).
project_id?: string;
}
// A project row: getResourcesWithRelations() casts the rows of its
// project_resources/projects join to this type.
interface ProjectData {
project_id: string;
project_name: string;
project_description?: string;
// Selected from project_resources.assigned_at.
assigned_at?: string;
// ID of the resource the project is linked to (project_resources.resource_id).
resource_id?: string;
}
// A tag row: getResourcesWithRelations() casts the rows of its
// resource_tags/tags join to this type.
interface TagData {
tag_id: string;
tag_name: string;
tag_type?: string;
// Selected from resource_tags.created_at (the link row, not the tag itself).
created_at?: string;
// ID of the tagged resource (resource_tags.resource_id).
resource_id?: string;
}
// A favorite row: getResourcesWithRelations() casts the rows of its
// favorite/users join to this type.
interface FavoriteData {
favorite_id: string;
user_id: string;
// first_name, last_name and email come from the joined users row.
first_name?: string;
last_name?: string;
email?: string;
// Selected from favorite.created_at.
created_at?: string;
}
// A resource together with the relation data loaded for it.
// getResourcesWithRelations() only sets a relation array when the matching
// sync option is enabled; teams are additionally only set when at least one
// project was loaded for the resource.
interface ResourceRelations {
resource_id: string;
teams?: TeamData[];
projects?: ProjectData[];
tags?: TagData[];
favorites?: FavoriteData[];
// Used by updateClickHouseEvents() to match rows by event_id / link_id.
external_id?: string;
// updateClickHouseShorturl() only processes entries whose type is 'shorturl'.
type?: string;
// Parsed form of the resource's attributes column (see parseJsonField()).
attributes?: Record<string, unknown>;
}
/**
 * Windmill entry point: reads relation data for the given resources from
 * PostgreSQL and writes it to ClickHouse.
 *
 * Every `sync_*` flag counts as enabled unless it is explicitly `false`.
 * With `dry_run` set, the data is still loaded (and the ClickHouse config is
 * still resolved) but no update is issued; the loaded resources are returned.
 *
 * Failures inside the sync are caught and reported as
 * `{ success: false, message }` instead of being thrown. The PostgreSQL pool
 * is always closed before returning.
 */
export async function main(
  params: {
    /** IDs of the resources to sync */
    resource_ids: string[];
    /** Whether to sync teams data */
    sync_teams?: boolean;
    /** Whether to sync projects data */
    sync_projects?: boolean;
    /** Whether to sync tags data */
    sync_tags?: boolean;
    /** Whether to sync favorites data */
    sync_favorites?: boolean;
    /** Test mode: load data but do not execute any update */
    dry_run?: boolean;
    /** Whether to print verbose log lines */
    verbose?: boolean;
  }
) {
  // Nothing to do without at least one resource ID.
  const resource_ids = params.resource_ids || [];
  if (resource_ids.length === 0) {
    return { success: false, message: "至少需要提供一个资源ID" };
  }

  // Relation kinds default to "on"; only an explicit `false` disables one.
  const relationOptions = {
    sync_teams: params.sync_teams !== false,
    sync_projects: params.sync_projects !== false,
    sync_tags: params.sync_tags !== false,
    sync_favorites: params.sync_favorites !== false
  };
  const dry_run = Boolean(params.dry_run);
  const verbose = Boolean(params.verbose);

  // Verbose-only messages are dropped unless verbose logging was requested.
  const log = (message: string, isVerbose = false) => {
    if (isVerbose && !verbose) {
      return;
    }
    console.log(message);
  };

  log(`开始同步资源关联数据: ${resource_ids.join(", ")}`);
  log(
    `同步选项: teams=${relationOptions.sync_teams}, projects=${relationOptions.sync_projects}, tags=${relationOptions.sync_tags}, favorites=${relationOptions.sync_favorites}`,
    true
  );

  let pgPool: Pool | null = null;
  try {
    // 1. Resolve the PostgreSQL settings and open a pool of 3 connections.
    log("获取PostgreSQL数据库配置...", true);
    const pgConfig = await getResource('f/limq/postgresql') as PgConfig;
    pgPool = new Pool({
      hostname: pgConfig.host,
      port: pgConfig.port,
      user: pgConfig.user,
      password: pgConfig.password,
      database: pgConfig.dbname || 'postgres'
    }, 3);

    // 2. Load the resources with whichever relations were requested.
    const resourcesData = await getResourcesWithRelations(pgPool, resource_ids, relationOptions, log);
    log(`成功获取 ${resourcesData.length} 个资源的关联数据`);
    if (resourcesData.length === 0) {
      return { success: true, message: "没有找到需要更新的资源数据", updated: 0 };
    }

    // 3. Resolve the ClickHouse settings (done in dry-run mode as well).
    const chConfig = await getClickHouseConfig();

    // 4. Dry run: show the first loaded resource and stop before any write.
    if (dry_run) {
      log("测试模式: 不执行实际更新");
      log("示例数据:");
      log(JSON.stringify(resourcesData[0], null, 2));
      return { success: true, dry_run: true, resources: resourcesData };
    }

    // 5. Real run: update the shorturl table first, then the events table.
    const shorturlUpdated = await updateClickHouseShorturl(resourcesData, chConfig, log);
    const eventsUpdated = await updateClickHouseEvents(resourcesData, chConfig, log);
    return {
      success: true,
      message: "资源关联数据同步完成",
      shorturl_updated: shorturlUpdated,
      events_updated: eventsUpdated,
      total_updated: shorturlUpdated + eventsUpdated
    };
  } catch (error) {
    const failure = error as Error;
    const errorMessage = `同步过程中发生错误: ${failure.message}`;
    log(errorMessage);
    if (failure.stack) {
      log(`错误堆栈: ${failure.stack}`, true);
    }
    return { success: false, message: errorMessage };
  } finally {
    if (pgPool) {
      await pgPool.end();
      log("PostgreSQL连接池已关闭", true);
    }
  }
}
/**
 * Loads the requested resources from PostgreSQL together with their relations.
 *
 * Resources, projects, tags, teams and favorites with `deleted_at` set are
 * excluded. Relations are fetched per resource, one query per relation kind.
 * Teams are resolved through the projects loaded in the same pass, so `teams`
 * stays unset when `sync_projects` is off or the resource has no projects.
 *
 * All values are passed to PostgreSQL as bound parameters; the resource IDs
 * come from the script's caller and must never be spliced into the SQL text.
 *
 * @param pgPool - pool from which one client is borrowed and released on exit
 * @param resourceIds - IDs matched against `limq.resources.id`
 * @param options - which relation kinds to load
 * @param log - logger; every message emitted here is flagged as verbose
 * @returns one entry per resource row found; empty when nothing matches
 */
async function getResourcesWithRelations(
  pgPool: Pool,
  resourceIds: string[],
  options: {
    sync_teams: boolean;
    sync_projects: boolean;
    sync_tags: boolean;
    sync_favorites: boolean;
  },
  log: (message: string, isVerbose?: boolean) => void
): Promise<ResourceRelations[]> {
  // An empty list would render as `IN ()`, which is not valid SQL.
  if (resourceIds.length === 0) {
    return [];
  }

  const client = await pgPool.connect();
  try {
    // One positional placeholder per ID: $1, $2, ...
    const placeholders = resourceIds.map((_, index) => `$${index + 1}`).join(', ');

    // 1. Base resource rows. The quoted form is used for logging only.
    log(`获取资源基本信息: ${resourceIds.map(id => `'${id}'`).join(',')}`, true);
    const resourcesQuery = `
      SELECT
        r.id,
        r.external_id,
        r.type,
        r.attributes,
        r.schema_version,
        r.created_at,
        r.updated_at
      FROM
        limq.resources r
      WHERE
        r.id IN (${placeholders})
        AND r.deleted_at IS NULL
    `;
    const resourcesResult = await client.queryObject(resourcesQuery, resourceIds);
    if (resourcesResult.rows.length === 0) {
      log(`未找到有效的资源数据`, true);
      return [];
    }

    const enrichedResources: ResourceRelations[] = [];
    for (const resource of resourcesResult.rows) {
      const resourceId = resource.id as string;
      log(`处理资源ID: ${resourceId}`, true);

      const relationData: ResourceRelations = {
        resource_id: resourceId,
        external_id: resource.external_id as string,
        type: resource.type as string,
        attributes: parseJsonField(resource.attributes)
      };

      // 2. Projects linked to the resource.
      if (options.sync_projects) {
        const projectsQuery = `
          SELECT
            pr.resource_id, pr.project_id,
            p.name as project_name, p.description as project_description,
            pr.assigned_at
          FROM
            limq.project_resources pr
          JOIN
            limq.projects p ON pr.project_id = p.id
          WHERE
            pr.resource_id = $1
            AND p.deleted_at IS NULL
        `;
        const projectsResult = await client.queryObject(projectsQuery, [resourceId]);
        relationData.projects = projectsResult.rows as ProjectData[];
        log(`找到 ${projectsResult.rows.length} 个关联项目`, true);
      }

      // 3. Tags linked to the resource.
      if (options.sync_tags) {
        const tagsQuery = `
          SELECT
            rt.resource_id, rt.tag_id, rt.created_at,
            t.name as tag_name, t.type as tag_type
          FROM
            limq.resource_tags rt
          JOIN
            limq.tags t ON rt.tag_id = t.id
          WHERE
            rt.resource_id = $1
            AND t.deleted_at IS NULL
        `;
        const tagsResult = await client.queryObject(tagsQuery, [resourceId]);
        relationData.tags = tagsResult.rows as TagData[];
        log(`找到 ${tagsResult.rows.length} 个关联标签`, true);
      }

      // 4. Teams, reached through the projects loaded in step 2.
      if (options.sync_teams && relationData.projects && relationData.projects.length > 0) {
        const projectIds = relationData.projects.map((p: ProjectData) => p.project_id);
        const teamsQuery = `
          SELECT
            tp.team_id, tp.project_id,
            t.name as team_name, t.description as team_description
          FROM
            limq.team_projects tp
          JOIN
            limq.teams t ON tp.team_id = t.id
          WHERE
            tp.project_id = ANY($1::uuid[])
            AND t.deleted_at IS NULL
        `;
        const teamsResult = await client.queryObject(teamsQuery, [projectIds]);
        relationData.teams = teamsResult.rows as TeamData[];
        log(`找到 ${teamsResult.rows.length} 个关联团队`, true);
      }

      // 5. Favorites pointing at the resource.
      if (options.sync_favorites) {
        const favoritesQuery = `
          SELECT
            f.id as favorite_id, f.user_id, f.created_at,
            u.first_name, u.last_name, u.email
          FROM
            limq.favorite f
          JOIN
            limq.users u ON f.user_id = u.id
          WHERE
            f.favoritable_id = $1
            AND f.favoritable_type = 'resource'
            AND f.deleted_at IS NULL
        `;
        const favoritesResult = await client.queryObject(favoritesQuery, [resourceId]);
        relationData.favorites = favoritesResult.rows as FavoriteData[];
        log(`找到 ${favoritesResult.rows.length} 个收藏记录`, true);
      }

      enrichedResources.push(relationData);
    }
    return enrichedResources;
  } finally {
    // Hand the client back to the pool on success and on failure alike.
    client.release();
  }
}
/**
 * Writes the loaded relation data into `shorturl_analytics.shorturl`.
 *
 * Only resources of type 'shorturl' are processed, one `ALTER TABLE ... UPDATE`
 * statement per resource. A column is only assigned when its relation was
 * actually loaded, so a relation that was not synced keeps its stored value
 * instead of being overwritten with an empty array. `teams` is the one
 * exception: a resource whose projects were loaded and are empty has no teams
 * either (teams are derived from projects), so it is written as `[]`.
 *
 * A failure for one resource is logged and does not stop the others.
 *
 * @param resources - resources with their loaded relations
 * @param chConfig - ClickHouse connection settings
 * @param log - logger
 * @returns number of resources whose update statement was executed without error
 */
async function updateClickHouseShorturl(
  resources: ResourceRelations[],
  chConfig: ChConfig,
  log: (message: string, isVerbose?: boolean) => void
): Promise<number> {
  const shorturls = resources.filter(r => r.type === 'shorturl');
  if (shorturls.length === 0) {
    log('没有找到shorturl类型的资源跳过shorturl表更新');
    return 0;
  }
  log(`准备更新 ${shorturls.length} 个shorturl资源`);

  const tableExists = await checkClickHouseTable(chConfig, 'shorturl_analytics.shorturl');
  if (!tableExists) {
    log('ClickHouse中未找到shorturl表请先创建表');
    return 0;
  }

  // Serialises a value to JSON and wraps it as an escaped SQL string literal.
  const toJsonLiteral = (value: unknown): string => `'${escapeString(JSON.stringify(value))}'`;

  let updatedCount = 0;
  for (const shorturl of shorturls) {
    try {
      const assignments: string[] = [];

      // No projects implies no teams, even though the teams query never ran.
      const hasNoProjects = shorturl.projects !== undefined && shorturl.projects.length === 0;
      const teams = shorturl.teams ?? (hasNoProjects ? [] : undefined);
      if (teams !== undefined) {
        assignments.push(`teams = ${toJsonLiteral(teams)}`);
      }

      if (shorturl.projects !== undefined) {
        assignments.push(`projects = ${toJsonLiteral(shorturl.projects)}`);
      }

      if (shorturl.tags !== undefined) {
        const tags = shorturl.tags.map((t: TagData) => ({
          tag_id: t.tag_id,
          tag_name: t.tag_name,
          tag_type: t.tag_type,
          created_at: t.created_at
        }));
        assignments.push(`tags = ${toJsonLiteral(tags)}`);
      }

      if (shorturl.favorites !== undefined) {
        const favorites = shorturl.favorites.map((f: FavoriteData) => ({
          favorite_id: f.favorite_id,
          user_id: f.user_id,
          user_name: `${f.first_name || ""} ${f.last_name || ""}`.trim(),
          created_at: f.created_at
        }));
        assignments.push(`favorites = ${toJsonLiteral(favorites)}`);
      }

      // Nothing was loaded for this resource: leave the stored row untouched.
      if (assignments.length === 0) {
        log(`没有已加载的关联数据, 跳过shorturl更新: ${shorturl.resource_id}`, true);
        continue;
      }

      // The ID is escaped as well: it is a value, not trusted SQL text.
      const updateQuery = `
        ALTER TABLE shorturl_analytics.shorturl
        UPDATE
          ${assignments.join(',\n          ')}
        WHERE id = '${escapeString(shorturl.resource_id)}'
      `;
      await executeClickHouseQuery(chConfig, updateQuery);
      log(`更新shorturl完成: ${shorturl.resource_id}`, true);
      updatedCount++;
    } catch (error) {
      log(`更新shorturl ${shorturl.resource_id} 失败: ${(error as Error).message}`);
    }
  }
  return updatedCount;
}
/**
 * Writes the loaded tags into `link_tags` of `shorturl_analytics.events`.
 *
 * Only resources with a non-blank `external_id` are considered. Event rows are
 * matched in three ways: `event_id = external_id`, `link_id = external_id` and
 * `link_id = resource_id`. A pre-check counts matching rows first and returns
 * early when there are none; it covers all three criteria so the resource_id
 * path is not skipped by mistake.
 *
 * Resources whose tags were not loaded are skipped, so existing `link_tags`
 * values are not overwritten with an empty array. Every ID is escaped before
 * it is placed into a statement.
 *
 * Any error is logged and ends the run; the count reached so far is returned.
 *
 * @param resources - resources with their loaded relations
 * @param chConfig - ClickHouse connection settings
 * @param log - logger
 * @returns number of resources for which the update statements were executed
 */
async function updateClickHouseEvents(
  resources: ResourceRelations[],
  chConfig: ChConfig,
  log: (message: string, isVerbose?: boolean) => void
): Promise<number> {
  const resourcesWithExternalId = resources.filter(r => r.external_id && r.external_id.trim() !== '');
  if (resourcesWithExternalId.length === 0) {
    log('没有找到具有external_id的资源跳过events表更新');
    return 0;
  }
  log(`准备更新events表中与 ${resourcesWithExternalId.length} 个外部ID相关的记录`);

  const tableExists = await checkClickHouseTable(chConfig, 'shorturl_analytics.events');
  if (!tableExists) {
    log('ClickHouse中未找到events表请先创建表');
    return 0;
  }

  // Map keyed by external_id: de-duplicates IDs and, unlike a plain object,
  // is safe for keys such as "__proto__".
  const resourceByExternalId = new Map<string, ResourceRelations>();
  for (const resource of resourcesWithExternalId) {
    if (resource.external_id) {
      resourceByExternalId.set(resource.external_id, resource);
    }
  }

  // Wraps a value as an escaped SQL string literal.
  const quote = (value: string): string => `'${escapeString(value)}'`;

  let updatedCount = 0;
  try {
    const externalIds = [...resourceByExternalId.keys()];
    const resourceIds = [...resourceByExternalId.values()]
      .map(r => r.resource_id)
      .filter(Boolean);
    const eventIdList = externalIds.map(quote).join(', ');
    // link_id is matched against external IDs and resource IDs by the updates below.
    const linkIdList = [...new Set([...externalIds, ...resourceIds])].map(quote).join(', ');

    // Pre-check 1: rows matching by event_id.
    const countQuery = `
      SELECT COUNT(*) as count
      FROM shorturl_analytics.events
      WHERE event_id IN (${eventIdList})
    `;
    const countResult = await executeClickHouseQuery(chConfig, countQuery);
    const eventCount = parseInt(countResult.trim(), 10);
    if (eventCount === 0) {
      // Pre-check 2: rows matching by link_id.
      const alternateCountQuery = `
        SELECT COUNT(*) as count
        FROM shorturl_analytics.events
        WHERE link_id IN (${linkIdList})
      `;
      const alternateCountResult = await executeClickHouseQuery(chConfig, alternateCountQuery);
      const alternateEventCount = parseInt(alternateCountResult.trim(), 10);
      if (alternateEventCount === 0) {
        log('没有找到相关事件记录跳过events表更新');
        log(`已尝试的匹配字段: event_id, link_id`, true);
        return 0;
      } else {
        log(`找到 ${alternateEventCount} 条以link_id匹配的事件记录需要更新`);
      }
    } else {
      log(`找到 ${eventCount} 条以event_id匹配的事件记录需要更新`);
    }

    for (const [externalId, resource] of resourceByExternalId) {
      // Tags were not loaded for this resource: keep whatever is stored.
      if (!resource.tags) continue;

      const tagsLiteral = quote(JSON.stringify(resource.tags));
      const externalIdLiteral = quote(externalId);

      // Match by event_id.
      const updateTagsQueryByEventId = `
        ALTER TABLE shorturl_analytics.events
        UPDATE link_tags = ${tagsLiteral}
        WHERE event_id = ${externalIdLiteral}
      `;
      await executeClickHouseQuery(chConfig, updateTagsQueryByEventId);
      log(`尝试通过event_id更新事件标签: ${externalId}`, true);

      // Match by link_id holding the external ID.
      const updateTagsQueryByLinkId = `
        ALTER TABLE shorturl_analytics.events
        UPDATE link_tags = ${tagsLiteral}
        WHERE link_id = ${externalIdLiteral}
      `;
      await executeClickHouseQuery(chConfig, updateTagsQueryByLinkId);
      log(`尝试通过link_id更新事件标签: ${externalId}`, true);

      // Match by link_id holding the resource ID.
      if (resource.resource_id) {
        const updateByResourceId = `
          ALTER TABLE shorturl_analytics.events
          UPDATE link_tags = ${tagsLiteral}
          WHERE link_id = ${quote(resource.resource_id)}
        `;
        await executeClickHouseQuery(chConfig, updateByResourceId);
        log(`尝试通过resource_id更新事件标签: ${resource.resource_id}`, true);
      }
      updatedCount++;
    }
    log(`已尝试更新 ${updatedCount} 个资源的事件记录`);
  } catch (error) {
    log(`更新events表失败: ${(error as Error).message}`);
  }
  return updatedCount;
}
/**
 * Resolves the ClickHouse connection settings from the Windmill variable
 * "f/shorturl_analytics/clickhouse".
 *
 * A string value is parsed as JSON; any other value is used as-is. When the
 * settings carry no `clickhouse_url`, one is built from host and port.
 *
 * @returns settings with `clickhouse_url` populated
 * @throws Error when the variable is empty, is not valid JSON, or yields no
 *         URL; every failure is re-thrown with a common message prefix
 */
async function getClickHouseConfig(): Promise<ChConfig> {
  try {
    const rawConfig = await getVariable("f/shorturl_analytics/clickhouse");
    if (!rawConfig) {
      throw new Error("未找到ClickHouse配置");
    }

    // Accept either a JSON string or an already-structured value.
    let resolved: ChConfig;
    if (typeof rawConfig !== 'string') {
      resolved = rawConfig as ChConfig;
    } else {
      try {
        resolved = JSON.parse(rawConfig);
      } catch (_) {
        throw new Error("ClickHouse配置不是有效的JSON");
      }
    }

    // Without an explicit URL, fall back to http://host:port or give up.
    if (!resolved.clickhouse_url) {
      if (resolved.clickhouse_host && resolved.clickhouse_port) {
        resolved.clickhouse_url = `http://${resolved.clickhouse_host}:${resolved.clickhouse_port}`;
      } else {
        throw new Error("ClickHouse配置缺少URL");
      }
    }
    return resolved;
  } catch (error) {
    throw new Error(`获取ClickHouse配置失败: ${(error as Error).message}`);
  }
}
/**
 * Reports whether a table exists in ClickHouse.
 *
 * Runs `EXISTS TABLE <tableName>` and treats a trimmed response of '1' as
 * "exists". A failing query is logged via console.error and reported as
 * `false` rather than thrown.
 *
 * @param chConfig - ClickHouse connection settings
 * @param tableName - table name, placed into the statement verbatim
 */
async function checkClickHouseTable(chConfig: ChConfig, tableName: string): Promise<boolean> {
  try {
    const response = await executeClickHouseQuery(chConfig, `EXISTS TABLE ${tableName}`);
    const exists = response.trim() === '1';
    return exists;
  } catch (failure) {
    console.error(`检查表 ${tableName} 失败:`, failure);
    return false;
  }
}
/**
 * Sends one statement to ClickHouse over HTTP and returns the response body.
 *
 * The statement is POSTed as the request body to `chConfig.clickhouse_url`,
 * authenticated with HTTP Basic credentials built from user and password.
 *
 * @param chConfig - ClickHouse connection settings
 * @param query - statement text to execute
 * @returns the raw response text
 * @throws Error when no URL is configured, when the request fails, or when
 *         the response status is not OK (status and body are in the message)
 */
async function executeClickHouseQuery(chConfig: ChConfig, query: string): Promise<string> {
  const endpoint = chConfig.clickhouse_url;
  if (!endpoint) {
    throw new Error("无效的ClickHouse URL: 未定义");
  }

  try {
    const credentials = btoa(`${chConfig.clickhouse_user}:${chConfig.clickhouse_password}`);
    const response = await fetch(endpoint, {
      method: "POST",
      headers: {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": `Basic ${credentials}`
      },
      body: query,
    });

    // The body is needed on both paths: as the result, or as the error detail.
    const payload = await response.text();
    if (!response.ok) {
      throw new Error(`ClickHouse查询失败 (${response.status}): ${payload}`);
    }
    return payload;
  } catch (error) {
    throw new Error(`执行ClickHouse查询失败: ${(error as Error).message}`);
  }
}
/**
 * Normalises a JSON-ish column value to an object.
 *
 * Strings are parsed as JSON; values that are already objects are returned
 * unchanged. Anything that does not end up as a non-null object (empty input,
 * invalid JSON, or JSON that encodes `null`, a number, a string or a boolean)
 * yields `{}`, so callers never receive `null` or a primitive.
 *
 * @param field - raw column value
 * @returns the parsed or passed-through object, or `{}` as a fallback
 */
function parseJsonField(field: unknown): Record<string, unknown> {
  if (!field) return {};

  let value: unknown = field;
  if (typeof field === 'string') {
    try {
      value = JSON.parse(field);
    } catch (error) {
      console.warn(`无法解析JSON字段:`, error);
      return {};
    }
  }

  // JSON.parse can legitimately return null or a primitive; reject those.
  if (typeof value === 'object' && value !== null) {
    return value as Record<string, unknown>;
  }
  return {};
}
/**
 * Escapes a value for use inside a single-quoted ClickHouse string literal.
 *
 * ClickHouse treats the backslash as an escape character inside string
 * literals, so backslashes are doubled first; otherwise JSON payloads lose
 * their own escapes and a trailing `\` could swallow the closing quote.
 * Single quotes are then doubled.
 *
 * @param str - raw value
 * @returns the escaped text, without surrounding quotes; '' for empty input
 */
function escapeString(str: string): string {
  if (!str) return '';
  // Order matters: escape backslashes before adding any new characters.
  return str.replace(/\\/g, '\\\\').replace(/'/g, "''");
}