diff --git a/app/api/shortlinks/[id]/route.ts b/app/api/shortlinks/[id]/route.ts index 72ea1c8..bbfa192 100644 --- a/app/api/shortlinks/[id]/route.ts +++ b/app/api/shortlinks/[id]/route.ts @@ -45,7 +45,8 @@ export async function GET( favorites, expires_at, click_count, - unique_visitors + unique_visitors, + domain FROM shorturl_analytics.shorturl WHERE id = '${id}' AND deleted_at IS NULL LIMIT 1 @@ -120,7 +121,7 @@ export async function GET( projects: projects, tags: tags.map((tag: any) => tag.tag_name || ''), createdAt: shortlink.created_at, - domain: new URL(shortUrl || 'https://example.com').hostname + domain: shortlink.domain || (shortUrl ? new URL(shortUrl).hostname : '') }; const response: ApiResponse = { diff --git a/app/api/shortlinks/byUrl/route.ts b/app/api/shortlinks/byUrl/route.ts index 57e1650..29b4a7d 100644 --- a/app/api/shortlinks/byUrl/route.ts +++ b/app/api/shortlinks/byUrl/route.ts @@ -43,7 +43,8 @@ export async function GET(request: NextRequest) { favorites, expires_at, click_count, - unique_visitors + unique_visitors, + domain FROM shorturl_analytics.shorturl WHERE JSONHas(attributes, 'shortUrl') AND JSONExtractString(attributes, 'shortUrl') = '${url}' @@ -120,7 +121,7 @@ export async function GET(request: NextRequest) { projects: projects, tags: tags.map((tag) => tag.tag_name || ''), createdAt: shortlink.created_at, - domain: new URL(shortUrl || 'https://example.com').hostname + domain: shortlink.domain || (shortUrl ? 
new URL(shortUrl).hostname : '') }; console.log('Shortlink data formatted with externalId:', shortlink.external_id, 'Final object:', formattedShortlink); diff --git a/app/api/shortlinks/exact/route.ts b/app/api/shortlinks/exact/route.ts index 48515f7..76a7bbb 100644 --- a/app/api/shortlinks/exact/route.ts +++ b/app/api/shortlinks/exact/route.ts @@ -43,7 +43,8 @@ export async function GET(request: NextRequest) { favorites, expires_at, click_count, - unique_visitors + unique_visitors, + domain FROM shorturl_analytics.shorturl WHERE JSONHas(attributes, 'shortUrl') AND JSONExtractString(attributes, 'shortUrl') = '${shortUrl}' @@ -120,7 +121,7 @@ export async function GET(request: NextRequest) { projects: projects, tags: tags.map((tag: any) => tag.tag_name || ''), createdAt: shortlink.created_at, - domain: new URL(shortUrlValue || 'https://example.com').hostname + domain: shortlink.domain || (shortUrlValue ? new URL(shortUrlValue).hostname : '') }; console.log('Formatted shortlink with externalId:', shortlink.external_id); diff --git a/app/api/shortlinks/route.ts b/app/api/shortlinks/route.ts index b2b84e0..d861882 100644 --- a/app/api/shortlinks/route.ts +++ b/app/api/shortlinks/route.ts @@ -75,7 +75,8 @@ export async function GET(request: NextRequest) { favorites, expires_at, click_count, - unique_visitors + unique_visitors, + domain FROM shorturl_analytics.shorturl WHERE ${whereClause} ORDER BY created_at DESC diff --git a/app/links/page.tsx b/app/links/page.tsx index 9b00055..dcd25cb 100644 --- a/app/links/page.tsx +++ b/app/links/page.tsx @@ -49,6 +49,7 @@ interface ShortLink { expires_at?: string | null; click_count?: number; unique_visitors?: number; + domain?: string; } // Define ClickHouse shorturl type @@ -77,6 +78,7 @@ interface ClickHouseShortUrl { expires_at: string | null; click_count: number; unique_visitors: number; + domain?: string; // 添加domain字段 link_attributes?: string; // Optional JSON string containing link-specific attributes } @@ -175,7 +177,7 @@ 
export default function LinksPage() { projects: projects, tags: tags, createdAt: link.created_at, - domain: shortUrlValue ? new URL(shortUrlValue).hostname : 'shorturl.example.com' + domain: link.domain || (shortUrlValue ? new URL(shortUrlValue).hostname : '') }; // 打印完整数据,确保 externalId 被包含 @@ -197,19 +199,26 @@ export default function LinksPage() { : link.attributes || {}; // Parse attributes to get domain if available - let domain = 'shorturl.example.com'; + let domain = ''; try { - // Extract domain from shortUrl in attributes if available - const attributesObj = typeof link.attributes === 'string' - ? JSON.parse(link.attributes) - : link.attributes || {}; - - if (attributesObj.shortUrl) { - try { - const urlObj = new URL(attributesObj.shortUrl); - domain = urlObj.hostname; - } catch (e) { - console.error('Error parsing shortUrl:', e); + // 首先尝试使用link.domain字段 + if (link.domain) { + domain = link.domain; + } + // 如果没有domain字段,从shortUrl中提取 + else { + // Extract domain from shortUrl in attributes if available + const attributesObj = typeof link.attributes === 'string' + ? 
JSON.parse(link.attributes) + : link.attributes || {}; + + if (attributesObj.shortUrl) { + try { + const urlObj = new URL(attributesObj.shortUrl); + domain = urlObj.hostname; + } catch (e) { + console.error('Error parsing shortUrl:', e); + } } } } catch (e) { diff --git a/windmill/sync_shorturl_schema_to_clickhouse.ts b/windmill/sync_shorturl_schema_to_clickhouse.ts new file mode 100644 index 0000000..5d488f5 --- /dev/null +++ b/windmill/sync_shorturl_schema_to_clickhouse.ts @@ -0,0 +1,467 @@ +// 文件名: sync_shorturl_schema_to_clickhouse.ts +// 描述: 此脚本用于同步PostgreSQL中的short_url.shorturl表数据到ClickHouse +// 创建日期: 2023-11-21 + +import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts"; +import { getResource, getVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts"; + +// PostgreSQL配置接口 +interface PgConfig { + host: string; + port: number; + user: string; + password: string; + dbname?: string; + [key: string]: unknown; +} + +// ClickHouse配置接口 +interface ChConfig { + clickhouse_host: string; + clickhouse_port: number; + clickhouse_user: string; + clickhouse_password: string; + clickhouse_url?: string; +} + +// Shorturl数据接口 +interface ShortUrlData { + id: string; + slug: string; + origin: string; // 对应ClickHouse中的original_url + title?: string; + description?: string; + created_at?: string; + updated_at?: string; + deleted_at?: string; + expires_at?: string; // 注意这里已更正为expires_at + domain?: string; // 添加domain字段 +} + +/** + * 同步PostgreSQL short_url.shorturl表数据到ClickHouse + */ +export async function main( + params: { + /** 同步数据的开始时间(ISO 8601格式)。默认为1小时前 */ + start_time?: string; + /** 同步数据的结束时间(ISO 8601格式)。默认为当前时间 */ + end_time?: string; + /** 是否为测试模式(不执行实际更新) */ + dry_run?: boolean; + /** 是否显示详细日志 */ + verbose?: boolean; + } +) { + // 设置默认参数 + const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000).toISOString(); + const start_time = params.start_time || oneHourAgo; + const end_time = params.end_time || new Date().toISOString(); + const dry_run = 
params.dry_run || false; + const verbose = params.verbose || false; + + // 初始化日志函数 + const log = (message: string, isVerbose = false) => { + if (!isVerbose || verbose) { + console.log(message); + } + }; + + log(`开始同步shorturl表数据: ${start_time} 至 ${end_time}`); + + let pgPool: Pool | null = null; + + try { + // 1. 获取数据库配置 + log("获取PostgreSQL数据库配置...", true); + const pgConfig = await getResource('f/limq/production_supabase') as PgConfig; + + // 2. 创建PostgreSQL连接池 + pgPool = new Pool({ + hostname: pgConfig.host, + port: pgConfig.port, + user: pgConfig.user, + password: pgConfig.password, + database: pgConfig.dbname || 'postgres' + }, 3); + + // 3. 获取需要更新的数据 + const shorturlData = await getShortUrlData(pgPool, start_time, end_time, log); + log(`成功获取 ${shorturlData.length} 条shorturl数据`); + + if (shorturlData.length === 0) { + return { success: true, message: "没有找到需要更新的数据", updated: 0 }; + } + + // 4. 获取ClickHouse配置 + const chConfig = await getClickHouseConfig(); + + // 5. 执行更新 + if (!dry_run) { + const shorturlUpdated = await updateClickHouseShortUrl(shorturlData, chConfig, log); + + return { + success: true, + message: "shorturl表数据同步完成", + shorturl_updated: shorturlUpdated + }; + } else { + log("测试模式: 不执行实际更新"); + return { + success: true, + dry_run: true, + shorturl_count: shorturlData.length, + shorturl_sample: shorturlData.slice(0, 1) + }; + } + } catch (error) { + const errorMessage = `同步过程中发生错误: ${(error as Error).message}`; + log(errorMessage); + if ((error as Error).stack) { + log(`错误堆栈: ${(error as Error).stack}`, true); + } + return { success: false, message: errorMessage }; + } finally { + if (pgPool) { + await pgPool.end(); + log("PostgreSQL连接池已关闭", true); + } + } +} + +/** + * 从PostgreSQL获取shorturl数据 + */ +async function getShortUrlData( + pgPool: Pool, + startTime: string, + endTime: string, + log: (message: string, isVerbose?: boolean) => void +): Promise { + const client = await pgPool.connect(); + + try { + log(`获取shorturl数据 (${startTime} 至 ${endTime})`, 
true); + + const query = ` + SELECT + id, + slug, + origin, + title, + description, + domain, + created_at, + updated_at, + deleted_at, + expired_at as expires_at + FROM + short_url.shorturl + WHERE + (created_at >= $1 AND created_at <= $2) + OR (updated_at >= $1 AND updated_at <= $2) + `; + + const result = await client.queryObject(query, [startTime, endTime]); + return result.rows as ShortUrlData[]; + } finally { + client.release(); + } +} + +/** + * 格式化日期时间为ClickHouse可接受的格式 + */ +function formatDateTime(dateStr: string | null | undefined): string { + if (!dateStr) return 'NULL'; + + try { + // 将日期字符串转换为ISO格式 + const date = new Date(dateStr); + if (isNaN(date.getTime())) { + return 'NULL'; + } + + // 返回ISO格式的日期字符串,ClickHouse可以解析 + return `parseDateTimeBestEffort('${date.toISOString()}')`; + } catch (error) { + console.error(`日期格式化错误: ${error}`); + return 'NULL'; + } +} + +/** + * 格式化进度显示 + */ +function formatProgress(current: number, total: number): string { + const percent = Math.round((current / total) * 100); + const progressBar = '[' + '='.repeat(Math.floor(percent / 5)) + ' '.repeat(20 - Math.floor(percent / 5)) + ']'; + return `${progressBar} ${percent}% (${current}/${total})`; +} + +/** + * 更新ClickHouse中的shorturl表数据 + */ +async function updateClickHouseShortUrl( + shorturls: ShortUrlData[], + chConfig: ChConfig, + log: (message: string, isVerbose?: boolean) => void +): Promise { + if (shorturls.length === 0) { + log('没有找到shorturl数据,跳过shorturl表更新'); + return 0; + } + + log(`准备更新 ${shorturls.length} 条shorturl数据`); + + // 检查ClickHouse中是否存在shorturl表 + const tableExists = await checkClickHouseTable(chConfig, 'shorturl_analytics.shorturl'); + + if (!tableExists) { + log('ClickHouse中未找到shorturl表,请先创建表'); + return 0; + } + + let updatedCount = 0; + const startTime = Date.now(); + + // 使用批量插入更高效 + const batchSize = 50; // 降低批次大小,使查询更稳定 + for (let i = 0; i < shorturls.length; i += batchSize) { + const batch = shorturls.slice(i, i + batchSize); + let successCount = 
0; + + // 显示批处理进度信息 + const batchNumber = Math.floor(i / batchSize) + 1; + const totalBatches = Math.ceil(shorturls.length / batchSize); + log(`处理批次 ${batchNumber}/${totalBatches}: ${formatProgress(i, shorturls.length)}`); + + // 对每条记录使用单独的INSERT ... SELECT ... WHERE NOT EXISTS语句 + for (let j = 0; j < batch.length; j++) { + const shorturl = batch[j]; + // 显示记录处理细节进度 + const overallProgress = i + j + 1; + if (overallProgress % 10 === 0 || overallProgress === shorturls.length) { + // 每10条记录或最后一条记录显示一次进度 + const elapsedSeconds = (Date.now() - startTime) / 1000; + const recordsPerSecond = overallProgress / elapsedSeconds; + const remainingRecords = shorturls.length - overallProgress; + const estimatedSecondsRemaining = remainingRecords / recordsPerSecond; + + log(`总进度: ${formatProgress(overallProgress, shorturls.length)} - 速率: ${recordsPerSecond.toFixed(1)}条/秒 - 预计剩余时间: ${formatTime(estimatedSecondsRemaining)}`); + } + + try { + const insertQuery = ` + INSERT INTO shorturl_analytics.shorturl + SELECT + '${escapeString(shorturl.id)}' AS id, + '${escapeString(shorturl.id)}' AS external_id, + 'shorturl' AS type, + '${escapeString(shorturl.slug)}' AS slug, + '${escapeString(shorturl.origin)}' AS original_url, + ${shorturl.title ? `'${escapeString(shorturl.title)}'` : 'NULL'} AS title, + ${shorturl.description ? `'${escapeString(shorturl.description)}'` : 'NULL'} AS description, + '{}' AS attributes, + 1 AS schema_version, + '' AS creator_id, + '' AS creator_email, + '' AS creator_name, + ${formatDateTime(shorturl.created_at)} AS created_at, + ${formatDateTime(shorturl.updated_at)} AS updated_at, + ${formatDateTime(shorturl.deleted_at)} AS deleted_at, + '[]' AS projects, + '[]' AS teams, + '[]' AS tags, + '[]' AS qr_codes, + '[]' AS channels, + '[]' AS favorites, + ${formatDateTime(shorturl.expires_at)} AS expires_at, + 0 AS click_count, + 0 AS unique_visitors, + ${shorturl.domain ? 
`'${escapeString(shorturl.domain)}'` : 'NULL'} AS domain + WHERE NOT EXISTS ( + SELECT 1 FROM shorturl_analytics.shorturl WHERE id = '${escapeString(shorturl.id)}' + ) + `; + + await executeClickHouseQuery(chConfig, insertQuery); + successCount++; + log(`成功处理shorturl: ${shorturl.id}`, true); + } catch (error) { + log(`处理shorturl ${shorturl.id} 失败: ${(error as Error).message}`); + + // 尝试使用简单插入作为备选方案 + try { + log(`尝试替代方法更新: ${shorturl.id}`, true); + + // 先检查记录是否存在 + const checkQuery = `SELECT count() FROM shorturl_analytics.shorturl WHERE id = '${escapeString(shorturl.id)}'`; + const existsResult = await executeClickHouseQuery(chConfig, checkQuery); + const exists = parseInt(existsResult.trim()) > 0; + + if (!exists) { + const fallbackQuery = ` + INSERT INTO shorturl_analytics.shorturl ( + id, external_id, type, slug, original_url, + title, description, attributes, schema_version, + creator_id, creator_email, creator_name, + created_at, updated_at, deleted_at, + projects, teams, tags, qr_codes, channels, favorites, + expires_at, click_count, unique_visitors, domain + ) VALUES ( + '${escapeString(shorturl.id)}', + '${escapeString(shorturl.id)}', + 'shorturl', + '${escapeString(shorturl.slug)}', + '${escapeString(shorturl.origin)}', + ${shorturl.title ? `'${escapeString(shorturl.title)}'` : 'NULL'}, + ${shorturl.description ? `'${escapeString(shorturl.description)}'` : 'NULL'}, + '{}', + 1, + '', + '', + '', + ${formatDateTime(shorturl.created_at)}, + ${formatDateTime(shorturl.updated_at)}, + ${formatDateTime(shorturl.deleted_at)}, + '[]', + '[]', + '[]', + '[]', + '[]', + '[]', + ${formatDateTime(shorturl.expires_at)}, + 0, + 0, + ${shorturl.domain ? 
`'${escapeString(shorturl.domain)}'` : 'NULL'} + ) + `; + + await executeClickHouseQuery(chConfig, fallbackQuery); + successCount++; + log(`备选方式插入成功: ${shorturl.id}`, true); + } else { + log(`记录已存在,跳过: ${shorturl.id}`, true); + } + } catch (fallbackError) { + log(`备选方式失败 ${shorturl.id}: ${(fallbackError as Error).message}`); + } + } + } + + updatedCount += successCount; + log(`批次 ${batchNumber}/${totalBatches} 完成: ${successCount}/${batch.length} 条成功 (总计: ${updatedCount}/${shorturls.length})`); + } + + const totalTime = (Date.now() - startTime) / 1000; + log(`同步完成! 总计处理: ${updatedCount}/${shorturls.length} 条记录, 耗时: ${formatTime(totalTime)}, 平均速率: ${(updatedCount / totalTime).toFixed(1)}条/秒`); + + return updatedCount; +} + +/** + * 获取ClickHouse配置 + */ +async function getClickHouseConfig(): Promise { + try { + const chConfigJson = await getVariable("f/shorturl_analytics/clickhouse"); + + // 确保配置不为空 + if (!chConfigJson) { + throw new Error("未找到ClickHouse配置"); + } + + // 解析JSON字符串为对象 + let chConfig: ChConfig; + if (typeof chConfigJson === 'string') { + try { + chConfig = JSON.parse(chConfigJson); + } catch { + throw new Error("ClickHouse配置不是有效的JSON"); + } + } else { + chConfig = chConfigJson as ChConfig; + } + + // 验证并构建URL + if (!chConfig.clickhouse_url && chConfig.clickhouse_host && chConfig.clickhouse_port) { + chConfig.clickhouse_url = `http://${chConfig.clickhouse_host}:${chConfig.clickhouse_port}`; + } + + if (!chConfig.clickhouse_url) { + throw new Error("ClickHouse配置缺少URL"); + } + + return chConfig; + } catch (error) { + throw new Error(`获取ClickHouse配置失败: ${(error as Error).message}`); + } +} + +/** + * 检查ClickHouse中是否存在指定表 + */ +async function checkClickHouseTable(chConfig: ChConfig, tableName: string): Promise { + try { + const query = `EXISTS TABLE ${tableName}`; + const result = await executeClickHouseQuery(chConfig, query); + return result.trim() === '1'; + } catch (error) { + console.error(`检查表 ${tableName} 失败:`, error); + return false; + } +} + +/** + * 
Run a single SQL statement against ClickHouse over HTTP.
+ *
+ * The statement is sent as the raw POST body to `chConfig.clickhouse_url`,
+ * with an HTTP Basic `Authorization` header built from `clickhouse_user` and
+ * `clickhouse_password`.
+ *
+ * @param chConfig - Connection settings; `clickhouse_url` must be set.
+ * @param query - SQL text to execute.
+ * @returns The response body as text.
+ * @throws Error when the URL is missing, when `fetch` rejects, or when
+ *   `response.ok` is false (the response text is included in the message).
+ */
+async function executeClickHouseQuery(chConfig: ChConfig, query: string): Promise<string> {
+  // Refuse to issue a request without a target URL
+  if (!chConfig.clickhouse_url) {
+    throw new Error("无效的ClickHouse URL: 未定义");
+  }
+
+  try {
+    const response = await fetch(chConfig.clickhouse_url, {
+      method: "POST",
+      headers: {
+        "Content-Type": "application/x-www-form-urlencoded",
+        "Authorization": `Basic ${btoa(`${chConfig.clickhouse_user}:${chConfig.clickhouse_password}`)}`
+      },
+      body: query,
+    });
+
+    if (!response.ok) {
+      const errorText = await response.text();
+      throw new Error(`ClickHouse查询失败 (${response.status}): ${errorText}`);
+    }
+
+    return await response.text();
+  } catch (error) {
+    // Narrow instead of asserting: a non-Error rejection would otherwise log "undefined"
+    const reason = error instanceof Error ? error.message : String(error);
+    throw new Error(`执行ClickHouse查询失败: ${reason}`);
+  }
+}
+
+/**
+ * Escape a value for embedding inside a single-quoted ClickHouse string literal.
+ *
+ * Backslashes are doubled before quotes are handled: ClickHouse interprets
+ * backslash escape sequences inside string literals, so doubling only the
+ * quote would still let an input such as `\'` terminate the literal early.
+ *
+ * @param str - Raw value; an empty, null or undefined value yields ''.
+ * @returns The escaped text, without surrounding quotes.
+ */
+function escapeString(str: string | null | undefined): string {
+  if (!str) return '';
+  return str.replace(/\\/g, '\\\\').replace(/'/g, "''");
+}
+
+/**
+ * Format a duration given in seconds as a short readable string.
+ *
+ * @param seconds - Duration in seconds; NaN, infinite or negative values are treated as 0.
+ * @returns Seconds only when under one minute, otherwise minutes followed by seconds.
+ */
+function formatTime(seconds: number): string {
+  // Guard against NaN/Infinity/negative estimates so the output never reads "NaN分NaN秒"
+  const safeSeconds = Number.isFinite(seconds) && seconds > 0 ? seconds : 0;
+  const mins = Math.floor(safeSeconds / 60);
+  const secs = Math.floor(safeSeconds % 60);
+
+  return mins === 0 ? `${secs}秒` : `${mins}分${secs}秒`;
+}
\ No newline at end of file