// File: sync_resource_relations.ts
// Description: Syncs resource relation data (teams, projects, tags, favorites)
//   from PostgreSQL into the ClickHouse shorturl / events tables.
// Author: AI Assistant
// Created: 2023-10-31

import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
import { getResource, getVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";

/** PostgreSQL connection settings (Windmill resource `f/limq/postgresql`). */
interface PgConfig {
  host: string;
  port: number;
  user: string;
  password: string;
  dbname?: string;
  [key: string]: unknown;
}

/** ClickHouse connection settings (Windmill variable `f/shorturl_analytics/clickhouse`). */
interface ChConfig {
  clickhouse_host: string;
  clickhouse_port: number;
  clickhouse_user: string;
  clickhouse_password: string;
  clickhouse_url?: string;
}

// Row shapes of the relation queries below.
interface TeamData {
  team_id: string;
  team_name: string;
  team_description?: string;
  project_id?: string;
}

interface ProjectData {
  project_id: string;
  project_name: string;
  project_description?: string;
  assigned_at?: string;
  resource_id?: string;
}

interface TagData {
  tag_id: string;
  tag_name: string;
  tag_type?: string;
  created_at?: string;
  resource_id?: string;
}

interface FavoriteData {
  favorite_id: string;
  user_id: string;
  first_name?: string;
  last_name?: string;
  email?: string;
  created_at?: string;
}

/**
 * A resource plus the relations fetched for it.
 *
 * A relation field is `undefined` when it was not requested (its `sync_*`
 * flag was false) and an array (possibly empty) when it was fetched. The
 * ClickHouse writers rely on this distinction so that relations which were
 * not synced are left untouched instead of being overwritten with `[]`.
 */
interface ResourceRelations {
  resource_id: string;
  teams?: TeamData[];
  projects?: ProjectData[];
  tags?: TagData[];
  favorites?: FavoriteData[];
  external_id?: string;
  type?: string;
  attributes?: Record<string, unknown>;
}

/** Base columns read from `limq.resources`. */
interface ResourceRow {
  id: string;
  external_id: string | null;
  type: string | null;
  attributes: unknown;
}

/** Logging callback; verbose messages are printed only when `verbose` is on. */
type Logger = (message: string, isVerbose?: boolean) => void;

/**
 * Sync PostgreSQL resource relation data into ClickHouse.
 *
 * Each relation kind is fetched and written only when its `sync_*` flag is
 * not `false`; relations that are switched off keep their current ClickHouse
 * values.
 *
 * @returns A summary object. On failure `success` is false and `message`
 *   describes the error (errors are reported, not thrown).
 */
export async function main(
  params: {
    /** IDs of the resources to sync */
    resource_ids: string[];
    /** Whether to sync teams data */
    sync_teams?: boolean;
    /** Whether to sync projects data */
    sync_projects?: boolean;
    /** Whether to sync tags data */
    sync_tags?: boolean;
    /** Whether to sync favorites data */
    sync_favorites?: boolean;
    /** Dry-run mode (no actual updates are executed) */
    dry_run?: boolean;
    /** Whether to print verbose logs */
    verbose?: boolean;
  },
) {
  // Normalize IDs: trim, drop blanks and duplicates so each resource is processed once.
  const resource_ids = [
    ...new Set(
      (params.resource_ids ?? [])
        .map((id) => String(id).trim())
        .filter((id) => id !== ""),
    ),
  ];
  // Relation flags default to true; only an explicit `false` disables one.
  const sync_teams = params.sync_teams !== false;
  const sync_projects = params.sync_projects !== false;
  const sync_tags = params.sync_tags !== false;
  const sync_favorites = params.sync_favorites !== false;
  const dry_run = params.dry_run ?? false;
  const verbose = params.verbose ?? false;

  if (resource_ids.length === 0) {
    return { success: false, message: "至少需要提供一个资源ID" };
  }

  const log: Logger = (message, isVerbose = false) => {
    if (!isVerbose || verbose) {
      console.log(message);
    }
  };

  log(`开始同步资源关联数据: ${resource_ids.join(", ")}`);
  log(
    `同步选项: teams=${sync_teams}, projects=${sync_projects}, tags=${sync_tags}, favorites=${sync_favorites}`,
    true,
  );

  let pgPool: Pool | null = null;

  try {
    // 1. Load the PostgreSQL configuration.
    log("获取PostgreSQL数据库配置...", true);
    const pgConfig = await getResource("f/limq/postgresql") as PgConfig;

    // 2. Create the PostgreSQL connection pool (at most 3 connections).
    pgPool = new Pool({
      hostname: pgConfig.host,
      port: pgConfig.port,
      user: pgConfig.user,
      password: pgConfig.password,
      database: pgConfig.dbname || "postgres",
    }, 3);

    // 3. Fetch the resources together with the requested relations.
    const resourcesData = await getResourcesWithRelations(pgPool, resource_ids, {
      sync_teams,
      sync_projects,
      sync_tags,
      sync_favorites,
    }, log);

    log(`成功获取 ${resourcesData.length} 个资源的关联数据`);

    if (resourcesData.length === 0) {
      return { success: true, message: "没有找到需要更新的资源数据", updated: 0 };
    }

    // 4. Load the ClickHouse configuration (also validated in dry-run mode).
    const chConfig = await getClickHouseConfig();

    if (dry_run) {
      log("测试模式: 不执行实际更新");
      log("示例数据:");
      log(JSON.stringify(resourcesData[0], null, 2));
      return { success: true, dry_run: true, resources: resourcesData };
    }

    // 5a. Update the shorturl table; 5b. update the events table.
    const shorturlUpdated = await updateClickHouseShorturl(resourcesData, chConfig, log);
    const eventsUpdated = await updateClickHouseEvents(resourcesData, chConfig, log);

    return {
      success: true,
      message: "资源关联数据同步完成",
      shorturl_updated: shorturlUpdated,
      events_updated: eventsUpdated,
      total_updated: shorturlUpdated + eventsUpdated,
    };
  } catch (error) {
    const errorMessage = `同步过程中发生错误: ${describeError(error)}`;
    log(errorMessage);
    if (error instanceof Error && error.stack) {
      log(`错误堆栈: ${error.stack}`, true);
    }
    return { success: false, message: errorMessage };
  } finally {
    if (pgPool) {
      await pgPool.end();
      log("PostgreSQL连接池已关闭", true);
    }
  }
}

/**
 * Fetch resources (excluding soft-deleted ones) and their requested relations
 * from PostgreSQL.
 *
 * @param pgPool Pool to borrow one connection from; the connection is always released.
 * @param resourceIds IDs to look up. They are bound as a query parameter, never
 *   spliced into the SQL text.
 * @param options Which relation kinds to fetch. Unrequested kinds stay `undefined`.
 * @param log Logger callback.
 * @returns One entry per resource found; `[]` if none match.
 */
async function getResourcesWithRelations(
  pgPool: Pool,
  resourceIds: string[],
  options: {
    sync_teams: boolean;
    sync_projects: boolean;
    sync_tags: boolean;
    sync_favorites: boolean;
  },
  log: Logger,
): Promise<ResourceRelations[]> {
  const client = await pgPool.connect();

  try {
    // 1. Base resource rows.
    log(`获取资源基本信息: ${resourceIds.join(", ")}`, true);
    const resourcesResult = await client.queryObject<ResourceRow>(
      `
      SELECT r.id, r.external_id, r.type, r.attributes
      FROM limq.resources r
      WHERE r.id = ANY($1)
        AND r.deleted_at IS NULL
      `,
      [resourceIds],
    );

    if (resourcesResult.rows.length === 0) {
      log(`未找到有效的资源数据`, true);
      return [];
    }

    const enrichedResources: ResourceRelations[] = [];

    for (const resource of resourcesResult.rows) {
      const resourceId = resource.id;
      log(`处理资源ID: ${resourceId}`, true);

      const relationData: ResourceRelations = {
        resource_id: resourceId,
        external_id: resource.external_id ?? undefined,
        type: resource.type ?? undefined,
        attributes: parseJsonField(resource.attributes),
      };

      // 2. Projects linked to the resource.
      if (options.sync_projects) {
        const projectsResult = await client.queryObject<ProjectData>(
          `
          SELECT pr.resource_id, pr.project_id, p.name as project_name,
                 p.description as project_description, pr.assigned_at
          FROM limq.project_resources pr
          JOIN limq.projects p ON pr.project_id = p.id
          WHERE pr.resource_id = $1
            AND p.deleted_at IS NULL
          `,
          [resourceId],
        );
        relationData.projects = projectsResult.rows;
        log(`找到 ${projectsResult.rows.length} 个关联项目`, true);
      }

      // 3. Tags linked to the resource.
      if (options.sync_tags) {
        const tagsResult = await client.queryObject<TagData>(
          `
          SELECT rt.resource_id, rt.tag_id, rt.created_at,
                 t.name as tag_name, t.type as tag_type
          FROM limq.resource_tags rt
          JOIN limq.tags t ON rt.tag_id = t.id
          WHERE rt.resource_id = $1
            AND t.deleted_at IS NULL
          `,
          [resourceId],
        );
        relationData.tags = tagsResult.rows;
        log(`找到 ${tagsResult.rows.length} 个关联标签`, true);
      }

      // 4. Teams, reached through the resource's (non-deleted) projects. The join
      //    goes through project_resources directly, so teams are fetched even when
      //    sync_projects is false. A resource without projects yields [] rather
      //    than undefined.
      if (options.sync_teams) {
        const teamsResult = await client.queryObject<TeamData>(
          `
          SELECT tp.team_id, tp.project_id, t.name as team_name,
                 t.description as team_description
          FROM limq.project_resources pr
          JOIN limq.projects p ON pr.project_id = p.id
          JOIN limq.team_projects tp ON tp.project_id = pr.project_id
          JOIN limq.teams t ON tp.team_id = t.id
          WHERE pr.resource_id = $1
            AND p.deleted_at IS NULL
            AND t.deleted_at IS NULL
          `,
          [resourceId],
        );
        relationData.teams = teamsResult.rows;
        log(`找到 ${teamsResult.rows.length} 个关联团队`, true);
      }

      // 5. Users who favorited the resource.
      if (options.sync_favorites) {
        const favoritesResult = await client.queryObject<FavoriteData>(
          `
          SELECT f.id as favorite_id, f.user_id, f.created_at,
                 u.first_name, u.last_name, u.email
          FROM limq.favorite f
          JOIN limq.users u ON f.user_id = u.id
          WHERE f.favoritable_id = $1
            AND f.favoritable_type = 'resource'
            AND f.deleted_at IS NULL
          `,
          [resourceId],
        );
        relationData.favorites = favoritesResult.rows;
        log(`找到 ${favoritesResult.rows.length} 个收藏记录`, true);
      }

      enrichedResources.push(relationData);
    }

    return enrichedResources;
  } finally {
    client.release();
  }
}

/**
 * Write relation JSON into `shorturl_analytics.shorturl` for resources of type
 * `shorturl`. Only columns whose relation was fetched are updated.
 *
 * Each resource is updated independently; a failure is logged and the loop
 * continues.
 *
 * @returns Number of ALTER ... UPDATE mutations successfully submitted.
 */
async function updateClickHouseShorturl(
  resources: ResourceRelations[],
  chConfig: ChConfig,
  log: Logger,
): Promise<number> {
  const shorturls = resources.filter((r) => r.type === "shorturl");

  if (shorturls.length === 0) {
    log("没有找到shorturl类型的资源,跳过shorturl表更新");
    return 0;
  }

  log(`准备更新 ${shorturls.length} 个shorturl资源`);

  const tableExists = await checkClickHouseTable(chConfig, "shorturl_analytics.shorturl");
  if (!tableExists) {
    log("ClickHouse中未找到shorturl表,请先创建表");
    return 0;
  }

  let updatedCount = 0;

  for (const shorturl of shorturls) {
    try {
      const assignments = buildShorturlAssignments(shorturl);
      if (assignments.length === 0) {
        log(`跳过shorturl ${shorturl.resource_id}: 未选择任何需要同步的关联数据`, true);
        continue;
      }

      const updateQuery = `
        ALTER TABLE shorturl_analytics.shorturl
        UPDATE ${assignments.join(",\n          ")}
        WHERE id = '${escapeString(shorturl.resource_id)}'
      `;

      await executeClickHouseQuery(chConfig, updateQuery);
      log(`更新shorturl完成: ${shorturl.resource_id}`, true);
      updatedCount++;
    } catch (error) {
      log(`更新shorturl ${shorturl.resource_id} 失败: ${describeError(error)}`);
    }
  }

  return updatedCount;
}

/**
 * Build the `column = '<json>'` SET clauses for one shorturl resource, in the
 * order teams, projects, tags, favorites. Relations that were not fetched
 * (`undefined`) produce no clause, so their ClickHouse column is left untouched.
 */
function buildShorturlAssignments(shorturl: ResourceRelations): string[] {
  const assignments: string[] = [];
  const assign = (column: string, value: unknown): void => {
    assignments.push(`${column} = '${escapeString(JSON.stringify(value))}'`);
  };

  if (shorturl.teams !== undefined) {
    assign("teams", shorturl.teams);
  }
  if (shorturl.projects !== undefined) {
    assign("projects", shorturl.projects);
  }
  if (shorturl.tags !== undefined) {
    assign(
      "tags",
      shorturl.tags.map((t) => ({
        tag_id: t.tag_id,
        tag_name: t.tag_name,
        tag_type: t.tag_type,
        created_at: t.created_at,
      })),
    );
  }
  if (shorturl.favorites !== undefined) {
    assign(
      "favorites",
      shorturl.favorites.map((f) => ({
        favorite_id: f.favorite_id,
        user_id: f.user_id,
        user_name: `${f.first_name || ""} ${f.last_name || ""}`.trim(),
        created_at: f.created_at,
      })),
    );
  }

  return assignments;
}

/**
 * Write each resource's tags into `shorturl_analytics.events.link_tags`.
 *
 * Events are matched by `event_id = external_id`, `link_id = external_id` or
 * `link_id = resource_id`. Nothing is written unless at least one event
 * matches an external_id by event_id or link_id. Resources whose tags were
 * not fetched are skipped, so their existing link_tags are not overwritten.
 *
 * @returns Number of resources whose update mutation was successfully submitted.
 */
async function updateClickHouseEvents(
  resources: ResourceRelations[],
  chConfig: ChConfig,
  log: Logger,
): Promise<number> {
  const resourcesWithExternalId = resources.filter(
    (r) => r.external_id && r.external_id.trim() !== "",
  );

  if (resourcesWithExternalId.length === 0) {
    log("没有找到具有external_id的资源,跳过events表更新");
    return 0;
  }

  log(`准备更新events表中与 ${resourcesWithExternalId.length} 个外部ID相关的记录`);

  const tableExists = await checkClickHouseTable(chConfig, "shorturl_analytics.events");
  if (!tableExists) {
    log("ClickHouse中未找到events表,请先创建表");
    return 0;
  }

  // Key by external_id. If several resources share one, the last one wins and
  // the matching events are updated only once.
  const resourceByExternalId = new Map<string, ResourceRelations>();
  for (const resource of resourcesWithExternalId) {
    resourceByExternalId.set(resource.external_id as string, resource);
  }

  try {
    const hasEvents = await hasMatchingEvents([...resourceByExternalId.keys()], chConfig, log);
    if (!hasEvents) {
      return 0;
    }
  } catch (error) {
    log(`更新events表失败: ${describeError(error)}`);
    return 0;
  }

  let updatedCount = 0;

  for (const [externalId, resource] of resourceByExternalId) {
    // Events only carry link_tags. Without fetched tags there is nothing to write.
    if (resource.tags === undefined) {
      log(`跳过事件标签更新(未同步tags): ${externalId}`, true);
      continue;
    }

    try {
      const tags = escapeString(JSON.stringify(resource.tags));
      const ext = escapeString(externalId);
      const resId = escapeString(resource.resource_id);

      // One mutation instead of three: every matching row receives the same value.
      const updateQuery = `
        ALTER TABLE shorturl_analytics.events
        UPDATE link_tags = '${tags}'
        WHERE event_id = '${ext}'
           OR link_id IN ('${ext}', '${resId}')
      `;

      await executeClickHouseQuery(chConfig, updateQuery);
      log(`尝试通过event_id/link_id/resource_id更新事件标签: ${externalId}`, true);
      updatedCount++;
    } catch (error) {
      log(`更新events表失败: ${describeError(error)}`);
    }
  }

  log(`已尝试更新 ${updatedCount} 个资源的事件记录`);
  return updatedCount;
}

/**
 * Check whether any event matches one of `externalIds`, first by `event_id`
 * and then by `link_id`, and log the result.
 *
 * @throws Error if a count query fails or returns a non-numeric result.
 */
async function hasMatchingEvents(
  externalIds: string[],
  chConfig: ChConfig,
  log: Logger,
): Promise<boolean> {
  const idList = externalIds.map((id) => `'${escapeString(id)}'`).join(", ");

  const byEventId = await countEvents(chConfig, `event_id IN (${idList})`);
  if (byEventId > 0) {
    log(`找到 ${byEventId} 条以event_id匹配的事件记录需要更新`);
    return true;
  }

  const byLinkId = await countEvents(chConfig, `link_id IN (${idList})`);
  if (byLinkId > 0) {
    log(`找到 ${byLinkId} 条以link_id匹配的事件记录需要更新`);
    return true;
  }

  log("没有找到相关事件记录,跳过events表更新");
  log(`已尝试的匹配字段: event_id,link_id`, true);
  return false;
}

/**
 * Count rows in `shorturl_analytics.events` that satisfy `condition`.
 *
 * `condition` is inserted into the SQL verbatim and must already be escaped.
 * Throws if the response cannot be parsed as an integer.
 */
async function countEvents(chConfig: ChConfig, condition: string): Promise<number> {
  const result = await executeClickHouseQuery(
    chConfig,
    `SELECT COUNT(*) as count FROM shorturl_analytics.events WHERE ${condition}`,
  );
  const count = parseInt(result.trim(), 10);
  if (Number.isNaN(count)) {
    throw new Error(`无法解析ClickHouse计数结果: ${result}`);
  }
  return count;
}

/**
 * Load the ClickHouse configuration from the Windmill variable
 * `f/shorturl_analytics/clickhouse`. The variable may be a JSON string or an
 * already-parsed object. If `clickhouse_url` is missing, it is derived as
 * `http://host:port`.
 *
 * @throws Error (prefixed "获取ClickHouse配置失败") if the variable is missing,
 *   invalid, or no URL can be determined.
 */
async function getClickHouseConfig(): Promise<ChConfig> {
  try {
    const chConfigJson = await getVariable("f/shorturl_analytics/clickhouse");

    if (!chConfigJson) {
      throw new Error("未找到ClickHouse配置");
    }

    let chConfig: ChConfig;
    if (typeof chConfigJson === "string") {
      try {
        chConfig = JSON.parse(chConfigJson);
      } catch (_) {
        throw new Error("ClickHouse配置不是有效的JSON");
      }
    } else {
      chConfig = chConfigJson as ChConfig;
    }

    if (!chConfig.clickhouse_url && chConfig.clickhouse_host && chConfig.clickhouse_port) {
      chConfig.clickhouse_url = `http://${chConfig.clickhouse_host}:${chConfig.clickhouse_port}`;
    }

    if (!chConfig.clickhouse_url) {
      throw new Error("ClickHouse配置缺少URL");
    }

    return chConfig;
  } catch (error) {
    throw new Error(`获取ClickHouse配置失败: ${describeError(error)}`);
  }
}

/**
 * Check whether `tableName` exists, using `EXISTS TABLE` (ClickHouse returns
 * `1` or `0`). Returns false, with a console error, if the query fails.
 * `tableName` is interpolated verbatim and must be a trusted constant.
 */
async function checkClickHouseTable(chConfig: ChConfig, tableName: string): Promise<boolean> {
  try {
    const result = await executeClickHouseQuery(chConfig, `EXISTS TABLE ${tableName}`);
    return result.trim() === "1";
  } catch (error) {
    console.error(`检查表 ${tableName} 失败:`, error);
    return false;
  }
}

/**
 * POST `query` to the ClickHouse HTTP interface with Basic auth and return the
 * raw response body.
 *
 * @throws Error if the URL is missing, the request fails, or the response
 *   status is not 2xx.
 */
async function executeClickHouseQuery(chConfig: ChConfig, query: string): Promise<string> {
  if (!chConfig.clickhouse_url) {
    throw new Error("无效的ClickHouse URL: 未定义");
  }

  try {
    const response = await fetch(chConfig.clickhouse_url, {
      method: "POST",
      headers: {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": `Basic ${btoa(`${chConfig.clickhouse_user}:${chConfig.clickhouse_password}`)}`,
      },
      body: query,
    });

    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(`ClickHouse查询失败 (${response.status}): ${errorText}`);
    }

    return await response.text();
  } catch (error) {
    throw new Error(`执行ClickHouse查询失败: ${describeError(error)}`);
  }
}

/**
 * Normalize a JSON/JSONB column value to an object.
 *
 * Strings are parsed. Anything that is not a non-null object, including
 * unparseable input, becomes `{}`.
 */
function parseJsonField(field: unknown): Record<string, unknown> {
  if (!field) return {};
  try {
    const value: unknown = typeof field === "string" ? JSON.parse(field) : field;
    if (typeof value === "object" && value !== null) {
      return value as Record<string, unknown>;
    }
  } catch (error) {
    console.warn(`无法解析JSON字段:`, error);
  }
  return {};
}

/**
 * Escape a value for use inside a single-quoted ClickHouse string literal.
 *
 * ClickHouse treats backslash as an escape character in string literals, so
 * backslashes are doubled first. Otherwise JSON escapes such as `\"` or `\\`
 * would be decoded and the stored JSON corrupted. Single quotes are doubled.
 */
function escapeString(str: string): string {
  if (!str) return "";
  return str.replace(/\\/g, "\\\\").replace(/'/g, "''");
}

/** Extract a readable message from an unknown thrown value. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}