diff --git a/.cursor/mcp.json b/.cursor/mcp.json new file mode 100644 index 0000000..eca4385 --- /dev/null +++ b/.cursor/mcp.json @@ -0,0 +1,34 @@ +{ + "mcpServers": { + "clickhouse-mcp": { + "command": "uv", + "args": [ + "run", + "--with", + "mcp-clickhouse", + "--python", + "3.13", + "mcp-clickhouse" + ], + "env": { + "CLICKHOUSE_HOST": "localhost", + "CLICKHOUSE_PORT": "8123", + "CLICKHOUSE_USER": "admin", + "CLICKHOUSE_PASSWORD": "your_secure_password", + "CLICKHOUSE_DATABASE": "promote", + "CLICKHOUSE_SECURE": "false", + "CLICKHOUSE_VERIFY": "false", + "CLICKHOUSE_CONNECT_TIMEOUT": "120", + "CLICKHOUSE_SEND_RECEIVE_TIMEOUT": "120" + } + }, + "supabase": { + "command": "npx", + "args": [ + "-y", + "@modelcontextprotocol/server-postgres", + "postgresql://postgres.xtqhluzornazlmkonucr:KR$kH9fdwZAd@tdS@aws-0-ap-southeast-1.pooler.supabase.com:5432/postgres" + ] + } + } +} \ No newline at end of file diff --git a/.env b/.env new file mode 100644 index 0000000..45d191c --- /dev/null +++ b/.env @@ -0,0 +1,8 @@ +CLICKHOUSE_HOST=localhost +CLICKHOUSE_PORT=8123 +CLICKHOUSE_USER=admin +CLICKHOUSE_PASSWORD=your_secure_password +CLICKHOUSE_SECURE=true +CLICKHOUSE_VERIFY=true +CLICKHOUSE_CONNECT_TIMEOUT=30 +CLICKHOUSE_SEND_RECEIVE_TIMEOUT=30 \ No newline at end of file diff --git a/ATTENTION.md b/ATTENTION.md new file mode 100644 index 0000000..847c07c --- /dev/null +++ b/ATTENTION.md @@ -0,0 +1,81 @@ +# ClickHouse使用注意事项 + +## 数据插入最佳实践 + +### 1. 利用ClickHouse的默认值机制 + +在向ClickHouse插入数据时,对于有默认值的字段(如`event_id`、`timestamp`、`date`、`hour`等),可以不用显式提供这些值,让ClickHouse自动生成。这样可以: +- 简化代码 +- 避免格式错误 +- 确保数据一致性 + +示例: + +```typescript +// 推荐做法:不指定有默认值的字段 +await clickhouse.insert({ + table: 'events', + values: [{ + // event_id, timestamp, date, hour 自动由ClickHouse生成 + user_id: 'test-user-123', + influencer_id: 'influencer-456', + // 其他必要字段... + }], + format: 'JSONEachRow' +}); +``` + +### 2. 避免JSON格式问题 + +使用`JSONEachRow`格式时,可能会遇到时间戳等特殊格式字段的解析错误: + +``` +Cannot parse input: expected '"' before: '.749Z","date":"2025-03-12",... +``` + +解决方法: +- 让ClickHouse使用默认值自动生成时间相关字段 +- 如必须手动指定,确保格式严格符合ClickHouse要求 + +### 3. 使用正确的方法 + +ClickHouse客户端库(@clickhouse/client)提供了多种插入数据的方法: + +- **insert方法**:最适合结构化数据 + ```typescript + await clickhouse.insert({ + table: 'table_name', + values: [{ field1: value1, field2: value2 }], + format: 'JSONEachRow' + }); + ``` + +- **query方法**:适合原始SQL语句 + ```typescript + await clickhouse.query({ + query: `INSERT INTO table_name (field1, field2) VALUES (?, ?)`, + values: [[value1, value2]] + }); + ``` + +### 4. Enum类型字段 + +对于Enum类型字段(如`event_type`、`funnel_stage`等),插入时使用定义的枚举字符串值,ClickHouse会自动转换为内部数字表示: + +```typescript +// 正确:使用枚举字符串值 +event_type: 'comment', +funnel_stage: 'consideration', +``` + +## 常见错误处理 + +1. **JSON格式错误**:通常与时间戳和日期字段有关,尽可能使用ClickHouse默认值 +2. **枚举值错误**:确保使用表定义中的精确枚举字符串 +3. **数据类型不匹配**:检查每个字段的类型是否与表定义匹配 + +## 性能考虑 + +1. 批量插入比单条插入更高效 +2. 对于大批量数据,考虑使用流式插入 +3. 高频小数据量插入,考虑使用异步插入方法 diff --git a/backend/db/sql/clickhouse/sync_logs.sql b/backend/db/sql/clickhouse/sync_logs.sql index 45e065b..19e3a0a 100644 --- a/backend/db/sql/clickhouse/sync_logs.sql +++ b/backend/db/sql/clickhouse/sync_logs.sql @@ -1,5 +1,7 @@ +DROP TABLE IF EXISTS promote.sync_logs; + CREATE TABLE IF NOT EXISTS promote.sync_logs ( - timestamp DateTime, + timestamp DateTime DEFAULT now(), duration_ms UInt64, posts_synced UInt32, comments_synced UInt32, diff --git a/backend/src/services/syncService.ts b/backend/src/services/syncService.ts index dd3fa30..024796e 100644 --- a/backend/src/services/syncService.ts +++ b/backend/src/services/syncService.ts @@ -1,8 +1,16 @@ -import { randomUUID } from 'crypto'; +import { Client } from 'pg'; import clickhouse from '../utils/clickhouse'; +import { randomUUID } from 'crypto'; + +// 数据库连接信息 +const PG_CONNECTION_STRING = process.env.DATABASE_URL || ''; /** - * 简单的同步函数,只插入一条测试数据到ClickHouse + * 从PostgreSQL数据库同步数据到ClickHouse + * 同步三种事件类型: + * 1. 新增Posts + * 2. 新增Comments + * 3. Influencer变化 */ export async function syncAllData(fromTimestamp: string): Promise<{ success: boolean; @@ -15,53 +23,413 @@ export async function syncAllData(fromTimestamp: string): Promise<{ }> { console.log(`开始同步数据,时间范围: ${fromTimestamp} - 现在`); const errors: string[] = []; + const startTime = Date.now(); + + // 创建PostgreSQL客户端 + const pgClient = new Client({ + connectionString: PG_CONNECTION_STRING + }); try { - // 使用insert方法并仅提供必要字段,让ClickHouse为其他字段使用默认值 - await clickhouse.insert({ - table: 'events', - values: [{ - // 让ClickHouse自动生成event_id、timestamp、date和hour - user_id: 'test-user-123', - influencer_id: 'influencer-456', - content_id: 'content-789', - project_id: 'project-abc', - event_type: 'comment', - funnel_stage: 'consideration', - platform: 'instagram', - content_type: 'text', - content_status: 'approved', - sentiment: 'positive', - comment_text: '测试数据 - ClickHouse同步测试' - }], - format: 'JSONEachRow' // 使用JSONEachRow格式 - }); + // 连接到PostgreSQL + await pgClient.connect(); + console.log('PostgreSQL连接成功'); - console.log('数据插入成功'); + // 同步各类数据 + const postsCount = await syncPosts(pgClient, fromTimestamp); + const commentsCount = await syncComments(pgClient, fromTimestamp); + const influencerChangesCount = await syncInfluencerChanges(pgClient, fromTimestamp); + const projectsCount = await syncProjects(pgClient, fromTimestamp); - // 只计算了一条评论 - const comments = 1; + // 关闭PostgreSQL连接 + await pgClient.end(); + + const duration = (Date.now() - startTime) / 1000; + + // 记录同步日志到ClickHouse + await logSyncResults(duration, postsCount, commentsCount, influencerChangesCount, projectsCount, errors.length === 0, errors); return { - success: true, - message: '测试数据插入成功', - comments, - posts: 0, - influencer_changes: 0, - projects: 0, + success: errors.length === 0, + message: errors.length === 0 ? '同步成功' : '同步过程中有错误', + posts: postsCount, + comments: commentsCount, + influencer_changes: influencerChangesCount, + projects: projectsCount, errors }; } catch (err: any) { - console.error('数据插入失败:', err.message); + console.error('同步过程中发生错误:', err.message); errors.push(err.message); + + // 确保关闭PostgreSQL连接 + try { + await pgClient.end(); + } catch (e) { + console.error('关闭PostgreSQL连接失败:', e); + } + return { success: false, - message: `插入失败: ${err.message}`, - comments: 0, + message: `同步失败: ${err.message}`, posts: 0, + comments: 0, influencer_changes: 0, projects: 0, errors }; } } + +/** + * 同步Posts数据 + */ +async function syncPosts(pgClient: Client, fromTimestamp: string): Promise { + console.log('开始同步Posts数据...'); + let count = 0; + + try { + // 查询在指定时间范围内新增的posts + const query = ` + SELECT + p.post_id, + p.influencer_id, + p.platform, + p.project_id, + p.title, + p.description, + p.published_at, + p.created_at + FROM + posts p + WHERE + p.created_at >= $1 + ORDER BY + p.created_at ASC + `; + + const result = await pgClient.query(query, [fromTimestamp]); + const posts = result.rows; + + if (posts.length === 0) { + console.log('没有新的Posts数据需要同步'); + return 0; + } + + // 将每个post作为一个事件插入到ClickHouse + const values = posts.map(post => ({ + user_id: 'system', + influencer_id: post.influencer_id, + content_id: post.post_id, + project_id: post.project_id || '', + event_type: 'impression', // 新帖子作为一个曝光事件 + funnel_stage: 'exposure', + platform: post.platform || 'unknown', + content_type: 'video', // 默认类型,可根据实际情况调整 + content_status: 'approved', + sentiment: 'neutral', + comment_text: post.title || post.description || '', + interaction_value: 1.0, + recorded_by: 'system' + })); + + if (values.length > 0) { + // 批量插入事件 + await clickhouse.insert({ + table: 'events', + values, + format: 'JSONEachRow' + }); + + count = values.length; + console.log(`成功同步 ${count} 条新增Posts数据`); + } + + return count; + } catch (error: any) { + console.error('同步Posts数据失败:', error.message); + throw error; + } +} + +/** + * 同步Comments数据 + */ +async function syncComments(pgClient: Client, fromTimestamp: string): Promise { + console.log('开始同步Comments数据...'); + let count = 0; + + try { + // 查询在指定时间范围内新增的comments + const query = ` + SELECT + c.comment_id, + c.post_id, + c.user_id, + c.content, + c.sentiment_score, + c.created_at, + p.influencer_id, + p.project_id, + p.platform + FROM + comments c + LEFT JOIN + posts p ON c.post_id = p.post_id + WHERE + c.created_at >= $1 + ORDER BY + c.created_at ASC + `; + + const result = await pgClient.query(query, [fromTimestamp]); + const comments = result.rows; + + if (comments.length === 0) { + console.log('没有新的Comments数据需要同步'); + return 0; + } + + // 将每个comment作为一个事件插入到ClickHouse + const values = comments.map(comment => { + // 根据情感分数确定情感 + let sentiment = 'neutral'; + if (comment.sentiment_score > 0.5) { + sentiment = 'positive'; + } else if (comment.sentiment_score < -0.5) { + sentiment = 'negative'; + } + + return { + user_id: comment.user_id || 'anonymous', + influencer_id: comment.influencer_id || '', + content_id: comment.post_id || '', + project_id: comment.project_id || '', + event_type: 'comment', + funnel_stage: 'consideration', // 评论通常表示用户在考虑阶段 + platform: comment.platform || 'unknown', + content_type: 'text', + content_status: 'approved', + sentiment, + comment_text: comment.content || '', + interaction_value: 3.0, // 评论价值较高 + recorded_by: 'system' + }; + }); + + if (values.length > 0) { + // 批量插入事件 + await clickhouse.insert({ + table: 'events', + values, + format: 'JSONEachRow' + }); + + count = values.length; + console.log(`成功同步 ${count} 条新增Comments数据`); + } + + return count; + } catch (error: any) { + console.error('同步Comments数据失败:', error.message); + throw error; + } +} + +/** + * 同步Influencer变化数据 + */ +async function syncInfluencerChanges(pgClient: Client, fromTimestamp: string): Promise { + console.log('开始同步Influencer变化数据...'); + let count = 0; + + try { + // 查询在指定时间范围内更新的influencers + const query = ` + SELECT + i.influencer_id, + i.name, + i.platform, + i.followers_count, + i.video_count, + i.updated_at + FROM + influencers i + WHERE + i.updated_at >= $1 + ORDER BY + i.updated_at ASC + `; + + const result = await pgClient.query(query, [fromTimestamp]); + const influencers = result.rows; + + if (influencers.length === 0) { + console.log('没有Influencer变化数据需要同步'); + return 0; + } + + // 将每个influencer的变化作为一个事件插入到ClickHouse + const values = influencers.map(influencer => ({ + user_id: 'system', + influencer_id: influencer.influencer_id, + content_id: '', + project_id: '', + event_type: 'follow', // KOL的粉丝变化 + funnel_stage: 'interest', + platform: influencer.platform || 'unknown', + content_type: 'text', + content_status: 'approved', + sentiment: 'neutral', + comment_text: '', + followers_count: influencer.followers_count || 0, + followers_change: 0, // 这里可以通过查询历史记录计算变化值 + interaction_value: 1.0, + recorded_by: 'system' + })); + + if (values.length > 0) { + // 批量插入事件 + await clickhouse.insert({ + table: 'events', + values, + format: 'JSONEachRow' + }); + + count = values.length; + console.log(`成功同步 ${count} 条Influencer变化数据`); + } + + return count; + } catch (error: any) { + console.error('同步Influencer变化数据失败:', error.message); + throw error; + } +} + +/** + * 同步Projects数据 + */ +async function syncProjects(pgClient: Client, fromTimestamp: string): Promise { + console.log('开始同步Projects数据...'); + let count = 0; + + try { + // 查询在指定时间范围内新增的projects + const query = ` + SELECT + p.id, + p.name, + p.description, + p.created_by, + p.created_at + FROM + projects p + WHERE + p.created_at >= $1 + ORDER BY + p.created_at ASC + `; + + const result = await pgClient.query(query, [fromTimestamp]); + const projects = result.rows; + + if (projects.length === 0) { + console.log('没有新的Projects数据需要同步'); + return 0; + } + + // 将每个project作为一个事件插入到ClickHouse + const values = projects.map(project => ({ + user_id: project.created_by || 'system', + influencer_id: '', + content_id: '', + project_id: project.id, + event_type: 'impression', + funnel_stage: 'exposure', + platform: 'internal', + content_type: 'text', + content_status: 'approved', + sentiment: 'neutral', + comment_text: project.name || '', + interaction_value: 1.0, + recorded_by: 'system' + })); + + if (values.length > 0) { + // 批量插入事件 + await clickhouse.insert({ + table: 'events', + values, + format: 'JSONEachRow' + }); + + count = values.length; + console.log(`成功同步 ${count} 条新增Projects数据`); + } + + return count; + } catch (error: any) { + console.error('同步Projects数据失败:', error.message); + throw error; + } +} + +/** + * 记录同步结果到ClickHouse的sync_logs表 + */ +async function logSyncResults( + duration: number, + postsCount: number, + commentsCount: number, + influencerChangesCount: number, + projectsCount: number, + success: boolean, + errors: string[] +): Promise { + try { + // 使用insert方法插入数据,timestamp字段由ClickHouse自动生成 + await clickhouse.insert({ + table: 'sync_logs', + values: [{ + // 不提供timestamp字段,由ClickHouse的DEFAULT now()自动生成 + duration_ms: Math.round(duration * 1000), + posts_synced: postsCount, + comments_synced: commentsCount, + influencer_changes_synced: influencerChangesCount, + projects_synced: projectsCount, + success: success ? 1 : 0, + error_messages: errors.join('; ') + }], + format: 'JSONEachRow' + }); + + console.log('同步日志记录成功'); + } catch (error: any) { + console.error('记录同步日志失败:', error.message); + } +} + +/** + * 创建定时同步任务的函数 + * 可以被调度系统调用,比如使用node-cron + */ +export async function scheduledSync(): Promise { + try { + // 获取上次同步时间,如果没有则使用24小时前 + const lastDay = new Date(); + lastDay.setDate(lastDay.getDate() - 1); + const fromTimestamp = lastDay.toISOString(); + + console.log(`执行计划同步任务,同步时间范围: ${fromTimestamp} - 现在`); + const result = await syncAllData(fromTimestamp); + + if (result.success) { + console.log(`计划同步任务成功完成,已同步: 帖子(${result.posts}), 评论(${result.comments}), KOL变化(${result.influencer_changes}), 项目(${result.projects})`); + } else { + console.error(`计划同步任务有错误:`, result.errors); + } + } catch (error: any) { + console.error('计划同步任务失败:', error.message); + } +} diff --git a/backend/src/utils/scheduledTasks.ts b/backend/src/utils/scheduledTasks.ts index f624e97..b2d3ba9 100644 --- a/backend/src/utils/scheduledTasks.ts +++ b/backend/src/utils/scheduledTasks.ts @@ -459,11 +459,7 @@ export const scheduleDatabaseSync = async ( * Syncs data from PostgreSQL to ClickHouse * @param lastSyncTimestamp The timestamp of the last sync */ -async function syncPostgresToClickhouse(lastSyncTimestamp?: string): Promise<{ - posts: number; - comments: number; - influencerChanges: number; -}> { +async function syncPostgresToClickhouse(lastSyncTimestamp?: string): Promise { console.log(`Starting PostgreSQL to ClickHouse sync from timestamp: ${lastSyncTimestamp}`); // If no last sync timestamp provided, use a timestamp from 1 hour ago @@ -474,12 +470,28 @@ async function syncPostgresToClickhouse(lastSyncTimestamp?: string): Promise<{ } try { - // Sync all data + // Use the enhanced syncAllData function from syncService const result = await syncAllData(lastSyncTimestamp); - console.log('PostgreSQL to ClickHouse sync completed:', result); + console.log('PostgreSQL to ClickHouse sync completed:', { + success: result.success, + posts: result.posts, + comments: result.comments, + influencer_changes: result.influencer_changes, + projects: result.projects, + errors: result.errors.length > 0 ? result.errors : 'None' + }); - return result; + if (!result.success) { + throw new Error(`Sync failed with errors: ${result.errors.join('; ')}`); + } + + return { + posts: result.posts || 0, + comments: result.comments || 0, + influencerChanges: result.influencer_changes || 0, + projects: result.projects || 0 + }; } catch (error) { console.error('Error in PostgreSQL to ClickHouse sync:', error); throw error; diff --git a/backend/src/utils/scheduler.ts b/backend/src/utils/scheduler.ts new file mode 100644 index 0000000..e99586d --- /dev/null +++ b/backend/src/utils/scheduler.ts @@ -0,0 +1,52 @@ +/** + * 任务调度器,用于定期执行数据同步任务 + */ + +import { scheduledSync } from '../services/syncService'; + +// 定义任务间隔时间(毫秒) +const SYNC_INTERVAL = 1000 * 60 * 60 * 6; // 每6小时同步一次 + +let syncIntervalId: NodeJS.Timeout | null = null; + +/** + * 启动定时同步任务 + */ +export function startScheduledTasks() { + // 停止现有任务(如果有) + stopScheduledTasks(); + + console.log('启动数据同步定时任务,间隔时间:', SYNC_INTERVAL / (1000 * 60 * 60), '小时'); + + // 立即执行一次同步 + console.log('执行初始数据同步...'); + scheduledSync().catch(err => { + console.error('初始同步任务失败:', err); + }); + + // 设置定时执行 + syncIntervalId = setInterval(() => { + console.log('执行定时数据同步任务...'); + scheduledSync().catch(err => { + console.error('定时同步任务失败:', err); + }); + }, SYNC_INTERVAL); + + console.log('数据同步定时任务已启动'); +} + +/** + * 停止定时同步任务 + */ +export function stopScheduledTasks() { + if (syncIntervalId) { + clearInterval(syncIntervalId); + syncIntervalId = null; + console.log('数据同步定时任务已停止'); + } +} + +// 导出任务状态检查函数 +export function isSchedulerRunning(): boolean { + return syncIntervalId !== null; +} \ No newline at end of file