Files
promote/backend/src/routes/analytics.ts.bak
2025-03-10 18:55:33 +08:00

802 lines
22 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { Hono } from 'hono';
import { authMiddleware } from '../middlewares/auth';
import clickhouse from '../utils/clickhouse';
import { addAnalyticsJob } from '../utils/queue';
import { getRedisClient } from '../utils/redis';
import supabase from '../utils/supabase';
import {
scheduleInfluencerCollection,
schedulePostCollection,
removeScheduledJob,
getScheduledJobs
} from '../utils/scheduledTasks';
// Shape of the authenticated user stored on the request context and read by the
// handlers below via c.get('user'). Presumably populated by authMiddleware —
// NOTE(review): confirm the field set against that middleware; only `id` is
// read by the handlers visible in this file.
interface User {
id: string;
email: string;
name?: string;
}
// Module augmentation: registers `user` in Hono's ContextVariableMap so that
// c.get('user') is typed as User instead of unknown.
declare module 'hono' {
interface ContextVariableMap {
user: User;
}
}
// Router holding every analytics endpoint defined below. NOTE(review): it is
// presumably exported and mounted by the app entry point (not visible here).
const analyticsRouter = new Hono();
// authMiddleware runs for every path ('*') on this router, so all endpoints
// below require authentication.
analyticsRouter.use('*', authMiddleware);
/**
 * POST /view — record that the authenticated user viewed a content item.
 *
 * Body: `{ content_id }` (required).
 * Side effects: inserts a row into ClickHouse `promote.view_events`, queues a
 * `process_views` analytics job, and increments the Redis counter
 * `views:<content_id>` that GET /content/:id reads back.
 *
 * Responses: 200 on success; 400 for a malformed body or missing content_id;
 * 500 if any downstream call fails.
 */
analyticsRouter.post('/view', async (c) => {
  try {
    // A malformed JSON body is a client error, not a server failure.
    const body = await c.req.json().catch(() => null);
    if (body === null || typeof body !== 'object') {
      return c.json({ error: 'Invalid JSON body' }, 400);
    }
    const { content_id } = body;
    const user = c.get('user');
    if (!content_id) {
      return c.json({ error: 'Content ID is required' }, 400);
    }
    // X-Forwarded-For may carry a comma-separated chain ("client, proxy1, ...");
    // only the left-most entry identifies the client. Both headers are
    // client-supplied unless a trusted proxy rewrites them.
    const forwardedFor = c.req.header('x-forwarded-for')?.split(',')[0]?.trim();
    const ip = forwardedFor || c.req.header('x-real-ip') || '0.0.0.0';
    const userAgent = c.req.header('user-agent') || 'unknown';
    // Persist the raw event in ClickHouse.
    await clickhouse.query({
      query: `
INSERT INTO promote.view_events (user_id, content_id, ip, user_agent)
VALUES (?, ?, ?, ?)
`,
      values: [user.id, content_id, ip, userAgent]
    });
    // Hand the event to the background analytics pipeline.
    await addAnalyticsJob('process_views', {
      user_id: user.id,
      content_id,
      timestamp: new Date().toISOString()
    });
    // Keep the cached per-content view counter in step.
    const redis = await getRedisClient();
    await redis.incr(`views:${content_id}`);
    return c.json({ message: 'View tracked successfully' });
  } catch (error) {
    console.error('View tracking error:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * POST /like — record a like or unlike by the authenticated user.
 *
 * Body: `{ content_id, action }` where action is "like" or "unlike".
 * Side effects: inserts into ClickHouse `promote.like_events` (action encoded
 * as 1 = like, 2 = unlike), queues a `process_likes` job, and adjusts the
 * Redis counter `likes:<content_id>` read by GET /content/:id.
 *
 * Responses: 200 on success; 400 for a malformed body or invalid fields;
 * 500 if any downstream call fails.
 */
analyticsRouter.post('/like', async (c) => {
  try {
    // A malformed JSON body is a client error, not a server failure.
    const body = await c.req.json().catch(() => null);
    if (body === null || typeof body !== 'object') {
      return c.json({ error: 'Invalid JSON body' }, 400);
    }
    const { content_id, action } = body;
    const user = c.get('user');
    if (!content_id || !action) {
      return c.json({ error: 'Content ID and action are required' }, 400);
    }
    if (action !== 'like' && action !== 'unlike') {
      return c.json({ error: 'Action must be either "like" or "unlike"' }, 400);
    }
    // Persist the raw event in ClickHouse.
    await clickhouse.query({
      query: `
INSERT INTO promote.like_events (user_id, content_id, action)
VALUES (?, ?, ?)
`,
      values: [user.id, content_id, action === 'like' ? 1 : 2]
    });
    // Hand the event to the background analytics pipeline.
    await addAnalyticsJob('process_likes', {
      user_id: user.id,
      content_id,
      action,
      timestamp: new Date().toISOString()
    });
    // Keep the cached per-content like counter in step.
    const redis = await getRedisClient();
    const likeKey = `likes:${content_id}`;
    if (action === 'like') {
      await redis.incr(likeKey);
    } else {
      // An unlike with no matching like (e.g. a retried request) would push the
      // cached count below zero; undo the decrement in that case. INCR is used
      // rather than SET 0 so concurrent updates are not overwritten.
      const remaining = await redis.decr(likeKey);
      if (remaining < 0) {
        await redis.incr(likeKey);
      }
    }
    return c.json({ message: `${action} tracked successfully` });
  } catch (error) {
    console.error('Like tracking error:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * POST /follow — record that the authenticated user followed or unfollowed
 * another user.
 *
 * Body: `{ followed_id, action }` where action is "follow" or "unfollow".
 * Side effects: inserts into ClickHouse `promote.follower_events` (action
 * encoded as 1 = follow, 2 = unfollow), queues a `process_followers` job, and
 * adjusts the Redis counter `followers:<followed_id>` read by GET /user/:id.
 *
 * Responses: 200 on success; 400 for a malformed body or invalid fields;
 * 500 if any downstream call fails.
 */
analyticsRouter.post('/follow', async (c) => {
  try {
    // A malformed JSON body is a client error, not a server failure.
    const body = await c.req.json().catch(() => null);
    if (body === null || typeof body !== 'object') {
      return c.json({ error: 'Invalid JSON body' }, 400);
    }
    const { followed_id, action } = body;
    const user = c.get('user');
    if (!followed_id || !action) {
      return c.json({ error: 'Followed ID and action are required' }, 400);
    }
    if (action !== 'follow' && action !== 'unfollow') {
      return c.json({ error: 'Action must be either "follow" or "unfollow"' }, 400);
    }
    // Persist the raw event in ClickHouse.
    await clickhouse.query({
      query: `
INSERT INTO promote.follower_events (follower_id, followed_id, action)
VALUES (?, ?, ?)
`,
      values: [user.id, followed_id, action === 'follow' ? 1 : 2]
    });
    // Hand the event to the background analytics pipeline.
    await addAnalyticsJob('process_followers', {
      follower_id: user.id,
      followed_id,
      action,
      timestamp: new Date().toISOString()
    });
    // Keep the cached follower counter of the followed user in step.
    const redis = await getRedisClient();
    const followerKey = `followers:${followed_id}`;
    if (action === 'follow') {
      await redis.incr(followerKey);
    } else {
      // An unfollow with no matching follow (e.g. a retried request) would push
      // the cached count below zero; undo the decrement in that case. INCR is
      // used rather than SET 0 so concurrent updates are not overwritten.
      const remaining = await redis.decr(followerKey);
      if (remaining < 0) {
        await redis.incr(followerKey);
      }
    }
    return c.json({ message: `${action} tracked successfully` });
  } catch (error) {
    console.error('Follow tracking error:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * GET /content/:id — cached view and like totals for one content item, read
 * from the Redis counters `views:<id>` and `likes:<id>` that POST /view and
 * POST /like maintain. A missing counter is reported as 0.
 */
analyticsRouter.get('/content/:id', async (c) => {
  try {
    const contentId = c.req.param('id');
    const client = await getRedisClient();
    // Both counters are fetched in a single round of concurrent requests.
    const [rawViews, rawLikes] = await Promise.all([
      client.get(`views:${contentId}`),
      client.get(`likes:${contentId}`)
    ]);
    const payload = {
      content_id: contentId,
      views: parseInt(rawViews || '0'),
      likes: parseInt(rawLikes || '0')
    };
    return c.json(payload);
  } catch (error) {
    console.error('Content analytics error:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * GET /user/:id — follower count plus per-content view/like aggregates.
 *
 * - followers: Redis counter `followers:<id>` maintained by POST /follow
 *   (0 when the key is absent).
 * - content_analytics.views / .likes: ClickHouse rows grouped by content_id
 *   from promote.view_events / promote.like_events filtered on user_id.
 *   Likes are netted: action = 1 counts +1, any other action counts -1.
 *   NOTE(review): POST /view and POST /like store the *acting* user in
 *   user_id, so these are views/likes made BY this user rather than received
 *   by their content — confirm this is the intended metric.
 */
analyticsRouter.get('/user/:id', async (c) => {
  try {
    const userId = c.req.param('id');
    const redis = await getRedisClient();
    // The three reads are independent of each other, so run them concurrently.
    const [followers, viewsResult, likesResult] = await Promise.all([
      redis.get(`followers:${userId}`),
      clickhouse.query({
        query: `
SELECT content_id, COUNT(*) as view_count
FROM promote.view_events
WHERE user_id = ?
GROUP BY content_id
`,
        values: [userId]
      }),
      clickhouse.query({
        query: `
SELECT content_id, SUM(CASE WHEN action = 1 THEN 1 ELSE -1 END) as like_count
FROM promote.like_events
WHERE user_id = ?
GROUP BY content_id
`,
        values: [userId]
      })
    ]);
    // Results without a `rows` field are treated as empty.
    const viewsData = 'rows' in viewsResult ? viewsResult.rows : [];
    const likesData = 'rows' in likesResult ? likesResult.rows : [];
    return c.json({
      user_id: userId,
      followers: parseInt(followers || '0', 10),
      content_analytics: {
        views: viewsData,
        likes: likesData
      }
    });
  } catch (error) {
    console.error('User analytics error:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
// ----- Community analytics routes -----

/**
 * GET /project/:id/top-influencers — the project's 10 influencers with the
 * largest summed `post_view_change` metric in ClickHouse, each merged with its
 * profile fields from the Supabase `influencers` table.
 *
 * Returns the bare (empty) ClickHouse result when nothing matches, and 500 if
 * the Supabase lookup reports an error.
 */
analyticsRouter.get('/project/:id/top-influencers', async (c) => {
  try {
    const projectId = c.req.param('id');
    // NOTE(review): this reads the unqualified `events` table while the growth
    // endpoint reads `promote.events`; confirm both resolve to the same table.
    const ranking = await clickhouse.query({
      query: `
SELECT
influencer_id,
SUM(metric_value) AS total_views
FROM events
WHERE
project_id = ? AND
event_type = 'post_view_change'
GROUP BY influencer_id
ORDER BY total_views DESC
LIMIT 10
`,
      values: [projectId]
    });
    const rankedRows = 'rows' in ranking ? ranking.rows : [];
    // Nothing to enrich: hand back the empty result untouched.
    if (rankedRows.length === 0) {
      return c.json(rankedRows);
    }
    const ids = rankedRows.map((row: any) => row.influencer_id);
    const { data: profiles, error: profileError } = await supabase
      .from('influencers')
      .select('influencer_id, name, platform, followers_count, video_count')
      .in('influencer_id', ids);
    if (profileError) {
      console.error('Error fetching influencer details:', profileError);
      return c.json({ error: 'Error fetching influencer details' }, 500);
    }
    // Profile fields are spread last, so they win on key collisions.
    const merged = rankedRows.map((row: any) => {
      const profile = profiles?.find((p) => p.influencer_id === row.influencer_id);
      return { ...row, ...(profile || {}) };
    });
    return c.json(merged);
  } catch (error) {
    console.error('Error fetching top influencers:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * GET /influencer/:id/follower-trend — monthly sums of `follower_change`
 * metric values for one influencer over the last 6 months, oldest month first.
 */
analyticsRouter.get('/influencer/:id/follower-trend', async (c) => {
  try {
    const influencerId = c.req.param('id');
    const sql = `
SELECT
toStartOfMonth(timestamp) AS month,
SUM(metric_value) AS follower_change
FROM events
WHERE
influencer_id = ? AND
event_type = 'follower_change' AND
timestamp >= subtractMonths(now(), 6)
GROUP BY month
ORDER BY month ASC
`;
    const monthly = await clickhouse.query({ query: sql, values: [influencerId] });
    return c.json({
      influencer_id: influencerId,
      follower_trend: 'rows' in monthly ? monthly.rows : []
    });
  } catch (error) {
    console.error('Error fetching follower trend:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * GET /influencer/:id/growth — time series of one influencer metric.
 *
 * Query parameters:
 * - metric: followers_count | video_count | views_count | likes_count
 *   (default followers_count); any other value is rejected with 400.
 * - timeframe: 30days | 90days | 6months | 1year (default 6months).
 * - interval: day | week | month (default month).
 * Unrecognised timeframe/interval values fall back to their defaults, and the
 * response reports the values that were actually applied.
 *
 * Each row holds `time_period`, `change` (sum of metric_value) and
 * `total_value` (max of metric_total) for `<metric>_change` events in
 * promote.events. The influencer's Supabase profile is attached as
 * `influencer_info`; when that lookup fails it is null and the error is only
 * logged.
 */
analyticsRouter.get('/influencer/:id/growth', async (c) => {
  try {
    const influencerId = c.req.param('id');
    const {
      metric = 'followers_count',
      timeframe = '6months',
      interval = 'month'
    } = c.req.query();
    const validMetrics = ['followers_count', 'video_count', 'views_count', 'likes_count'];
    if (!validMetrics.includes(metric)) {
      return c.json({ error: 'Invalid metric specified' }, 400);
    }
    // Only these whitelisted SQL fragments are interpolated into the query, so
    // raw user input never reaches the SQL text. Map lookups (rather than plain
    // objects) keep inherited keys such as "constructor" from matching.
    const timeRangeByTimeframe = new Map<string, string>([
      ['30days', 'timestamp >= subtractDays(now(), 30)'],
      ['90days', 'timestamp >= subtractDays(now(), 90)'],
      ['6months', 'timestamp >= subtractMonths(now(), 6)'],
      ['1year', 'timestamp >= subtractYears(now(), 1)']
    ]);
    const intervalFunctionByInterval = new Map<string, string>([
      ['day', 'toDate(timestamp)'],
      ['week', 'toStartOfWeek(timestamp)'],
      ['month', 'toStartOfMonth(timestamp)']
    ]);
    const appliedTimeframe = timeRangeByTimeframe.has(timeframe) ? timeframe : '6months';
    const appliedInterval = intervalFunctionByInterval.has(interval) ? interval : 'month';
    const timeRangeSql = timeRangeByTimeframe.get(appliedTimeframe);
    const intervalFunction = intervalFunctionByInterval.get(appliedInterval);
    const result = await clickhouse.query({
      query: `
SELECT
${intervalFunction} AS time_period,
sumIf(metric_value, metric_name = ?) AS change,
maxIf(metric_total, metric_name = ?) AS total_value
FROM promote.events
WHERE
influencer_id = ? AND
event_type = ? AND
${timeRangeSql}
GROUP BY time_period
ORDER BY time_period ASC
`,
      values: [metric, metric, influencerId, `${metric}_change`]
    });
    const trendData = 'rows' in result ? result.rows : [];
    // Profile info is best-effort: a lookup failure is logged, not fatal.
    const { data: influencerInfo, error } = await supabase
      .from('influencers')
      .select('name, platform, followers_count, video_count')
      .eq('influencer_id', influencerId)
      .single();
    if (error) {
      console.error('Error fetching influencer details:', error);
    }
    return c.json({
      influencer_id: influencerId,
      influencer_info: influencerInfo || null,
      metric,
      timeframe: appliedTimeframe,
      interval: appliedInterval,
      data: trendData
    });
  } catch (error) {
    console.error('Error fetching influencer growth trend:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * GET /post/:id/like-trend — daily sums of `post_like_change` metric values
 * for one post over the last 30 days, oldest day first.
 */
analyticsRouter.get('/post/:id/like-trend', async (c) => {
  try {
    const postId = c.req.param('id');
    const sql = `
SELECT
toDate(timestamp) AS day,
SUM(metric_value) AS like_change
FROM events
WHERE
post_id = ? AND
event_type = 'post_like_change' AND
timestamp >= subtractDays(now(), 30)
GROUP BY day
ORDER BY day ASC
`;
    const daily = await clickhouse.query({ query: sql, values: [postId] });
    return c.json({
      post_id: postId,
      like_trend: 'rows' in daily ? daily.rows : []
    });
  } catch (error) {
    console.error('Error fetching like trend:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * GET /influencer/:id/details — one influencer's profile row from Supabase.
 *
 * Responses: 200 with the row; 404 when no row matches; 500 when the query
 * itself fails.
 */
analyticsRouter.get('/influencer/:id/details', async (c) => {
  try {
    const influencerId = c.req.param('id');
    // maybeSingle() yields data === null (without an error) when zero rows
    // match. single() would report that case as an error, turning "not found"
    // into a 500 and leaving the 404 branch unreachable.
    const { data, error } = await supabase
      .from('influencers')
      .select('influencer_id, name, platform, profile_url, external_id, followers_count, video_count, platform_count, created_at')
      .eq('influencer_id', influencerId)
      .maybeSingle();
    if (error) {
      console.error('Error fetching influencer details:', error);
      return c.json({ error: 'Error fetching influencer details' }, 500);
    }
    if (!data) {
      return c.json({ error: 'Influencer not found' }, 404);
    }
    return c.json(data);
  } catch (error) {
    console.error('Error fetching influencer details:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * GET /influencer/:id/posts — every post stored in Supabase for one
 * influencer, newest `published_at` first. Responds with [] when the query
 * returns no data and 500 when it reports an error.
 */
analyticsRouter.get('/influencer/:id/posts', async (c) => {
  try {
    const influencerId = c.req.param('id');
    const postsQuery = supabase
      .from('posts')
      .select('post_id, influencer_id, platform, post_url, title, description, published_at, created_at')
      .eq('influencer_id', influencerId)
      .order('published_at', { ascending: false });
    const { data: posts, error: postsError } = await postsQuery;
    if (!postsError) {
      return c.json(posts || []);
    }
    console.error('Error fetching influencer posts:', postsError);
    return c.json({ error: 'Error fetching influencer posts' }, 500);
  } catch (error) {
    console.error('Error fetching influencer posts:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * GET /post/:id/comments — every comment stored in Supabase for one post,
 * newest `created_at` first. Responds with [] when the query returns no data
 * and 500 when it reports an error.
 */
analyticsRouter.get('/post/:id/comments', async (c) => {
  try {
    const postId = c.req.param('id');
    const commentsQuery = supabase
      .from('comments')
      .select('comment_id, post_id, user_id, content, sentiment_score, created_at')
      .eq('post_id', postId)
      .order('created_at', { ascending: false });
    const { data: comments, error: commentsError } = await commentsQuery;
    if (!commentsError) {
      return c.json(comments || []);
    }
    console.error('Error fetching post comments:', commentsError);
    return c.json({ error: 'Error fetching post comments' }, 500);
  } catch (error) {
    console.error('Error fetching post comments:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * GET /project/:id/platform-distribution — number of distinct influencers per
 * platform among a project's events, largest platform first.
 */
analyticsRouter.get('/project/:id/platform-distribution', async (c) => {
  try {
    const projectId = c.req.param('id');
    const sql = `
SELECT
platform,
COUNT(DISTINCT influencer_id) AS influencer_count
FROM events
WHERE project_id = ?
GROUP BY platform
ORDER BY influencer_count DESC
`;
    const response = await clickhouse.query({ query: sql, values: [projectId] });
    return c.json({
      project_id: projectId,
      platform_distribution: 'rows' in response ? response.rows : []
    });
  } catch (error) {
    console.error('Error fetching platform distribution:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * GET /project/:id/interaction-types — event count and summed metric value
 * per interaction type ('click', 'comment', 'share') for a project, most
 * frequent type first.
 */
analyticsRouter.get('/project/:id/interaction-types', async (c) => {
  try {
    const projectId = c.req.param('id');
    const sql = `
SELECT
event_type,
COUNT(*) AS event_count,
SUM(metric_value) AS total_value
FROM events
WHERE
project_id = ? AND
event_type IN ('click', 'comment', 'share')
GROUP BY event_type
ORDER BY event_count DESC
`;
    const response = await clickhouse.query({ query: sql, values: [projectId] });
    return c.json({
      project_id: projectId,
      interaction_types: 'rows' in response ? response.rows : []
    });
  } catch (error) {
    console.error('Error fetching interaction types:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
// ===== Scheduled Collection Endpoints =====

/**
 * POST /schedule/influencer — register a recurring metrics-collection job for
 * an existing influencer.
 *
 * Body: `{ influencer_id, cron_expression? }`. cron_expression defaults to
 * '0 0 * * *' (every day at midnight) when omitted or empty.
 * Responses: 400 for a malformed body, a missing influencer_id, or a
 * non-string cron_expression; 404 when the influencer does not exist; 500 when
 * the lookup or the scheduling call fails.
 */
analyticsRouter.post('/schedule/influencer', async (c) => {
  try {
    // A malformed JSON body is a client error, not a server failure.
    const body = await c.req.json().catch(() => null);
    if (body === null || typeof body !== 'object') {
      return c.json({ error: 'Invalid JSON body' }, 400);
    }
    const { influencer_id, cron_expression } = body;
    if (!influencer_id) {
      return c.json({ error: 'Influencer ID is required' }, 400);
    }
    if (cron_expression && typeof cron_expression !== 'string') {
      return c.json({ error: 'cron_expression must be a string' }, 400);
    }
    // Resolve the default once so the scheduled job and the response agree.
    const cronExpression = cron_expression || '0 0 * * *';
    // maybeSingle() returns data === null without an error when no row
    // matches, so a failed query (500) is distinguishable from "not found" (404).
    const { data, error } = await supabase
      .from('influencers')
      .select('influencer_id')
      .eq('influencer_id', influencer_id)
      .maybeSingle();
    if (error) {
      console.error('Error looking up influencer for scheduling:', error);
      return c.json({ error: 'Internal server error' }, 500);
    }
    if (!data) {
      return c.json({ error: 'Influencer not found' }, 404);
    }
    await scheduleInfluencerCollection(influencer_id, cronExpression);
    return c.json({
      message: 'Influencer metrics collection scheduled successfully',
      influencer_id,
      cron_expression: cronExpression
    });
  } catch (error) {
    console.error('Error scheduling influencer collection:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * POST /schedule/post — register a recurring metrics-collection job for an
 * existing post.
 *
 * Body: `{ post_id, cron_expression? }`. cron_expression defaults to
 * '0 0 * * *' (every day at midnight) when omitted or empty.
 * Responses: 400 for a malformed body, a missing post_id, or a non-string
 * cron_expression; 404 when the post does not exist; 500 when the lookup or
 * the scheduling call fails.
 */
analyticsRouter.post('/schedule/post', async (c) => {
  try {
    // A malformed JSON body is a client error, not a server failure.
    const body = await c.req.json().catch(() => null);
    if (body === null || typeof body !== 'object') {
      return c.json({ error: 'Invalid JSON body' }, 400);
    }
    const { post_id, cron_expression } = body;
    if (!post_id) {
      return c.json({ error: 'Post ID is required' }, 400);
    }
    if (cron_expression && typeof cron_expression !== 'string') {
      return c.json({ error: 'cron_expression must be a string' }, 400);
    }
    // Resolve the default once so the scheduled job and the response agree.
    const cronExpression = cron_expression || '0 0 * * *';
    // maybeSingle() returns data === null without an error when no row
    // matches, so a failed query (500) is distinguishable from "not found" (404).
    const { data, error } = await supabase
      .from('posts')
      .select('post_id')
      .eq('post_id', post_id)
      .maybeSingle();
    if (error) {
      console.error('Error looking up post for scheduling:', error);
      return c.json({ error: 'Internal server error' }, 500);
    }
    if (!data) {
      return c.json({ error: 'Post not found' }, 404);
    }
    await schedulePostCollection(post_id, cronExpression);
    return c.json({
      message: 'Post metrics collection scheduled successfully',
      post_id,
      cron_expression: cronExpression
    });
  } catch (error) {
    console.error('Error scheduling post collection:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * GET /schedule — every scheduled collection job, as returned by
 * getScheduledJobs(), wrapped in `{ scheduled_jobs }`.
 */
analyticsRouter.get('/schedule', async (c) => {
  try {
    return c.json({ scheduled_jobs: await getScheduledJobs() });
  } catch (error) {
    console.error('Error fetching scheduled jobs:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
/**
 * DELETE /schedule/:job_id — remove one scheduled collection job via
 * removeScheduledJob() and echo its id back. Any failure from that call is
 * reported as a 500.
 */
analyticsRouter.delete('/schedule/:job_id', async (c) => {
  try {
    const job_id = c.req.param('job_id');
    await removeScheduledJob(job_id);
    return c.json({ message: 'Scheduled job removed successfully', job_id });
  } catch (error) {
    console.error('Error removing scheduled job:', error);
    return c.json({ error: 'Internal server error' }, 500);
  }
});
// ===== Data Export Endpoints =====
// Export influencer growth data (CSV format)
analyticsRouter.get('/export/influencer/:id/growth', async (c) => {
try {
const influencerId = c.req.param('id');
const {
metric = 'followers_count',
timeframe = '6months',
interval = 'month'
} = c.req.query();
// The same logic as the influencer growth endpoint, but return CSV format
// Validate parameters
const validMetrics = ['followers_count', 'video_count', 'views_count', 'likes_count'];
if (!validMetrics.includes(metric)) {
return c.json({ error: 'Invalid metric specified' }, 400);
}
// Determine time range and interval function
let timeRangeSql: string;
let intervalFunction: string;
switch (timeframe) {
case '30days':
timeRangeSql = 'timestamp >= subtractDays(now(), 30)';
break;
case '90days':
timeRangeSql = 'timestamp >= subtractDays(now(), 90)';
break;
case '6months':
default:
timeRangeSql = 'timestamp >= subtractMonths(now(), 6)';
break;
case '1year':
timeRangeSql = 'timestamp >= subtractYears(now(), 1)';
break;
}
switch (interval) {
case 'day':
intervalFunction = 'toDate(timestamp)';
break;
case 'week':
intervalFunction = 'toStartOfWeek(timestamp)';
break;
case 'month':
default:
intervalFunction = 'toStartOfMonth(timestamp)';
break;
}
// Query ClickHouse for data
const result = await clickhouse.query({
query: `
SELECT
${intervalFunction} AS time_period,
sumIf(metric_value, metric_name = ?) AS change,
maxIf(metric_total, metric_name = ?) AS total_value
FROM promote.events
WHERE
influencer_id = ? AND
event_type = ? AND
${timeRangeSql}
GROUP BY time_period
ORDER BY time_period ASC
`