// File: sync_resource_relations.ts
// Description: Syncs resource relation data from PostgreSQL to ClickHouse.
// Author: AI Assistant
// Created: 2023-10-31

import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
import { getResource, getVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";

/**
 * PostgreSQL connection settings, as loaded from the Windmill resource
 * `f/limq/postgresql`.
 */
interface PgConfig {
  /** Server host name; passed to the pool as `hostname`. */
  host: string;
  /** Server TCP port. */
  port: number;
  /** Login user. */
  user: string;
  /** Login password. */
  password: string;
  /** Database name; the pool falls back to `'postgres'` when omitted. */
  dbname?: string;
  /** Any additional keys carried by the resource are tolerated but unused. */
  [key: string]: unknown;
}

/**
 * ClickHouse connection settings, as loaded from the Windmill variable
 * `f/shorturl_analytics/clickhouse`.
 */
interface ChConfig {
  /** ClickHouse host name. */
  clickhouse_host: string;
  /** ClickHouse HTTP port. */
  clickhouse_port: number;
  /** User name sent via HTTP Basic authentication. */
  clickhouse_user: string;
  /** Password sent via HTTP Basic authentication. */
  clickhouse_password: string;
  /**
   * Endpoint that queries are POSTed to. When absent it is derived as
   * `http://<clickhouse_host>:<clickhouse_port>` while loading the config.
   */
  clickhouse_url?: string;
}

// Relation row shapes returned by the PostgreSQL queries.

/** A team linked to a resource through one of the resource's projects. */
interface TeamData {
  /** `team_projects.team_id`. */
  team_id: string;
  /** `teams.name`. */
  team_name: string;
  /** `teams.description`. */
  team_description?: string;
  /** `team_projects.project_id` — the project that links the team. */
  project_id?: string;
}

/** A project a resource is assigned to (`project_resources` joined with `projects`). */
interface ProjectData {
  /** `project_resources.project_id`. */
  project_id: string;
  /** `projects.name`. */
  project_name: string;
  /** `projects.description`. */
  project_description?: string;
  /** `project_resources.assigned_at`. */
  assigned_at?: string;
  /** `project_resources.resource_id`. */
  resource_id?: string;
}

/** A tag attached to a resource (`resource_tags` joined with `tags`). */
interface TagData {
  /** `resource_tags.tag_id`. */
  tag_id: string;
  /** `tags.name`. */
  tag_name: string;
  /** `tags.type`. */
  tag_type?: string;
  /** `resource_tags.created_at`. */
  created_at?: string;
  /** `resource_tags.resource_id`. */
  resource_id?: string;
}

/** A favorite record on a resource (`favorite` joined with `users`). */
interface FavoriteData {
  /** `favorite.id`. */
  favorite_id: string;
  /** `favorite.user_id`. */
  user_id: string;
  /** `users.first_name`. */
  first_name?: string;
  /** `users.last_name`. */
  last_name?: string;
  /** `users.email`. */
  email?: string;
  /** `favorite.created_at`. */
  created_at?: string;
}

/**
 * A resource together with the relation data collected for it.
 *
 * Each relation array is only populated when the matching `sync_*` option was
 * enabled; otherwise the property is left undefined.
 */
interface ResourceRelations {
  /** `resources.id`. */
  resource_id: string;
  /** Teams reached through the resource's projects. */
  teams?: TeamData[];
  /** Projects the resource is assigned to. */
  projects?: ProjectData[];
  /** Tags attached to the resource. */
  tags?: TagData[];
  /** Favorite records pointing at the resource. */
  favorites?: FavoriteData[];
  /** `resources.external_id`; used to match rows in the ClickHouse events table. */
  external_id?: string;
  /** `resources.type`; only `'shorturl'` resources are written to the shorturl table. */
  type?: string;
  /** Parsed `resources.attributes` column. */
  attributes?: Record<string, unknown>;
}

/**
 * Synchronizes relation data (teams, projects, tags, favorites) for the given
 * resources from PostgreSQL into ClickHouse.
 *
 * Flow: load the PostgreSQL config, read every resource with its relations,
 * load the ClickHouse config, then update the `shorturl` and `events` tables.
 * In dry-run mode nothing is written and the collected data is returned.
 *
 * @param params - Sync options. Every `sync_*` flag defaults to `true`;
 *   `dry_run` and `verbose` default to `false`.
 * @returns A result object carrying `success`. Errors raised during the sync
 *   are caught and reported as `{ success: false, message }`.
 */
export async function main(
  params: {
    /** IDs of the resources to sync. */
    resource_ids: string[];
    /** Whether to sync team data. */
    sync_teams?: boolean;
    /** Whether to sync project data. */
    sync_projects?: boolean;
    /** Whether to sync tag data. */
    sync_tags?: boolean;
    /** Whether to sync favorite data. */
    sync_favorites?: boolean;
    /** Test mode: collect data but perform no updates. */
    dry_run?: boolean;
    /** Whether to print verbose log lines. */
    verbose?: boolean;
  }
) {
  // Apply defaults: relation syncs are opt-out, the two modes are opt-in.
  const resource_ids = params.resource_ids || [];
  const sync_teams = params.sync_teams !== false;
  const sync_projects = params.sync_projects !== false;
  const sync_tags = params.sync_tags !== false;
  const sync_favorites = params.sync_favorites !== false;
  const dry_run = params.dry_run || false;
  const verbose = params.verbose || false;

  if (resource_ids.length === 0) {
    return { success: false, message: "至少需要提供一个资源ID" };
  }

  // Verbose-only messages are dropped unless `verbose` is enabled.
  const log = (message: string, isVerbose = false) => {
    if (!isVerbose || verbose) {
      console.log(message);
    }
  };

  log(`开始同步资源关联数据: ${resource_ids.join(", ")}`);
  log(`同步选项: teams=${sync_teams}, projects=${sync_projects}, tags=${sync_tags}, favorites=${sync_favorites}`, true);

  let pgPool: Pool | null = null;

  try {
    // 1. Load the PostgreSQL connection settings.
    log("获取PostgreSQL数据库配置...", true);
    const pgConfig = await getResource('f/limq/postgresql') as PgConfig;

    // 2. Create the PostgreSQL pool (3 connections).
    pgPool = new Pool({
      hostname: pgConfig.host,
      port: pgConfig.port,
      user: pgConfig.user,
      password: pgConfig.password,
      database: pgConfig.dbname || 'postgres'
    }, 3);

    // 3. Read the resources together with the requested relations.
    const resourcesData = await getResourcesWithRelations(pgPool, resource_ids, {
      sync_teams,
      sync_projects,
      sync_tags,
      sync_favorites
    }, log);

    log(`成功获取 ${resourcesData.length} 个资源的关联数据`);

    if (resourcesData.length === 0) {
      return { success: true, message: "没有找到需要更新的资源数据", updated: 0 };
    }

    // 4. Load the ClickHouse settings (also validated in dry-run mode).
    const chConfig = await getClickHouseConfig();

    // 5. Dry run: report what would be synced and stop before any write.
    if (dry_run) {
      log("测试模式: 不执行实际更新");
      // resourcesData is known to be non-empty here (checked above).
      log("示例数据:");
      log(JSON.stringify(resourcesData[0], null, 2));
      return { success: true, dry_run: true, resources: resourcesData };
    }

    // 5a. Update the shorturl table.
    const shorturlUpdated = await updateClickHouseShorturl(resourcesData, chConfig, log);

    // 5b. Update the events table.
    const eventsUpdated = await updateClickHouseEvents(resourcesData, chConfig, log);

    return {
      success: true,
      message: "资源关联数据同步完成",
      shorturl_updated: shorturlUpdated,
      events_updated: eventsUpdated,
      total_updated: shorturlUpdated + eventsUpdated
    };
  } catch (error) {
    // Anything can be thrown; normalize so the message is never "undefined".
    const err = error instanceof Error ? error : new Error(String(error));
    const errorMessage = `同步过程中发生错误: ${err.message}`;
    log(errorMessage);
    if (err.stack) {
      log(`错误堆栈: ${err.stack}`, true);
    }
    return { success: false, message: errorMessage };
  } finally {
    if (pgPool) {
      // A failure while closing must not replace the result computed above.
      try {
        await pgPool.end();
        log("PostgreSQL连接池已关闭", true);
      } catch (closeError) {
        const reason = closeError instanceof Error ? closeError.message : String(closeError);
        log(`关闭PostgreSQL连接池失败: ${reason}`);
      }
    }
  }
}

/**
 * Loads the given resources from PostgreSQL together with the requested
 * relation data.
 *
 * Soft-deleted rows (`deleted_at IS NOT NULL`) are excluded from resources,
 * projects, tags, teams and favorites. All values are passed as positional
 * query parameters; nothing caller-supplied is interpolated into the SQL.
 *
 * @param pgPool - Pool to borrow a client from; the client is always released.
 * @param resourceIds - IDs of the resources to load.
 * @param options - Which relations to load.
 * @param log - Logger; the second argument marks a verbose-only message.
 * @returns One entry per resource found; an empty array when none match.
 */
async function getResourcesWithRelations(
  pgPool: Pool,
  resourceIds: string[],
  options: {
    sync_teams: boolean;
    sync_projects: boolean;
    sync_tags: boolean;
    sync_favorites: boolean;
  },
  log: (message: string, isVerbose?: boolean) => void
): Promise<ResourceRelations[]> {
  // An empty list would produce the invalid SQL `IN ()`; nothing to load anyway.
  if (resourceIds.length === 0) {
    return [];
  }

  const client = await pgPool.connect();

  try {
    // One placeholder per ID: $1, $2, ... — the IDs travel as query arguments.
    const placeholders = resourceIds.map((_, index) => `$${index + 1}`).join(', ');

    // 1. Base resource rows.
    log(`获取资源基本信息: ${resourceIds.map(id => `'${id}'`).join(',')}`, true);
    const resourcesQuery = `
      SELECT
        r.id,
        r.external_id,
        r.type,
        r.attributes,
        r.schema_version,
        r.created_at,
        r.updated_at
      FROM
        limq.resources r
      WHERE
        r.id IN (${placeholders})
        AND r.deleted_at IS NULL
    `;

    const resourcesResult = await client.queryObject(resourcesQuery, resourceIds);

    if (resourcesResult.rows.length === 0) {
      log(`未找到有效的资源数据`, true);
      return [];
    }

    const enrichedResources: ResourceRelations[] = [];

    // Queries run sequentially because they share a single client.
    for (const resource of resourcesResult.rows) {
      const resourceId = resource.id as string;
      log(`处理资源ID: ${resourceId}`, true);

      const relationData: ResourceRelations = {
        resource_id: resourceId,
        external_id: resource.external_id as string,
        type: resource.type as string,
        attributes: parseJsonField(resource.attributes)
      };

      // 2. Projects the resource is assigned to.
      if (options.sync_projects) {
        const projectsQuery = `
          SELECT
            pr.resource_id, pr.project_id,
            p.name as project_name, p.description as project_description,
            pr.assigned_at
          FROM
            limq.project_resources pr
          JOIN
            limq.projects p ON pr.project_id = p.id
          WHERE
            pr.resource_id = $1
            AND p.deleted_at IS NULL
        `;

        const projectsResult = await client.queryObject(projectsQuery, [resourceId]);
        relationData.projects = projectsResult.rows as ProjectData[];
        log(`找到 ${projectsResult.rows.length} 个关联项目`, true);
      }

      // 3. Tags attached to the resource.
      if (options.sync_tags) {
        const tagsQuery = `
          SELECT
            rt.resource_id, rt.tag_id, rt.created_at,
            t.name as tag_name, t.type as tag_type
          FROM
            limq.resource_tags rt
          JOIN
            limq.tags t ON rt.tag_id = t.id
          WHERE
            rt.resource_id = $1
            AND t.deleted_at IS NULL
        `;

        const tagsResult = await client.queryObject(tagsQuery, [resourceId]);
        relationData.tags = tagsResult.rows as TagData[];
        log(`找到 ${tagsResult.rows.length} 个关联标签`, true);
      }

      // 4. Teams, reached through the projects loaded in step 2.
      // NOTE(review): teams are therefore only loaded when sync_projects is
      // also enabled and at least one project was found — confirm intended.
      if (options.sync_teams && relationData.projects && relationData.projects.length > 0) {
        const projectIds = relationData.projects.map((p: ProjectData) => p.project_id);

        const teamsQuery = `
          SELECT
            tp.team_id, tp.project_id,
            t.name as team_name, t.description as team_description
          FROM
            limq.team_projects tp
          JOIN
            limq.teams t ON tp.team_id = t.id
          WHERE
            tp.project_id = ANY($1::uuid[])
            AND t.deleted_at IS NULL
        `;

        const teamsResult = await client.queryObject(teamsQuery, [projectIds]);
        relationData.teams = teamsResult.rows as TeamData[];
        log(`找到 ${teamsResult.rows.length} 个关联团队`, true);
      }

      // 5. Favorites pointing at the resource.
      if (options.sync_favorites) {
        const favoritesQuery = `
          SELECT
            f.id as favorite_id, f.user_id, f.created_at,
            u.first_name, u.last_name, u.email
          FROM
            limq.favorite f
          JOIN
            limq.users u ON f.user_id = u.id
          WHERE
            f.favoritable_id = $1
            AND f.favoritable_type = 'resource'
            AND f.deleted_at IS NULL
        `;

        const favoritesResult = await client.queryObject(favoritesQuery, [resourceId]);
        relationData.favorites = favoritesResult.rows as FavoriteData[];
        log(`找到 ${favoritesResult.rows.length} 个收藏记录`, true);
      }

      enrichedResources.push(relationData);
    }

    return enrichedResources;
  } finally {
    client.release();
  }
}

/**
 * Writes relation data into the ClickHouse table `shorturl_analytics.shorturl`.
 *
 * Only resources whose `type` is `'shorturl'` are processed. Each resource is
 * updated with its own `ALTER TABLE ... UPDATE`; a failure for one resource is
 * logged and does not stop the others.
 *
 * @param resources - Resources with their collected relation data.
 * @param chConfig - ClickHouse connection settings.
 * @param log - Logger; the second argument marks a verbose-only message.
 * @returns Number of resources whose update statement was accepted.
 */
async function updateClickHouseShorturl(
  resources: ResourceRelations[],
  chConfig: ChConfig,
  log: (message: string, isVerbose?: boolean) => void
): Promise<number> {
  const shorturls = resources.filter(r => r.type === 'shorturl');

  if (shorturls.length === 0) {
    log('没有找到shorturl类型的资源,跳过shorturl表更新');
    return 0;
  }

  log(`准备更新 ${shorturls.length} 个shorturl资源`);

  // Bail out early when the target table does not exist.
  const tableExists = await checkClickHouseTable(chConfig, 'shorturl_analytics.shorturl');
  if (!tableExists) {
    log('ClickHouse中未找到shorturl表,请先创建表');
    return 0;
  }

  let updatedCount = 0;

  for (const shorturl of shorturls) {
    try {
      // NOTE(review): a relation that was not loaded (its sync_* flag was off)
      // is written as '[]', overwriting the stored value — confirm intended.
      const teams = JSON.stringify(shorturl.teams || []);
      const projects = JSON.stringify(shorturl.projects || []);

      // Tags and favorites are reduced to the fields stored in ClickHouse.
      const tags = JSON.stringify((shorturl.tags || []).map((t: TagData) => ({
        tag_id: t.tag_id,
        tag_name: t.tag_name,
        tag_type: t.tag_type,
        created_at: t.created_at
      })));

      const favorites = JSON.stringify((shorturl.favorites || []).map((f: FavoriteData) => ({
        favorite_id: f.favorite_id,
        user_id: f.user_id,
        user_name: `${f.first_name || ""} ${f.last_name || ""}`.trim(),
        created_at: f.created_at
      })));

      // Every interpolated value, including the ID, goes through escapeString.
      const updateQuery = `
        ALTER TABLE shorturl_analytics.shorturl
        UPDATE
          teams = '${escapeString(teams)}',
          projects = '${escapeString(projects)}',
          tags = '${escapeString(tags)}',
          favorites = '${escapeString(favorites)}'
        WHERE id = '${escapeString(shorturl.resource_id)}'
      `;

      await executeClickHouseQuery(chConfig, updateQuery);
      log(`更新shorturl完成: ${shorturl.resource_id}`, true);
      updatedCount++;
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      log(`更新shorturl ${shorturl.resource_id} 失败: ${reason}`);
    }
  }

  return updatedCount;
}

/**
 * Writes tag data into the ClickHouse table `shorturl_analytics.events`.
 *
 * Only resources with a non-blank `external_id` are considered. Events are
 * matched by `event_id` and by `link_id` (against the external ID and the
 * resource ID), and their `link_tags` column is overwritten.
 *
 * @param resources - Resources with their collected relation data.
 * @param chConfig - ClickHouse connection settings.
 * @param log - Logger; the second argument marks a verbose-only message.
 * @returns Number of resources for which update statements were issued. The
 *   count reflects attempts, not the number of event rows changed. On error
 *   the count reached so far is returned.
 */
async function updateClickHouseEvents(
  resources: ResourceRelations[],
  chConfig: ChConfig,
  log: (message: string, isVerbose?: boolean) => void
): Promise<number> {
  const resourcesWithExternalId = resources.filter(r => r.external_id && r.external_id.trim() !== '');

  if (resourcesWithExternalId.length === 0) {
    log('没有找到具有external_id的资源,跳过events表更新');
    return 0;
  }

  log(`准备更新events表中与 ${resourcesWithExternalId.length} 个外部ID相关的记录`);

  // Bail out early when the target table does not exist.
  const tableExists = await checkClickHouseTable(chConfig, 'shorturl_analytics.events');
  if (!tableExists) {
    log('ClickHouse中未找到events表,请先创建表');
    return 0;
  }

  const externalIds = resourcesWithExternalId.map(r => r.external_id).filter(Boolean) as string[];

  // Lookup by external_id; with duplicate external IDs the last resource wins.
  const resourceByExternalId = new Map<string, ResourceRelations>();
  for (const resource of resourcesWithExternalId) {
    if (resource.external_id) {
      resourceByExternalId.set(resource.external_id, resource);
    }
  }

  let updatedCount = 0;

  try {
    // Escape every ID before it is placed inside a string literal.
    const formattedExternalIds = externalIds.map(id => `'${escapeString(id)}'`).join(', ');

    // First check whether any events match by event_id.
    const countQuery = `
      SELECT COUNT(*) as count
      FROM shorturl_analytics.events
      WHERE event_id IN (${formattedExternalIds})
    `;

    const countResult = await executeClickHouseQuery(chConfig, countQuery);
    const eventCount = parseInt(countResult.trim(), 10);

    if (eventCount === 0) {
      // Nothing by event_id: fall back to matching by link_id.
      const alternateCountQuery = `
        SELECT COUNT(*) as count
        FROM shorturl_analytics.events
        WHERE link_id IN (${formattedExternalIds})
      `;

      const alternateCountResult = await executeClickHouseQuery(chConfig, alternateCountQuery);
      const alternateEventCount = parseInt(alternateCountResult.trim(), 10);

      if (alternateEventCount === 0) {
        log('没有找到相关事件记录,跳过events表更新');
        log(`已尝试的匹配字段: event_id,link_id`, true);
        return 0;
      }
      log(`找到 ${alternateEventCount} 条以link_id匹配的事件记录需要更新`);
    } else {
      log(`找到 ${eventCount} 条以event_id匹配的事件记录需要更新`);
    }

    for (const externalId of externalIds) {
      const resource = resourceByExternalId.get(externalId);
      if (!resource) continue;

      // `tags` is null when tags were not loaded for this resource.
      const tags = resource.tags ? JSON.stringify(resource.tags) : null;
      const safeExternalId = escapeString(externalId);

      if (tags) {
        // Match by event_id.
        const updateTagsQueryByEventId = `
          ALTER TABLE shorturl_analytics.events
          UPDATE link_tags = '${escapeString(tags)}'
          WHERE event_id = '${safeExternalId}'
        `;

        await executeClickHouseQuery(chConfig, updateTagsQueryByEventId);
        log(`尝试通过event_id更新事件标签: ${externalId}`, true);

        // Match by link_id.
        const updateTagsQueryByLinkId = `
          ALTER TABLE shorturl_analytics.events
          UPDATE link_tags = '${escapeString(tags)}'
          WHERE link_id = '${safeExternalId}'
        `;

        await executeClickHouseQuery(chConfig, updateTagsQueryByLinkId);
        log(`尝试通过link_id更新事件标签: ${externalId}`, true);
      }

      // Also match events whose link_id is the resource's own ID.
      // NOTE(review): when tags were not loaded this writes '[]' and so clears
      // link_tags for those rows — confirm that is intended.
      if (resource.resource_id) {
        const updateByResourceId = `
          ALTER TABLE shorturl_analytics.events
          UPDATE link_tags = '${escapeString(tags || '[]')}'
          WHERE link_id = '${escapeString(resource.resource_id)}'
        `;

        await executeClickHouseQuery(chConfig, updateByResourceId);
        log(`尝试通过resource_id更新事件标签: ${resource.resource_id}`, true);
      }

      updatedCount++;
    }

    log(`已尝试更新 ${updatedCount} 个资源的事件记录`);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    log(`更新events表失败: ${reason}`);
  }

  return updatedCount;
}

/**
 * Loads the ClickHouse settings from the Windmill variable
 * `f/shorturl_analytics/clickhouse`.
 *
 * The variable may hold a JSON string or an already-parsed object. When
 * `clickhouse_url` is missing it is built as `http://<host>:<port>`.
 *
 * @returns The settings, with `clickhouse_url` guaranteed to be set.
 * @throws Error when the variable is missing, is not a JSON object, or no URL
 *   can be determined. Every failure is rethrown with a common prefix.
 */
async function getClickHouseConfig(): Promise<ChConfig> {
  try {
    const rawConfig: unknown = await getVariable("f/shorturl_analytics/clickhouse");

    if (!rawConfig) {
      throw new Error("未找到ClickHouse配置");
    }

    // Accept either a JSON string or a ready-made object.
    let parsed: unknown = rawConfig;
    if (typeof rawConfig === 'string') {
      try {
        parsed = JSON.parse(rawConfig);
      } catch (_) {
        throw new Error("ClickHouse配置不是有效的JSON");
      }
    }

    // Valid JSON such as `null` or `42` is still not a usable config object.
    if (typeof parsed !== 'object' || parsed === null) {
      throw new Error("ClickHouse配置不是有效的JSON对象");
    }
    const chConfig = parsed as ChConfig;

    // Derive the URL from host and port when it was not given explicitly.
    if (!chConfig.clickhouse_url && chConfig.clickhouse_host && chConfig.clickhouse_port) {
      chConfig.clickhouse_url = `http://${chConfig.clickhouse_host}:${chConfig.clickhouse_port}`;
    }

    if (!chConfig.clickhouse_url) {
      throw new Error("ClickHouse配置缺少URL");
    }

    return chConfig;
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`获取ClickHouse配置失败: ${reason}`);
  }
}

/**
 * Checks whether a table exists in ClickHouse by running `EXISTS TABLE`.
 *
 * @param chConfig - ClickHouse connection settings.
 * @param tableName - Table name, interpolated into the statement as-is; pass
 *   trusted identifiers only.
 * @returns `true` when ClickHouse answers `1`. Any query failure is logged to
 *   the console and reported as `false`, so callers cannot tell "missing"
 *   apart from "check failed".
 */
async function checkClickHouseTable(chConfig: ChConfig, tableName: string): Promise<boolean> {
  try {
    const answer = await executeClickHouseQuery(chConfig, `EXISTS TABLE ${tableName}`);
    return answer.trim() === '1';
  } catch (error) {
    console.error(`检查表 ${tableName} 失败:`, error);
    return false;
  }
}

/**
 * Sends a statement to ClickHouse over HTTP and returns the raw response body.
 *
 * The statement is POSTed as the request body to `chConfig.clickhouse_url`,
 * with user and password sent via HTTP Basic authentication.
 *
 * @param chConfig - ClickHouse connection settings; `clickhouse_url` must be set.
 * @param query - Statement text to execute.
 * @returns The response body as text.
 * @throws Error when the URL is missing, the request fails, or ClickHouse
 *   answers with a non-2xx status (the status and body are in the message).
 */
async function executeClickHouseQuery(chConfig: ChConfig, query: string): Promise<string> {
  if (!chConfig.clickhouse_url) {
    throw new Error("无效的ClickHouse URL: 未定义");
  }

  try {
    const credentials = btoa(`${chConfig.clickhouse_user}:${chConfig.clickhouse_password}`);

    const response = await fetch(chConfig.clickhouse_url, {
      method: "POST",
      headers: {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": `Basic ${credentials}`
      },
      body: query,
    });

    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(`ClickHouse查询失败 (${response.status}): ${errorText}`);
    }

    return await response.text();
  } catch (error) {
    // Network errors and the HTTP error above share one outer prefix.
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`执行ClickHouse查询失败: ${reason}`);
  }
}

/**
 * Normalizes a JSON column value into a plain object.
 *
 * Strings are parsed as JSON; objects are passed through. Anything that is
 * not a non-array object — including valid JSON such as `123`, `null` or
 * `[1, 2]` — yields `{}` so the result always matches the declared type.
 *
 * @param field - Raw column value (JSON string, object, or empty).
 * @returns The parsed object, or `{}` when the value is empty, unparsable
 *   (a warning is logged), or not an object.
 */
function parseJsonField(field: unknown): Record<string, unknown> {
  if (!field) return {};

  let value: unknown = field;

  if (typeof field === 'string') {
    try {
      value = JSON.parse(field);
    } catch (error) {
      console.warn(`无法解析JSON字段:`, error);
      return {};
    }
  }

  // JSON.parse can return primitives, null, or arrays; none of them is a record.
  if (typeof value === 'object' && value !== null && !Array.isArray(value)) {
    return value as Record<string, unknown>;
  }

  return {};
}

/**
 * Escapes a value for use inside a single-quoted ClickHouse string literal.
 *
 * ClickHouse treats the backslash as an escape character inside string
 * literals, so backslashes are doubled in addition to single quotes. Without
 * this, JSON containing `\"` would be altered on insert and a value ending
 * in `\` could terminate the literal early.
 *
 * @param str - Raw value.
 * @returns The escaped value, without surrounding quotes; `''` for empty input.
 */
function escapeString(str: string): string {
  if (!str) return '';
  return str
    .replace(/\\/g, '\\\\') // backslash -> double backslash
    .replace(/'/g, "''"); // single quote -> doubled single quote
}