import clickhouse from '../utils/clickhouse'; import { logger } from '../utils/logger'; import { ResultSet } from '@clickhouse/client'; /** * Represents a KOL's performance overview data */ export interface KolPerformanceData { influencer_id: string; name: string; platform: string; profile_url: string; followers_count: number; followers_change: number; followers_change_percentage: number | null | string; likes_change: number; likes_change_percentage: number | null | string; follows_change: number; follows_change_percentage: number | null | string; } /** * Response structure for KOL performance overview */ export interface KolPerformanceResponse { kols: KolPerformanceData[]; total: number; } /** * 漏斗阶段数据 */ export interface FunnelStageData { stage: string; // 阶段名称 stage_display: string; // 阶段显示名称 count: number; // 用户数量 percentage: number; // 总体占比(%) conversion_rate: number | null; // 与上一阶段的转化率(%) } /** * 漏斗分析总览数据 */ export interface FunnelOverview { average_conversion_rate: number; // 平均转化率(%) highest_conversion_stage: string; // 转化率最高的阶段 lowest_conversion_stage: string; // 转化率最低的阶段 } /** * 漏斗分析响应 */ export interface FunnelResponse { stages: FunnelStageData[]; // 各阶段数据 overview: FunnelOverview; // 总览数据 } /** * 贴文表现数据 */ export interface PostPerformanceData { post_id: string; // 贴文ID title: string; // 标题 kol_id: string; // KOL ID kol_name: string; // KOL 名称 platform: string; // 平台 publish_date: string; // 发布日期 metrics: { views: number; // 观看数 likes: number; // 点赞数 comments: number; // 留言数 shares: number; // 分享数 }; sentiment_score: number; // 情绪指标评分 post_url: string; // 贴文链接 } /** * 贴文表现响应 */ export interface PostPerformanceResponse { posts: PostPerformanceData[]; // 贴文数据 total: number; // 总数 } /** * 概览卡片数据 */ export interface DashboardCardData { comments_count: { current: number; change_percentage: number; }; engagement_rate: { current: number; change_percentage: number; }; sentiment_score: { current: number; change_percentage: number; }; } /** * 留言趋势数据点 */ export interface CommentTrendPoint { date: string; count: number; } /** * 留言趋势响应 */ export interface CommentTrendResponse { data: CommentTrendPoint[]; max_count: number; total_count: number; } /** * 平台分布数据项 */ export interface PlatformDistributionItem { platform: string; count: number; percentage: number; } /** * 平台分布响应 */ export interface PlatformDistributionResponse { data: PlatformDistributionItem[]; total: number; } /** * 情感分析详情数据 */ export interface SentimentAnalysisData { positive: { count: number; percentage: number; }; neutral: { count: number; percentage: number; }; negative: { count: number; percentage: number; }; total: number; average_score: number; } /** * 热门文章数据项 */ export interface PopularPostItem { post_id: string; title: string; platform: string; influencer_id: string; influencer_name: string; publish_date: string; engagement_count: number; views_count: number; engagement_rate: number; is_high_engagement: boolean; } /** * 热门文章响应 */ export interface PopularPostsResponse { posts: PopularPostItem[]; total: number; } /** * Analytics service for KOL performance data */ export class AnalyticsService { /** * Get KOL performance overview from ClickHouse * @param timeRange Number of days to look back (7, 30, or 90) * @param projectId Optional project ID to filter by * @param sortBy Field to sort by (followers_change, likes_change, follows_change) * @param sortOrder Sort order (asc or desc) * @param limit Number of KOLs to return * @param offset Offset for pagination * @returns KOL performance overview data */ async getKolPerformanceOverview( timeRange: number, projectId?: string, sortBy: string = 'followers_change', sortOrder: string = 'desc', limit: number = 20, offset: number = 0 ): Promise { const startTime = Date.now(); logger.info('Fetching KOL performance overview', { timeRange, projectId, sortBy, sortOrder, limit, offset }); try { // First let's check if we have any data in the tables await this.checkDataExistence(); // Calculate date for time range const currentDate = new Date(); const pastDate = new Date(); pastDate.setDate(currentDate.getDate() - timeRange); // Format dates for SQL const currentDateStr = this.formatDateForClickhouse(currentDate); const pastDateStr = this.formatDateForClickhouse(pastDate); // Build project filter condition const projectFilter = projectId ? `AND project_id = '${projectId}'` : ''; // First get the influencers const influencersQuery = ` SELECT influencer_id, name, platform, profile_url, followers_count FROM influencers WHERE 1=1 ${projectFilter} ORDER BY followers_count DESC LIMIT ${limit} OFFSET ${offset} `; // Query to get total count for pagination const countQuery = ` SELECT COUNT(*) as total FROM influencers WHERE 1=1 ${projectFilter} `; logger.debug('Executing ClickHouse queries for influencers', { influencersQuery: influencersQuery.replace(/\n\s+/g, ' ').trim(), countQuery: countQuery.replace(/\n\s+/g, ' ').trim() }); // Execute the influencers and count queries const [influencersData, countResult] = await Promise.all([ this.executeClickhouseQuery(influencersQuery), this.executeClickhouseQuery(countQuery) ]); // If we have no influencers, return empty result if (!Array.isArray(influencersData) || influencersData.length === 0) { logger.info('No influencers found for the given criteria'); return { kols: [], total: 0 }; } // Get the list of influencer IDs to fetch events for const influencerIds = influencersData.map(inf => inf.influencer_id); const influencerIdsList = influencerIds.map(id => `'${id}'`).join(', '); // Now fetch the event metrics for these influencers const eventsQuery = ` SELECT influencer_id, -- Current period metrics SUM(CASE WHEN event_type = 'follow' AND date BETWEEN '${pastDateStr}' AND '${currentDateStr}' THEN 1 ELSE 0 END) - SUM(CASE WHEN event_type = 'unfollow' AND date BETWEEN '${pastDateStr}' AND '${currentDateStr}' THEN 1 ELSE 0 END) AS followers_change, SUM(CASE WHEN event_type = 'like' AND date BETWEEN '${pastDateStr}' AND '${currentDateStr}' THEN 1 ELSE 0 END) - SUM(CASE WHEN event_type = 'unlike' AND date BETWEEN '${pastDateStr}' AND '${currentDateStr}' THEN 1 ELSE 0 END) AS likes_change, SUM(CASE WHEN event_type = 'follow' AND date BETWEEN '${pastDateStr}' AND '${currentDateStr}' THEN 1 ELSE 0 END) AS follows_change FROM events WHERE influencer_id IN (${influencerIdsList}) GROUP BY influencer_id `; logger.debug('Executing ClickHouse query for events', { eventsQuery: eventsQuery.replace(/\n\s+/g, ' ').trim() }); // Execute the events query const eventsData = await this.executeClickhouseQuery(eventsQuery); // Create a map of influencer_id to events data for fast lookup const eventsMap: Record = {}; if (Array.isArray(eventsData)) { eventsData.forEach(event => { if (event.influencer_id) { eventsMap[event.influencer_id] = event; } }); } // Combine the influencer data with events data const kols = influencersData.map(influencer => { const events = eventsMap[influencer.influencer_id] || {}; return { influencer_id: String(influencer.influencer_id || ''), name: String(influencer.name || ''), platform: String(influencer.platform || ''), profile_url: String(influencer.profile_url || ''), followers_count: Number(influencer.followers_count || 0), followers_change: Number(events.followers_change || 0), followers_change_percentage: "待计算", likes_change: Number(events.likes_change || 0), likes_change_percentage: "待计算", follows_change: Number(events.follows_change || 0), follows_change_percentage: "待计算" }; }); // Sort the results based on the requested sort field and order kols.sort((a: any, b: any) => { const aValue = a[sortBy] || 0; const bValue = b[sortBy] || 0; return sortOrder.toLowerCase() === 'asc' ? aValue - bValue : bValue - aValue; }); const total = this.parseCountResult(countResult); const duration = Date.now() - startTime; logger.info('KOL performance overview fetched successfully', { duration, kolCount: kols.length, total }); return { kols, total }; } catch (error) { const duration = Date.now() - startTime; logger.error(`Error in getKolPerformanceOverview (${duration}ms)`, error); throw error; } } /** * Check if tables contain data and log the results for diagnostics */ private async checkDataExistence(): Promise { try { // Check if the influencers table has data const influencerCountQuery = "SELECT COUNT(*) as count FROM influencers"; const eventsCountQuery = "SELECT COUNT(*) as count FROM events"; const [influencersResult, eventsResult] = await Promise.all([ this.executeClickhouseQuery(influencerCountQuery), this.executeClickhouseQuery(eventsCountQuery) ]); const influencersCount = Array.isArray(influencersResult) && influencersResult.length > 0 ? influencersResult[0].count : 0; const eventsCount = Array.isArray(eventsResult) && eventsResult.length > 0 ? eventsResult[0].count : 0; logger.info('Data existence check', { influencers_count: influencersCount, events_count: eventsCount }); // Optional: Check for sample data if (influencersCount > 0) { const sampleQuery = "SELECT influencer_id, name, platform, profile_url, followers_count FROM influencers LIMIT 2"; const sampleResult = await this.executeClickhouseQuery(sampleQuery); logger.info('Sample influencer data', { sample: sampleResult }); } if (eventsCount > 0) { const sampleQuery = ` SELECT event_type, COUNT(*) as count FROM events GROUP BY event_type LIMIT 5 `; const sampleResult = await this.executeClickhouseQuery(sampleQuery); logger.info('Sample events data', { sample: sampleResult }); } } catch (error) { logger.warn('Error checking data existence', error); // Don't throw, just continue with the query } } /** * Execute a ClickHouse query with proper error handling * @param query SQL query to execute * @returns Query result */ private async executeClickhouseQuery(query: string): Promise { try { const result = await clickhouse.query({ query, format: 'JSONEachRow' }); // Handle different result formats if ('json' in result) { return await result.json(); } else if ('rows' in result) { return result.rows; } return []; } catch (error) { logger.error('Error executing ClickHouse query', error); // Re-throw to be handled by the caller throw new Error(`ClickHouse query error: ${error instanceof Error ? error.message : String(error)}`); } } /** * Parse count result from ClickHouse * @param data ClickHouse count query result * @returns Total count */ private parseCountResult(data: any[]): number { if (!Array.isArray(data) || data.length === 0) { logger.warn('ClickHouse count result is invalid, returning 0'); return 0; } return Number(data[0]?.total || 0); } /** * Format date for ClickHouse query * @param date Date to format * @returns Formatted date string (YYYY-MM-DD) */ private formatDateForClickhouse(date: Date): string { return date.toISOString().split('T')[0]; } /** * Debug event data - only called if debug=true parameter is set */ async debugEventData(): Promise { try { // Check events table const eventCountQuery = "SELECT COUNT(*) as count FROM events"; const eventTypesQuery = "SELECT event_type, COUNT(*) as count FROM events GROUP BY event_type"; const sampleEventsQuery = "SELECT * FROM events LIMIT 5"; const sampleInfluencersEventsQuery = ` SELECT e.event_type, e.influencer_id, e.date, i.name FROM events e JOIN influencers i ON e.influencer_id = i.influencer_id LIMIT 10 `; const [eventCount, eventTypes, sampleEvents, joinedEvents] = await Promise.all([ this.executeClickhouseQuery(eventCountQuery), this.executeClickhouseQuery(eventTypesQuery), this.executeClickhouseQuery(sampleEventsQuery), this.executeClickhouseQuery(sampleInfluencersEventsQuery).catch(() => []) ]); logger.info('DEBUG: Event counts', { total_events: eventCount[0]?.count || 0, event_types: eventTypes, }); logger.info('DEBUG: Sample events', { sample_events: sampleEvents, joined_events: joinedEvents }); // Try executing our specific events query directly const eventsQuery = ` SELECT influencer_id, SUM(CASE WHEN event_type = 'follow' THEN 1 ELSE 0 END) AS follows, SUM(CASE WHEN event_type = 'unfollow' THEN 1 ELSE 0 END) AS unfollows, SUM(CASE WHEN event_type = 'like' THEN 1 ELSE 0 END) AS likes, SUM(CASE WHEN event_type = 'unlike' THEN 1 ELSE 0 END) AS unlikes FROM events GROUP BY influencer_id LIMIT 10 `; const eventsResult = await this.executeClickhouseQuery(eventsQuery).catch(err => { logger.error('DEBUG: Error executing events query', err); return []; }); logger.info('DEBUG: Aggregated events', { events_result: eventsResult }); } catch (error) { logger.warn('Error in debugEventData', error); } } /** * 获取KOL合作转换漏斗数据 * @param timeRange 时间范围(天数) * @param projectId 可选项目ID * @returns 漏斗数据 */ async getKolFunnel( timeRange: number, projectId?: string ): Promise { const startTime = Date.now(); logger.info('Fetching KOL funnel data', { timeRange, projectId }); try { // 计算时间范围 const currentDate = new Date(); const pastDate = new Date(); pastDate.setDate(currentDate.getDate() - timeRange); // 格式化日期 const currentDateStr = this.formatDateForClickhouse(currentDate); const pastDateStr = this.formatDateForClickhouse(pastDate); // 构建项目过滤条件 const projectFilter = projectId ? `AND project_id = '${projectId}'` : ''; // 漏斗阶段及其显示名称 const stages = [ { id: 'exposure', display: '曝光' }, { id: 'interest', display: '兴趣' }, { id: 'consideration', display: '考虑' }, { id: 'intent', display: '意向' }, { id: 'evaluation', display: '评估' }, { id: 'purchase', display: '购买' } ]; // 查询每个阶段的用户数量 const funnelQuery = ` SELECT funnel_stage, COUNT(DISTINCT user_id) as user_count FROM events WHERE date BETWEEN '${pastDateStr}' AND '${currentDateStr}' ${projectFilter} GROUP BY funnel_stage ORDER BY CASE funnel_stage WHEN 'exposure' THEN 1 WHEN 'interest' THEN 2 WHEN 'consideration' THEN 3 WHEN 'intent' THEN 4 WHEN 'evaluation' THEN 5 WHEN 'purchase' THEN 6 ELSE 7 END `; logger.debug('Executing ClickHouse query for funnel data', { funnelQuery: funnelQuery.replace(/\n\s+/g, ' ').trim() }); // 执行查询 const funnelData = await this.executeClickhouseQuery(funnelQuery); // 将结果转换为Map便于查找 const stageCounts: Record = {}; if (Array.isArray(funnelData)) { funnelData.forEach(item => { if (item.funnel_stage && item.user_count) { stageCounts[item.funnel_stage] = Number(item.user_count); } }); } // 计算总用户数(以最上层漏斗的用户数为准) const totalUsers = stageCounts['exposure'] || 0; // 构建漏斗阶段数据 const stagesData: FunnelStageData[] = []; let prevCount = 0; stages.forEach((stage, index) => { const count = stageCounts[stage.id] || 0; const percentage = totalUsers > 0 ? (count / totalUsers * 100) : 0; let conversionRate = null; if (index > 0 && prevCount > 0) { conversionRate = (count / prevCount) * 100; } stagesData.push({ stage: stage.id, stage_display: stage.display, count, percentage: parseFloat(percentage.toFixed(2)), conversion_rate: conversionRate !== null ? parseFloat(conversionRate.toFixed(2)) : null }); prevCount = count; }); // 计算总览数据 const conversionRates = stagesData .filter(stage => stage.conversion_rate !== null) .map(stage => stage.conversion_rate as number); const averageConversionRate = conversionRates.length > 0 ? parseFloat((conversionRates.reduce((sum, rate) => sum + rate, 0) / conversionRates.length).toFixed(2)) : 0; // 找出转化率最高和最低的阶段 let highestStage = { stage_display: '无数据', rate: 0 }; let lowestStage = { stage_display: '无数据', rate: 100 }; stagesData.forEach(stage => { if (stage.conversion_rate !== null) { if (stage.conversion_rate > highestStage.rate) { highestStage = { stage_display: stage.stage_display, rate: stage.conversion_rate }; } if (stage.conversion_rate < lowestStage.rate) { lowestStage = { stage_display: stage.stage_display, rate: stage.conversion_rate }; } } }); const overview: FunnelOverview = { average_conversion_rate: averageConversionRate, highest_conversion_stage: highestStage.stage_display, lowest_conversion_stage: lowestStage.stage_display }; const duration = Date.now() - startTime; logger.info('KOL funnel data fetched successfully', { duration, totalUsers }); return { stages: stagesData, overview }; } catch (error) { const duration = Date.now() - startTime; logger.error(`Error in getKolFunnel (${duration}ms)`, error); throw error; } } /** * 获取KOL贴文表现数据 * @param kolId 可选KOL ID * @param platform 可选平台 * @param startDate 可选开始日期 * @param endDate 可选结束日期 * @param sortBy 排序字段 (views, likes, comments, shares, sentiment) * @param sortOrder 排序方向 (asc, desc) * @param limit 限制数量 * @param offset 偏移量 * @returns 贴文表现数据 */ async getPostPerformance( kolId?: string, platform?: string, startDate?: string, endDate?: string, sortBy: string = 'publish_date', sortOrder: string = 'desc', limit: number = 20, offset: number = 0 ): Promise { const startTime = Date.now(); logger.info('Fetching KOL post performance', { kolId, platform, startDate, endDate, sortBy, sortOrder, limit, offset }); try { // Check data existence first await this.checkDataExistence(); // Prepare filters const filters: string[] = []; if (kolId) { filters.push(`AND p.influencer_id = '${kolId}'`); } if (platform) { filters.push(`AND p.platform = '${platform}'`); } if (startDate) { filters.push(`AND p.date >= toDate('${startDate}')`); } if (endDate) { filters.push(`AND p.date <= toDate('${endDate}')`); } const filterCondition = filters.join(' '); // Validate and prepare sortBy field const validSortFields = ['publish_date', 'views', 'likes', 'comments', 'shares', 'sentiment_score']; const sortField = validSortFields.includes(sortBy) ? sortBy : 'publish_date'; // Prepare sort order const order = sortOrder.toLowerCase() === 'asc' ? 'ASC' : 'DESC'; // 查询帖文基本数据 const postsQuery = ` SELECT p.post_id, p.title, p.influencer_id AS kol_id, i.name AS kol_name, p.platform, p.created_at AS publish_date, CONCAT('https://', p.platform, '.com/post/', p.post_id) AS post_url FROM posts p LEFT JOIN influencers i ON p.influencer_id = i.influencer_id WHERE 1=1 ${filterCondition} ORDER BY p.created_at ${order} LIMIT ${limit} OFFSET ${offset} `; // 从events表聚合互动指标和情感评分 const metricsQuery = ` SELECT content_id AS post_id, SUM(CASE WHEN event_type = 'view' THEN 1 ELSE 0 END) AS views, SUM(CASE WHEN event_type = 'like' THEN 1 ELSE 0 END) AS likes, SUM(CASE WHEN event_type = 'comment' THEN 1 ELSE 0 END) AS comments, SUM(CASE WHEN event_type = 'share' THEN 1 ELSE 0 END) AS shares, AVG(CASE WHEN sentiment = 'positive' THEN 1 WHEN sentiment = 'neutral' THEN 0 WHEN sentiment = 'negative' THEN -1 ELSE NULL END) AS sentiment_score FROM events WHERE content_id IS NOT NULL GROUP BY content_id `; // Query to get total count for pagination const countQuery = ` SELECT COUNT(*) as total FROM posts p WHERE 1=1 ${filterCondition} `; logger.debug('Executing ClickHouse queries for post performance', { postsQuery: postsQuery.replace(/\n\s+/g, ' ').trim(), metricsQuery: metricsQuery.replace(/\n\s+/g, ' ').trim(), countQuery: countQuery.replace(/\n\s+/g, ' ').trim() }); // 同时执行所有查询 const [postsData, countResult, metricsData] = await Promise.all([ this.executeClickhouseQuery(postsQuery), this.executeClickhouseQuery(countQuery), this.executeClickhouseQuery(metricsQuery).catch(err => { logger.warn('Failed to fetch metrics data, using mock data instead', { error: err.message }); return []; }) ]); // Parse results const total = this.parseCountResult(countResult); // If no posts found, return empty result if (!Array.isArray(postsData) || postsData.length === 0) { logger.info('No posts found for the given criteria'); return { posts: [], total: 0 }; } // 创建指标Map,方便查找 const metricsMap: Record = {}; if (Array.isArray(metricsData)) { metricsData.forEach(item => { if (item.post_id) { metricsMap[item.post_id] = { views: Number(item.views || 0), likes: Number(item.likes || 0), comments: Number(item.comments || 0), shares: Number(item.shares || 0), sentiment_score: Number(item.sentiment_score || 0) }; } }); } // 合并数据,生成最终结果 const transformedPosts: PostPerformanceData[] = postsData.map(post => { // 获取帖文的指标数据 const metrics = metricsMap[post.post_id] || {}; const postMetrics = { views: Number(metrics.views || 0), likes: Number(metrics.likes || 0), comments: Number(metrics.comments || 0), shares: Number(metrics.shares || 0) }; // 获取情感分数 const sentimentScore = metrics.sentiment_score !== undefined ? Number(metrics.sentiment_score) : 0; // 默认为0(中性) return { post_id: post.post_id, title: post.title || '无标题', kol_id: post.kol_id, kol_name: post.kol_name || '未知KOL', platform: post.platform || 'unknown', publish_date: post.publish_date, metrics: postMetrics, sentiment_score: sentimentScore, post_url: post.post_url || `https://${post.platform || 'example'}.com/post/${post.post_id}` }; }); // 如果按照指标排序,则在内存中重新排序 if (sortField !== 'publish_date') { transformedPosts.sort((a, b) => { let aValue = 0; let bValue = 0; if (sortField === 'sentiment_score') { aValue = a.sentiment_score; bValue = b.sentiment_score; } else { // 处理metrics内部字段排序 const metricField = sortField as keyof typeof a.metrics; aValue = a.metrics[metricField] || 0; bValue = b.metrics[metricField] || 0; } return sortOrder.toLowerCase() === 'asc' ? aValue - bValue : bValue - aValue; }); } const duration = Date.now() - startTime; logger.info('KOL post performance data fetched successfully', { duration, resultCount: transformedPosts.length, totalPosts: total }); return { posts: transformedPosts, total }; } catch (error) { const duration = Date.now() - startTime; logger.error(`Error in getPostPerformance (${duration}ms)`, error); // 发生错误时返回空数据 return { posts: [], total: 0 }; } } /** * 获取概览卡片数据 * @param timeRange 时间范围(天数) * @param projectId 可选项目ID * @returns 概览卡片数据 */ async getDashboardCardData( timeRange: number, projectId?: string ): Promise { const startTime = Date.now(); logger.info('Fetching dashboard card data', { timeRange, projectId }); try { // 计算当前时间范围和比较时间范围 const currentDate = new Date(); const pastDate = new Date(); pastDate.setDate(currentDate.getDate() - timeRange); const olderPastDate = new Date(pastDate); olderPastDate.setDate(olderPastDate.getDate() - timeRange); // 格式化日期 const currentDateStr = this.formatDateForClickhouse(currentDate); const pastDateStr = this.formatDateForClickhouse(pastDate); const olderPastDateStr = this.formatDateForClickhouse(olderPastDate); // 构建项目过滤条件 const projectFilter = projectId ? `AND project_id = '${projectId}'` : ''; // 查询当前时间段的数据 const currentPeriodQuery = ` SELECT countIf(event_type = 'comment') AS comments_count, countIf(event_type = 'view') AS views_count, countIf(event_type = 'like' OR event_type = 'comment' OR event_type = 'share') AS interactions_count, avg(CASE WHEN sentiment = 'positive' THEN 1 WHEN sentiment = 'neutral' THEN 0 WHEN sentiment = 'negative' THEN -1 ELSE NULL END) AS avg_sentiment FROM events WHERE date BETWEEN '${pastDateStr}' AND '${currentDateStr}' ${projectFilter} `; // 查询上一时间段的数据(用于计算环比) const previousPeriodQuery = ` SELECT countIf(event_type = 'comment') AS comments_count, countIf(event_type = 'view') AS views_count, countIf(event_type = 'like' OR event_type = 'comment' OR event_type = 'share') AS interactions_count, avg(CASE WHEN sentiment = 'positive' THEN 1 WHEN sentiment = 'neutral' THEN 0 WHEN sentiment = 'negative' THEN -1 ELSE NULL END) AS avg_sentiment FROM events WHERE date BETWEEN '${olderPastDateStr}' AND '${pastDateStr}' ${projectFilter} `; logger.debug('Executing ClickHouse queries for dashboard cards', { currentPeriodQuery: currentPeriodQuery.replace(/\n\s+/g, ' ').trim(), previousPeriodQuery: previousPeriodQuery.replace(/\n\s+/g, ' ').trim() }); // 执行查询 const [currentData, previousData] = await Promise.all([ this.executeClickhouseQuery(currentPeriodQuery), this.executeClickhouseQuery(previousPeriodQuery) ]); // 解析结果 const current = currentData[0] || {}; const previous = previousData[0] || {}; // 计算当前值 const commentsCount = Number(current.comments_count || 0); const viewsCount = Number(current.views_count || 0); const interactionsCount = Number(current.interactions_count || 0); const avgSentiment = Number(current.avg_sentiment || 0); // 计算环比变化 const prevCommentsCount = Number(previous.comments_count || 0); const prevViewsCount = Number(previous.views_count || 0); const prevInteractionsCount = Number(previous.interactions_count || 0); const prevAvgSentiment = Number(previous.avg_sentiment || 0); // 计算环比百分比变化 const calculatePercentageChange = (current: number, previous: number): number => { if (previous === 0) return current > 0 ? 100 : 0; return ((current - previous) / previous) * 100; }; const commentsChange = calculatePercentageChange(commentsCount, prevCommentsCount); // 计算互动率 (interactions / views) const currentEngagementRate = viewsCount > 0 ? (interactionsCount / viewsCount) * 100 : 0; const prevEngagementRate = prevViewsCount > 0 ? (prevInteractionsCount / prevViewsCount) * 100 : 0; const engagementRateChange = calculatePercentageChange(currentEngagementRate, prevEngagementRate); // 计算情感得分变化 const sentimentChange = calculatePercentageChange(avgSentiment, prevAvgSentiment); const result: DashboardCardData = { comments_count: { current: commentsCount, change_percentage: parseFloat(commentsChange.toFixed(2)) }, engagement_rate: { current: parseFloat(currentEngagementRate.toFixed(2)), change_percentage: parseFloat(engagementRateChange.toFixed(2)) }, sentiment_score: { current: parseFloat(avgSentiment.toFixed(2)), change_percentage: parseFloat(sentimentChange.toFixed(2)) } }; const duration = Date.now() - startTime; logger.info('Dashboard card data fetched successfully', { duration, result }); return result; } catch (error) { const duration = Date.now() - startTime; logger.error(`Error in getDashboardCardData (${duration}ms)`, error); // 发生错误时返回模拟数据 return { comments_count: { current: 0, change_percentage: 0 }, engagement_rate: { current: 0, change_percentage: 0 }, sentiment_score: { current: 0, change_percentage: 0 } }; } } /** * 获取留言趋势数据 * @param timeRange 时间范围(天数) * @param projectId 可选项目ID * @param platform 可选平台 * @returns 留言趋势数据 */ async getCommentTrend( timeRange: number, projectId?: string, platform?: string ): Promise { const startTime = Date.now(); logger.info('Fetching comment trend data', { timeRange, projectId, platform }); try { // 计算时间范围 const currentDate = new Date(); const pastDate = new Date(); pastDate.setDate(currentDate.getDate() - timeRange); // 格式化日期 const currentDateStr = this.formatDateForClickhouse(currentDate); const pastDateStr = this.formatDateForClickhouse(pastDate); // 构建过滤条件 const filters: string[] = []; filters.push(`date BETWEEN '${pastDateStr}' AND '${currentDateStr}'`); filters.push(`event_type = 'comment'`); if (projectId) { filters.push(`project_id = '${projectId}'`); } if (platform) { filters.push(`platform = '${platform}'`); } const filterCondition = filters.join(' AND '); // 按日期分组查询留言数量 const trendQuery = ` SELECT toDate(timestamp) AS date, count() AS count FROM events WHERE ${filterCondition} GROUP BY date ORDER BY date ASC `; // 查询总留言数 const totalQuery = ` SELECT count() AS total FROM events WHERE ${filterCondition} `; logger.debug('Executing ClickHouse queries for comment trend', { trendQuery: trendQuery.replace(/\n\s+/g, ' ').trim(), totalQuery: totalQuery.replace(/\n\s+/g, ' ').trim() }); // 执行查询 const [trendData, totalData] = await Promise.all([ this.executeClickhouseQuery(trendQuery), this.executeClickhouseQuery(totalQuery) ]); // 解析结果 const total = totalData[0]?.total || 0; // 确保每天都有数据点 const trendPoints: CommentTrendPoint[] = []; let maxCount = 0; // 创建日期范围内的所有日期 const dateMap: Record = {}; const currentDateObj = new Date(pastDate); while (currentDateObj <= currentDate) { const dateStr = this.formatDateForClickhouse(currentDateObj); dateMap[dateStr] = 0; currentDateObj.setDate(currentDateObj.getDate() + 1); } // 填充实际数据 if (Array.isArray(trendData)) { trendData.forEach(point => { if (point.date && point.count !== undefined) { const dateStr = point.date.split('T')[0]; // 确保日期格式一致 dateMap[dateStr] = Number(point.count); maxCount = Math.max(maxCount, Number(point.count)); } }); } // 转换为数组 Object.entries(dateMap).forEach(([date, count]) => { trendPoints.push({ date, count: Number(count) }); }); // 按日期排序 trendPoints.sort((a, b) => a.date.localeCompare(b.date)); const result: CommentTrendResponse = { data: trendPoints, max_count: maxCount, total_count: Number(total) }; const duration = Date.now() - startTime; logger.info('Comment trend data fetched successfully', { duration, dataPoints: trendPoints.length, total }); return result; } catch (error) { const duration = Date.now() - startTime; logger.error(`Error in getCommentTrend (${duration}ms)`, error); // 发生错误时返回空数据 return { data: [], max_count: 0, total_count: 0 }; } } /** * 获取平台分布数据 * @param timeRange 时间范围(天数) * @param projectId 可选项目ID * @param eventType 可选事件类型 * @returns 平台分布数据 */ async getPlatformDistribution( timeRange: number, projectId?: string, eventType: string = 'comment' ): Promise { const startTime = Date.now(); logger.info('Fetching platform distribution data', { timeRange, projectId, eventType }); try { // 计算时间范围 const currentDate = new Date(); const pastDate = new Date(); pastDate.setDate(currentDate.getDate() - timeRange); // 格式化日期 const currentDateStr = this.formatDateForClickhouse(currentDate); const pastDateStr = this.formatDateForClickhouse(pastDate); // 构建过滤条件 const filters: string[] = []; filters.push(`date BETWEEN '${pastDateStr}' AND '${currentDateStr}'`); filters.push(`event_type = '${eventType}'`); if (projectId) { filters.push(`project_id = '${projectId}'`); } const filterCondition = filters.join(' AND '); // 按平台分组查询事件数量 const query = ` SELECT platform, count() AS count FROM events WHERE ${filterCondition} GROUP BY platform ORDER BY count DESC `; logger.debug('Executing ClickHouse query for platform distribution', { query: query.replace(/\n\s+/g, ' ').trim() }); // 执行查询 const result = await this.executeClickhouseQuery(query); // 处理结果 const platforms: PlatformDistributionItem[] = []; let total = 0; if (Array.isArray(result)) { result.forEach(item => { if (item.platform && item.count !== undefined) { const count = Number(item.count); total += count; platforms.push({ platform: item.platform, count, percentage: 0 // 先初始化为0,稍后计算 }); } }); } // 计算百分比 if (total > 0) { platforms.forEach(item => { item.percentage = parseFloat(((item.count / total) * 100).toFixed(2)); }); } // 确保结果包含主要平台,即使没有数据 const mainPlatforms = ['Instagram', 'TikTok', 'Twitter', 'Facebook', 'YouTube']; mainPlatforms.forEach(platform => { if (!platforms.some(item => item.platform.toLowerCase() === platform.toLowerCase())) { platforms.push({ platform, count: 0, percentage: 0 }); } }); // 按数量降序排序 platforms.sort((a, b) => b.count - a.count); const result_data: PlatformDistributionResponse = { data: platforms, total }; const duration = Date.now() - startTime; logger.info('Platform distribution data fetched successfully', { duration, platformCount: platforms.length, total }); return result_data; } catch (error) { const duration = Date.now() - startTime; logger.error(`Error in getPlatformDistribution (${duration}ms)`, error); // 发生错误时返回空数据 return { data: [], total: 0 }; } } /** * 获取情感分析详情数据 * @param timeRange 时间范围(天数) * @param projectId 可选项目ID * @param platform 可选平台 * @returns 情感分析详情数据 */ async getSentimentAnalysis( timeRange: number, projectId?: string, platform?: string ): Promise { const startTime = Date.now(); logger.info('Fetching sentiment analysis data', { timeRange, projectId, platform }); try { // 计算时间范围 const currentDate = new Date(); const pastDate = new Date(); pastDate.setDate(currentDate.getDate() - timeRange); // 格式化日期 const currentDateStr = this.formatDateForClickhouse(currentDate); const pastDateStr = this.formatDateForClickhouse(pastDate); // 构建过滤条件 const filters: string[] = []; filters.push(`date BETWEEN '${pastDateStr}' AND '${currentDateStr}'`); filters.push(`sentiment IN ('positive', 'neutral', 'negative')`); if (projectId) { filters.push(`project_id = '${projectId}'`); } if (platform) { filters.push(`platform = '${platform}'`); } const filterCondition = filters.join(' AND '); // 查询情感分布 const query = ` SELECT sentiment, count() AS count FROM events WHERE ${filterCondition} GROUP BY sentiment `; // 查询情感得分平均值 const averageQuery = ` SELECT avg(CASE WHEN sentiment = 'positive' THEN 1 WHEN sentiment = 'neutral' THEN 0 WHEN sentiment = 'negative' THEN -1 ELSE NULL END) AS avg_score FROM events WHERE ${filterCondition} `; logger.debug('Executing ClickHouse queries for sentiment analysis', { query: query.replace(/\n\s+/g, ' ').trim(), averageQuery: averageQuery.replace(/\n\s+/g, ' ').trim() }); // 执行查询 const [sentimentData, averageData] = await Promise.all([ this.executeClickhouseQuery(query), this.executeClickhouseQuery(averageQuery) ]); // 初始化结果 let positiveCount = 0; let neutralCount = 0; let negativeCount = 0; // 处理情感分布数据 if (Array.isArray(sentimentData)) { sentimentData.forEach(item => { if (item.sentiment && item.count !== undefined) { const count = Number(item.count); if (item.sentiment === 'positive') { positiveCount = count; } else if (item.sentiment === 'neutral') { neutralCount = count; } else if (item.sentiment === 'negative') { negativeCount = count; } } }); } // 计算总数和百分比 const total = positiveCount + neutralCount + negativeCount; const positivePercentage = total > 0 ? (positiveCount / total) * 100 : 0; const neutralPercentage = total > 0 ? (neutralCount / total) * 100 : 0; const negativePercentage = total > 0 ? (negativeCount / total) * 100 : 0; // 获取平均得分 const avgScore = Array.isArray(averageData) && averageData.length > 0 ? Number(averageData[0].avg_score || 0) : 0; const result: SentimentAnalysisData = { positive: { count: positiveCount, percentage: parseFloat(positivePercentage.toFixed(2)) }, neutral: { count: neutralCount, percentage: parseFloat(neutralPercentage.toFixed(2)) }, negative: { count: negativeCount, percentage: parseFloat(negativePercentage.toFixed(2)) }, total, average_score: parseFloat(avgScore.toFixed(2)) }; const duration = Date.now() - startTime; logger.info('Sentiment analysis data fetched successfully', { duration, total, avgScore }); return result; } catch (error) { const duration = Date.now() - startTime; logger.error(`Error in getSentimentAnalysis (${duration}ms)`, error); // 发生错误时返回空数据 return { positive: { count: 0, percentage: 0 }, neutral: { count: 0, percentage: 0 }, negative: { count: 0, percentage: 0 }, total: 0, average_score: 0 }; } } /** * 获取热门文章数据 * @param timeRange 时间范围(天数) * @param projectId 可选项目ID * @param platform 可选平台 * @param sortBy 排序字段 * @param limit 返回数量 * @returns 热门文章数据 */ async getPopularPosts( timeRange: number, projectId?: string, platform?: string, sortBy: string = 'engagement_count', limit: number = 10 ): Promise { const startTime = Date.now(); logger.info('Fetching popular posts data', { timeRange, projectId, platform, sortBy, limit }); try { // 计算时间范围 const currentDate = new Date(); const pastDate = new Date(); pastDate.setDate(currentDate.getDate() - timeRange); // 格式化日期 const currentDateStr = this.formatDateForClickhouse(currentDate); const pastDateStr = this.formatDateForClickhouse(pastDate); // 构建帖文过滤条件 const postFilters: string[] = []; if (platform) { postFilters.push(`p.platform = '${platform}'`); } if (projectId) { postFilters.push(`p.project_id = '${projectId}'`); } postFilters.push(`p.date BETWEEN '${pastDateStr}' AND '${currentDateStr}'`); const postFilterCondition = postFilters.length > 0 ? 'WHERE ' + postFilters.join(' AND ') : ''; // 获取帖文和互动数据 const query = ` WITH post_metrics AS ( SELECT content_id AS post_id, countIf(event_type = 'view') AS views_count, countIf(event_type = 'like' OR event_type = 'comment' OR event_type = 'share') AS engagement_count FROM events WHERE date BETWEEN '${pastDateStr}' AND '${currentDateStr}' AND content_id IS NOT NULL ${projectId ? `AND project_id = '${projectId}'` : ''} ${platform ? `AND platform = '${platform}'` : ''} GROUP BY post_id ) SELECT p.post_id, p.title, p.platform, p.influencer_id, i.name AS influencer_name, toString(p.created_at) AS publish_date, pm.engagement_count, pm.views_count, multiIf(pm.views_count > 0, pm.engagement_count / pm.views_count, 0) AS engagement_rate FROM posts p LEFT JOIN influencers i ON p.influencer_id = i.influencer_id LEFT JOIN post_metrics pm ON p.post_id = pm.post_id ${postFilterCondition} ORDER BY ${sortBy === 'engagement_rate' ? 'engagement_rate' : 'engagement_count'} DESC LIMIT ${limit} `; // 查询符合条件的帖文总数 const countQuery = ` SELECT count() AS total FROM posts p ${postFilterCondition} `; logger.debug('Executing ClickHouse queries for popular posts', { query: query.replace(/\n\s+/g, ' ').trim(), countQuery: countQuery.replace(/\n\s+/g, ' ').trim() }); // 执行查询 const [postsData, countData] = await Promise.all([ this.executeClickhouseQuery(query), this.executeClickhouseQuery(countQuery) ]); // 计算总数 const total = Array.isArray(countData) && countData.length > 0 ? Number(countData[0].total || 0) : 0; // 计算高互动率阈值(互动率大于帖文平均互动率的150%) let averageEngagementRate = 0; let engagementRates: number[] = []; if (Array.isArray(postsData)) { engagementRates = postsData .map(post => Number(post.engagement_rate || 0)) .filter(rate => rate > 0); if (engagementRates.length > 0) { averageEngagementRate = engagementRates.reduce((a, b) => a + b, 0) / engagementRates.length; } } const highEngagementThreshold = averageEngagementRate * 1.5; // 转换结果 const posts: PopularPostItem[] = []; if (Array.isArray(postsData)) { postsData.forEach(post => { const engagementCount = Number(post.engagement_count || 0); const viewsCount = Number(post.views_count || 0); const engagementRate = viewsCount > 0 ? engagementCount / viewsCount : 0; posts.push({ post_id: post.post_id, title: post.title || '无标题', platform: post.platform || 'unknown', influencer_id: post.influencer_id, influencer_name: post.influencer_name || '未知KOL', publish_date: post.publish_date || '', engagement_count: engagementCount, views_count: viewsCount, engagement_rate: parseFloat(engagementRate.toFixed(4)), is_high_engagement: engagementRate > highEngagementThreshold }); }); } const result: PopularPostsResponse = { posts, total }; const duration = Date.now() - startTime; logger.info('Popular posts data fetched successfully', { duration, resultCount: posts.length, total }); return result; } catch (error) { const duration = Date.now() - startTime; logger.error(`Error in getPopularPosts (${duration}ms)`, error); // 发生错误时返回空数据 return { posts: [], total: 0 }; } } } // Export singleton instance export const analyticsService = new AnalyticsService();