kol overview

This commit is contained in:
2025-03-13 18:17:27 +08:00
parent f5c660217a
commit 6d29a208f1
7 changed files with 806 additions and 1 deletions

View File

@@ -0,0 +1,374 @@
import clickhouse from '../utils/clickhouse';
import { logger } from '../utils/logger';
import { ResultSet } from '@clickhouse/client';
/**
* Represents a KOL's performance overview data.
*
* One entry per influencer row, merged with that influencer's aggregated
* event metrics for the requested time window.
*/
export interface KolPerformanceData {
/** Influencer identifier, read from the `influencer_id` column of `influencers`. */
influencer_id: string;
/** Name, read from the `name` column of `influencers`. */
name: string;
/** Platform, read from the `platform` column of `influencers`. */
platform: string;
/** Profile URL, read from the `profile_url` column of `influencers`. */
profile_url: string;
/** Follower count, read from the `followers_count` column of `influencers`. */
followers_count: number;
/** Number of `follow` events minus number of `unfollow` events in the window. */
followers_change: number;
/** Not computed yet: the service currently always sets this to null. */
followers_change_percentage: number | null;
/** Number of `like` events minus number of `unlike` events in the window. */
likes_change: number;
/** Not computed yet: the service currently always sets this to null. */
likes_change_percentage: number | null;
/** Number of `follow` events in the window (unfollows are not subtracted). */
follows_change: number;
/** Not computed yet: the service currently always sets this to null. */
follows_change_percentage: number | null;
}
/**
* Response structure for KOL performance overview.
*
* Returned by AnalyticsService.getKolPerformanceOverview.
*/
export interface KolPerformanceResponse {
/** The KOL entries for the requested page (bounded by the `limit`/`offset` arguments). */
kols: KolPerformanceData[];
/** Total count intended for pagination; derived from a separate COUNT(*) query over `influencers`. */
total: number;
}
/**
* Analytics service for KOL performance data
*/
export class AnalyticsService {
  /**
   * Get KOL performance overview from ClickHouse.
   *
   * Loads one page of influencers (ordered by `followers_count DESC`), aggregates
   * their follow/unfollow/like/unlike events over the requested window, and merges
   * both into the response.
   *
   * NOTE: `sortBy`/`sortOrder` are applied to the fetched page only; the page
   * itself is always selected by `followers_count DESC`.
   *
   * @param timeRange Number of days to look back (7, 30, or 90)
   * @param projectId Optional project ID to filter by
   * @param sortBy Field to sort by (followers_change, likes_change, follows_change
   *   or followers_count); any other value falls back to followers_change
   * @param sortOrder Sort order (asc or desc); anything other than "asc" sorts descending
   * @param limit Number of KOLs to return; invalid values fall back to 20
   * @param offset Offset for pagination; invalid values fall back to 0
   * @returns KOL performance overview data
   * @throws RangeError when `timeRange` is not a finite number
   * @throws Error when a ClickHouse query fails
   */
  async getKolPerformanceOverview(
    timeRange: number,
    projectId?: string,
    sortBy: string = 'followers_change',
    sortOrder: string = 'desc',
    limit: number = 20,
    offset: number = 0
  ): Promise<KolPerformanceResponse> {
    const startTime = Date.now();
    logger.info('Fetching KOL performance overview', { timeRange, projectId, sortBy, sortOrder, limit, offset });
    try {
      const days = Number(timeRange);
      if (!Number.isFinite(days)) {
        throw new RangeError(`Invalid timeRange: ${String(timeRange)}`);
      }
      // LIMIT/OFFSET are interpolated into SQL, so force them to plain non-negative integers.
      const safeLimit = this.toNonNegativeInteger(limit, 20);
      const safeOffset = this.toNonNegativeInteger(offset, 0);

      // First let's check if we have any data in the tables (diagnostics only, never throws).
      // NOTE(review): this issues extra COUNT(*) queries on every call.
      await this.checkDataExistence();

      // Compute the window in UTC because the dates are formatted from toISOString() (UTC).
      const currentDate = new Date();
      const pastDate = new Date(currentDate.getTime());
      pastDate.setUTCDate(pastDate.getUTCDate() - days);
      const currentDateStr = this.formatDateForClickhouse(currentDate);
      const pastDateStr = this.formatDateForClickhouse(pastDate);

      // Build project filter condition; the value is escaped because it is caller-supplied.
      const projectFilter = projectId
        ? `AND project_id = '${this.escapeStringLiteral(String(projectId))}'`
        : '';

      // First get the influencers
      const influencersQuery = `
        SELECT
          influencer_id,
          name,
          platform,
          profile_url,
          followers_count
        FROM
          influencers
        WHERE
          1=1 ${projectFilter}
        ORDER BY
          followers_count DESC
        LIMIT ${safeLimit}
        OFFSET ${safeOffset}
      `;
      // Query to get total count for pagination
      const countQuery = `
        SELECT
          COUNT(*) as total
        FROM
          influencers
        WHERE
          1=1 ${projectFilter}
      `;
      logger.debug('Executing ClickHouse queries for influencers', {
        influencersQuery: influencersQuery.replace(/\n\s+/g, ' ').trim(),
        countQuery: countQuery.replace(/\n\s+/g, ' ').trim()
      });

      // Execute the influencers and count queries
      const [influencersData, countResult] = await Promise.all([
        this.executeClickhouseQuery(influencersQuery),
        this.executeClickhouseQuery(countQuery)
      ]);
      const total = this.parseCountResult(countResult);

      // An empty page (e.g. offset past the end) still reports the real total.
      if (!Array.isArray(influencersData) || influencersData.length === 0) {
        logger.info('No influencers found for the given criteria');
        return {
          kols: [],
          total
        };
      }

      // Get the list of influencer IDs to fetch events for (escaped: they become SQL literals).
      const influencerIdsList = influencersData
        .map(row => `'${this.escapeStringLiteral(String(row.influencer_id))}'`)
        .join(', ');

      // Now fetch the event metrics for these influencers. The date window is applied in
      // WHERE; influencers without events in the window yield no row and default to 0 below.
      const eventsQuery = `
        SELECT
          influencer_id,
          SUM(CASE WHEN event_type = 'follow' THEN 1 ELSE 0 END) -
            SUM(CASE WHEN event_type = 'unfollow' THEN 1 ELSE 0 END) AS followers_change,
          SUM(CASE WHEN event_type = 'like' THEN 1 ELSE 0 END) -
            SUM(CASE WHEN event_type = 'unlike' THEN 1 ELSE 0 END) AS likes_change,
          SUM(CASE WHEN event_type = 'follow' THEN 1 ELSE 0 END) AS follows_change
        FROM
          events
        WHERE
          influencer_id IN (${influencerIdsList})
          AND date BETWEEN '${pastDateStr}' AND '${currentDateStr}'
        GROUP BY
          influencer_id
      `;
      logger.debug('Executing ClickHouse query for events', {
        eventsQuery: eventsQuery.replace(/\n\s+/g, ' ').trim()
      });

      // Execute the events query
      const eventsData = await this.executeClickhouseQuery(eventsQuery);

      // Index events by influencer id. A Map avoids collisions with Object.prototype keys.
      const eventsByInfluencer = new Map<string, Record<string, unknown>>();
      if (Array.isArray(eventsData)) {
        for (const row of eventsData) {
          if (row && row.influencer_id) {
            eventsByInfluencer.set(String(row.influencer_id), row);
          }
        }
      }

      // Combine the influencer data with events data
      const kols = influencersData.map(influencer => {
        const events = eventsByInfluencer.get(String(influencer.influencer_id)) ?? {};
        return {
          influencer_id: String(influencer.influencer_id || ''),
          name: String(influencer.name || ''),
          platform: String(influencer.platform || ''),
          profile_url: String(influencer.profile_url || ''),
          followers_count: Number(influencer.followers_count || 0),
          followers_change: Number(events.followers_change || 0),
          followers_change_percentage: null, // We'll calculate this in a separate query if needed
          likes_change: Number(events.likes_change || 0),
          likes_change_percentage: null,
          follows_change: Number(events.follows_change || 0),
          follows_change_percentage: null
        };
      });

      // Only numeric fields can be sorted by subtraction; anything else falls back to the default.
      const sortableFields = ['followers_count', 'followers_change', 'likes_change', 'follows_change'] as const;
      const requestedField = sortableFields.find(field => field === sortBy);
      if (requestedField === undefined) {
        logger.warn('Unsupported sortBy value, falling back to followers_change', { sortBy });
      }
      const sortField = requestedField ?? 'followers_change';
      const ascending = String(sortOrder).toLowerCase() === 'asc';
      kols.sort((a, b) => {
        const aValue = a[sortField] || 0;
        const bValue = b[sortField] || 0;
        return ascending ? aValue - bValue : bValue - aValue;
      });

      const duration = Date.now() - startTime;
      logger.info('KOL performance overview fetched successfully', {
        duration,
        kolCount: kols.length,
        total
      });
      return {
        kols,
        total
      };
    } catch (error) {
      const duration = Date.now() - startTime;
      logger.error(`Error in getKolPerformanceOverview (${duration}ms)`, error);
      throw error;
    }
  }

  /**
   * Check if tables contain data and log the results for diagnostics.
   * Failures are logged as warnings and never propagated.
   */
  private async checkDataExistence(): Promise<void> {
    try {
      // Check if the influencers and events tables have data
      const influencerCountQuery = "SELECT COUNT(*) as count FROM influencers";
      const eventsCountQuery = "SELECT COUNT(*) as count FROM events";
      const [influencersResult, eventsResult] = await Promise.all([
        this.executeClickhouseQuery(influencerCountQuery),
        this.executeClickhouseQuery(eventsCountQuery)
      ]);
      const influencersCount = Array.isArray(influencersResult) && influencersResult.length > 0
        ? influencersResult[0].count
        : 0;
      const eventsCount = Array.isArray(eventsResult) && eventsResult.length > 0
        ? eventsResult[0].count
        : 0;
      logger.info('Data existence check', {
        influencers_count: influencersCount,
        events_count: eventsCount
      });

      // Optional: Check for sample data. Counts may arrive as strings, so coerce before comparing.
      if (Number(influencersCount) > 0) {
        const sampleQuery = "SELECT influencer_id, name, platform, profile_url, followers_count FROM influencers LIMIT 2";
        const sampleResult = await this.executeClickhouseQuery(sampleQuery);
        logger.info('Sample influencer data', { sample: sampleResult });
      }
      if (Number(eventsCount) > 0) {
        const sampleQuery = `
          SELECT
            event_type,
            COUNT(*) as count
          FROM events
          GROUP BY event_type
          LIMIT 5
        `;
        const sampleResult = await this.executeClickhouseQuery(sampleQuery);
        logger.info('Sample events data', { sample: sampleResult });
      }
    } catch (error) {
      logger.warn('Error checking data existence', error);
      // Don't throw, just continue with the query
    }
  }

  /**
   * Execute a ClickHouse query with proper error handling
   * @param query SQL query to execute
   * @returns Query result rows; an empty array when the client result exposes neither `json` nor `rows`
   * @throws Error prefixed with "ClickHouse query error:" when the client call fails
   */
  private async executeClickhouseQuery(query: string): Promise<any[]> {
    try {
      const result = await clickhouse.query({
        query,
        format: 'JSONEachRow'
      });
      // Handle different result formats
      if ('json' in result) {
        return await result.json();
      } else if ('rows' in result) {
        return result.rows;
      }
      return [];
    } catch (error) {
      logger.error('Error executing ClickHouse query', error);
      // Re-throw to be handled by the caller
      throw new Error(`ClickHouse query error: ${error instanceof Error ? error.message : String(error)}`);
    }
  }

  /**
   * Parse count result from ClickHouse
   * @param data ClickHouse count query result
   * @returns Total count, or 0 when the result is empty or not an array
   */
  private parseCountResult(data: any[]): number {
    if (!Array.isArray(data) || data.length === 0) {
      logger.warn('ClickHouse count result is invalid, returning 0');
      return 0;
    }
    return Number(data[0]?.total || 0);
  }

  /**
   * Format date for ClickHouse query
   * @param date Date to format
   * @returns Formatted date string (YYYY-MM-DD), taken from the UTC ISO representation
   */
  private formatDateForClickhouse(date: Date): string {
    return date.toISOString().split('T')[0];
  }

  /**
   * Escape a value for use inside a single-quoted ClickHouse string literal.
   * Backslashes are escaped first so the quote escapes are not doubled.
   * @param value Raw string value
   * @returns Value with `\` and `'` backslash-escaped
   */
  private escapeStringLiteral(value: string): string {
    return value.replace(/\\/g, '\\\\').replace(/'/g, "\\'");
  }

  /**
   * Coerce a value to a non-negative integer for LIMIT/OFFSET interpolation.
   * @param value Candidate value (floored if fractional)
   * @param fallback Value to use when the candidate is not a finite, non-negative number
   * @returns A safe non-negative integer
   */
  private toNonNegativeInteger(value: number, fallback: number): number {
    const floored = Math.floor(Number(value));
    return Number.isFinite(floored) && floored >= 0 ? floored : fallback;
  }

  /**
   * Debug event data - only called if debug=true parameter is set.
   * Logs counts, samples and an aggregated view of the events table; errors are
   * logged as warnings and never propagated.
   */
  async debugEventData(): Promise<void> {
    try {
      // Check events table
      const eventCountQuery = "SELECT COUNT(*) as count FROM events";
      const eventTypesQuery = "SELECT event_type, COUNT(*) as count FROM events GROUP BY event_type";
      const sampleEventsQuery = "SELECT * FROM events LIMIT 5";
      const sampleInfluencersEventsQuery = `
        SELECT e.event_type, e.influencer_id, e.date, i.name
        FROM events e
        JOIN influencers i ON e.influencer_id = i.influencer_id
        LIMIT 10
      `;
      const [eventCount, eventTypes, sampleEvents, joinedEvents] = await Promise.all([
        this.executeClickhouseQuery(eventCountQuery),
        this.executeClickhouseQuery(eventTypesQuery),
        this.executeClickhouseQuery(sampleEventsQuery),
        // The join is best-effort: a failure here must not hide the other results.
        this.executeClickhouseQuery(sampleInfluencersEventsQuery).catch(() => [])
      ]);
      logger.info('DEBUG: Event counts', {
        total_events: eventCount[0]?.count || 0,
        event_types: eventTypes,
      });
      logger.info('DEBUG: Sample events', {
        sample_events: sampleEvents,
        joined_events: joinedEvents
      });

      // Try executing our specific events query directly
      const eventsQuery = `
        SELECT
          influencer_id,
          SUM(CASE WHEN event_type = 'follow' THEN 1 ELSE 0 END) AS follows,
          SUM(CASE WHEN event_type = 'unfollow' THEN 1 ELSE 0 END) AS unfollows,
          SUM(CASE WHEN event_type = 'like' THEN 1 ELSE 0 END) AS likes,
          SUM(CASE WHEN event_type = 'unlike' THEN 1 ELSE 0 END) AS unlikes
        FROM events
        GROUP BY influencer_id
        LIMIT 10
      `;
      const eventsResult = await this.executeClickhouseQuery(eventsQuery).catch(err => {
        logger.error('DEBUG: Error executing events query', err);
        return [];
      });
      logger.info('DEBUG: Aggregated events', {
        events_result: eventsResult
      });
    } catch (error) {
      logger.warn('Error in debugEventData', error);
    }
  }
}
// Export singleton instance: a single shared AnalyticsService created at module load,
// so importers can call its methods without constructing their own.
export const analyticsService = new AnalyticsService();