rw create table when start app

This commit is contained in:
2025-03-12 20:52:23 +08:00
parent d9a71976f5
commit 210603b685
9 changed files with 898 additions and 104 deletions

View File

@@ -1,6 +1,12 @@
-- 删除旧表 -- 删除旧表
DROP TABLE IF EXISTS events; DROP TABLE IF EXISTS events;
DROP TABLE IF EXISTS follower_events;
DROP TABLE IF EXISTS like_events;
DROP TABLE IF EXISTS view_events;
DROP TABLE IF EXISTS mv_kol_performance; DROP TABLE IF EXISTS mv_kol_performance;
DROP TABLE IF EXISTS mv_platform_distribution; DROP TABLE IF EXISTS mv_platform_distribution;

View File

@@ -0,0 +1,12 @@
-- Log table for PostgreSQL -> ClickHouse sync runs; the sync service builds one row per run.
-- NOTE(review): the sync service issues its own CREATE TABLE IF NOT EXISTS for this table
-- with duration_ms declared as UInt32, while this file uses UInt64 -- confirm which is intended.
CREATE TABLE IF NOT EXISTS promote.sync_logs (
timestamp DateTime,
-- Run duration in milliseconds.
duration_ms UInt64,
-- Per-entity counts reported by the run.
posts_synced UInt32,
comments_synced UInt32,
influencer_changes_synced UInt32,
projects_synced UInt32,
-- 1 when every sync step succeeded, 0 otherwise.
success UInt8,
-- Error messages of the run joined with '; ' (empty when there were none).
error_messages String
) ENGINE = MergeTree()
ORDER BY
(timestamp)

View File

@@ -12,7 +12,8 @@
"test:analytics": "tsx scripts/analytics-test.ts", "test:analytics": "tsx scripts/analytics-test.ts",
"ch": "bash db/sql/clickhouse/ch-query.sh", "ch": "bash db/sql/clickhouse/ch-query.sh",
"check-schema": "node db/db-inspector/run-all.js", "check-schema": "node db/db-inspector/run-all.js",
"pg": "node db/sql/postgres/pg-query.js" "pg": "node db/sql/postgres/pg-query.js",
"manual-sync": "tsx src/scripts/manualSync.ts"
}, },
"keywords": [], "keywords": [],
"author": "", "author": "",
@@ -39,6 +40,7 @@
"@types/dotenv": "^8.2.3", "@types/dotenv": "^8.2.3",
"@types/jsonwebtoken": "^9.0.6", "@types/jsonwebtoken": "^9.0.6",
"@types/node": "^20.11.30", "@types/node": "^20.11.30",
"@types/pg": "^8.11.11",
"@types/uuid": "^10.0.0", "@types/uuid": "^10.0.0",
"@typescript-eslint/eslint-plugin": "^7.4.0", "@typescript-eslint/eslint-plugin": "^7.4.0",
"@typescript-eslint/parser": "^7.4.0", "@typescript-eslint/parser": "^7.4.0",

View File

@@ -11,11 +11,9 @@ import commentsRouter from './routes/comments';
import influencersRouter from './routes/influencers'; import influencersRouter from './routes/influencers';
import projectsRouter from './routes/projects'; import projectsRouter from './routes/projects';
import { connectRedis } from './utils/redis'; import { connectRedis } from './utils/redis';
import { initClickHouse } from './utils/clickhouse';
import { initWorkers } from './utils/queue'; import { initWorkers } from './utils/queue';
import { checkDatabaseConnection } from './utils/initDatabase';
import { createSwaggerUI } from './swagger'; import { createSwaggerUI } from './swagger';
import { initScheduledTaskWorkers } from './utils/scheduledTasks'; import { initScheduledTaskWorkers, scheduleDatabaseSync } from './utils/scheduledTasks';
// Create Hono app // Create Hono app
const app = new Hono(); const app = new Hono();
@@ -64,23 +62,6 @@ const startServer = async () => {
console.log('Continuing with mock Redis client...'); console.log('Continuing with mock Redis client...');
} }
// Initialize ClickHouse
try {
await initClickHouse();
console.log('ClickHouse initialized');
} catch (error) {
console.error('Failed to initialize ClickHouse:', error);
console.log('Continuing with limited analytics functionality...');
}
// 检查数据库连接,但不自动初始化或修改数据库
try {
await checkDatabaseConnection();
} catch (error) {
console.error('Database connection check failed:', error);
console.log('Some features may not work correctly if database is not properly set up');
}
// Initialize workers for background processing // Initialize workers for background processing
console.log('🏗️ Initializing workers...'); console.log('🏗️ Initializing workers...');
const workers = { const workers = {
@@ -88,6 +69,16 @@ const startServer = async () => {
scheduledTaskWorker: initScheduledTaskWorkers() scheduledTaskWorker: initScheduledTaskWorkers()
}; };
// Schedule regular database sync task (every 15 minutes by default)
try {
console.log('📊 Setting up database sync scheduled task...');
await scheduleDatabaseSync();
console.log('Database sync task scheduled successfully');
} catch (error) {
console.error('Failed to schedule database sync task:', error);
console.log('Analytics data may not be automatically updated');
}
// Start server // Start server
const port = Number(config.port); const port = Number(config.port);
console.log(`Server starting on port ${port}...`); console.log(`Server starting on port ${port}...`);

View File

@@ -0,0 +1,76 @@
#!/usr/bin/env ts-node
/**
* Manually runs the PostgreSQL-to-ClickHouse sync task.
*
* Usage:
* npm run manual-sync
*
* Or with a start-time argument:
* npm run manual-sync -- --from="2025-03-11T00:00:00Z"
*/
import { syncAllData } from '../services/syncService';
import * as dotenv from 'dotenv';
import { join } from 'path';
// 加载环境变量
dotenv.config({ path: join(__dirname, '../../.env') });
/**
 * Reads a `--<paramName>=<value>` option from the command line.
 *
 * @param paramName Option name without the leading dashes (e.g. `from`).
 * @returns The text after the first `=` of the first matching argument
 *          (possibly an empty string), or `undefined` when the option is absent.
 */
const getParam = (paramName: string): string | undefined => {
  const prefix = `--${paramName}=`;
  const match = process.argv.slice(2).find(arg => arg.startsWith(prefix));
  if (match === undefined) return undefined;
  // Slice off the prefix instead of splitting on '=' so that values which
  // themselves contain '=' are returned whole rather than truncated.
  return match.slice(prefix.length);
};
/**
 * Script entry point: runs one full sync and prints a summary.
 *
 * The start of the sync window comes from `--from=<date>`; when the option is
 * absent it defaults to one hour before now. The process exits with code 0 only
 * when the sync reports success, and with code 1 on partial failure, on an
 * invalid `--from` value, or on any unexpected error.
 */
async function main() {
  try {
    // Resolve the start of the sync window (defaults to one hour ago).
    const fromTime = getParam('from');
    let fromTimestamp: string;

    if (fromTime) {
      const parsed = new Date(fromTime);
      // Fail with an explicit message instead of the opaque RangeError that
      // toISOString() throws for an unparseable date.
      if (Number.isNaN(parsed.getTime())) {
        throw new Error(`无效的 --from 参数 "${fromTime}",应为可解析的日期,例如 2025-03-11T00:00:00Z`);
      }
      fromTimestamp = parsed.toISOString();
    } else {
      const oneHourAgo = new Date();
      oneHourAgo.setHours(oneHourAgo.getHours() - 1);
      fromTimestamp = oneHourAgo.toISOString();
    }

    console.log(`开始同步数据,起始时间: ${fromTimestamp}`);

    // Run the sync and measure wall-clock time.
    const startTime = Date.now();
    const result = await syncAllData(fromTimestamp);
    const endTime = Date.now();

    // Print the summary.
    console.log('============================================');
    console.log('同步任务完成!');
    console.log('============================================');
    console.log(`总耗时: ${(endTime - startTime) / 1000} 秒`);
    console.log(`同步状态: ${result.success ? '成功' : '部分失败'}`);
    console.log(`帖子同步数量: ${result.posts}`);
    console.log(`评论同步数量: ${result.comments}`);
    console.log(`KOL变化同步数量: ${result.influencer_changes}`);
    console.log(`项目同步数量: ${result.projects}`);

    if (result.errors.length > 0) {
      console.log('============================================');
      console.log('错误信息:');
      result.errors.forEach((err, index) => {
        console.log(`${index + 1}. ${err}`);
      });
    }

    // Reflect a partial failure in the exit code so shells and CI can detect it.
    process.exit(result.success ? 0 : 1);
  } catch (error) {
    console.error('运行同步任务时发生错误:', error);
    process.exit(1);
  }
}

// Run the entry point. main() handles its own errors and always exits the process.
main();

View File

@@ -0,0 +1,707 @@
import { Pool } from 'pg';
import supabase from '../utils/supabase';
import clickhouse from '../utils/clickhouse';
import config from '../config';
import { randomUUID } from 'crypto';
// Define types for better type safety
/**
 * Row shape requested from PostgreSQL by the post sync query: columns of
 * `posts` plus two columns taken from the joined `influencers` row.
 */
interface PostRecord {
post_id: string;
influencer_id: string;
platform: string;
project_id?: string;
title?: string;
description?: string;
published_at: string;
created_at: string;
// Selected as `i.name AS influencer_name` from the joined influencer.
influencer_name?: string;
// Selected from the joined influencer.
followers_count?: number;
}
/**
 * Row shape requested by the comment sync query: columns of `comments` plus
 * `influencer_id`, `platform` and `project_id` taken from the joined post.
 */
interface CommentRecord {
comment_id: string;
post_id: string;
user_id?: string;
content: string;
sentiment_score?: number;
created_at: string;
influencer_id: string;
platform: string;
project_id?: string;
}
/** Row shape requested by the influencer-change sync query. */
interface InfluencerRecord {
influencer_id: string;
name: string;
platform: string;
followers_count: number;
video_count: number;
updated_at: string;
}
/** Row shape requested by the project sync query. */
interface ProjectRecord {
id: string;
name: string;
description?: string;
created_at: string;
}
/** Summary of one sync run, as passed to `recordSyncStats`. */
interface SyncStats {
// True only when every sync step completed without throwing.
success: boolean;
// ISO-8601 time at which the summary was produced.
timestamp: string;
duration: number; // milliseconds
posts_synced: number;
comments_synced: number;
influencer_changes_synced: number;
projects_synced: number;
// One message per failed sync step.
errors: string[];
}
// Initialize PostgreSQL client
// Connection pool used by the sync queries in this module. It is created at module load
// from DATABASE_URL, falling back to a local development connection string.
// NOTE(review): the fallback embeds default credentials; confirm it can never be
// reached in a deployed environment.
const pgPool = new Pool({
connectionString: process.env.DATABASE_URL || 'postgresql://postgres:postgres@localhost:5432/promote',
});
// Batch size
// Maximum number of rows grouped into one INSERT statement by the batch loops below.
const BATCH_SIZE = 100;
/**
 * Prepares the `sync_logs` row for one sync run.
 *
 * Ensures the `sync_logs` table exists, then builds the INSERT statement for
 * `stats` and logs it. NOTE: the INSERT itself is currently disabled (dry-run);
 * only the CREATE TABLE statement is sent to ClickHouse. Re-enable the query at
 * the bottom once the sync is ready to write.
 *
 * Errors are logged and swallowed on purpose so that a stats failure never
 * fails the sync that produced them.
 *
 * @param stats Summary of the finished sync run.
 */
async function recordSyncStats(stats: SyncStats): Promise<void> {
  try {
    // Make sure the table exists. The column types mirror the sync_logs
    // migration script (duration_ms is UInt64 there).
    await clickhouse.query({
      query: `
        CREATE TABLE IF NOT EXISTS ${config.clickhouse.database}.sync_logs (
          timestamp DateTime,
          duration_ms UInt64,
          posts_synced UInt32,
          comments_synced UInt32,
          influencer_changes_synced UInt32,
          projects_synced UInt32,
          success UInt8,
          error_messages String
        ) ENGINE = MergeTree()
        ORDER BY (timestamp)
      `
    });

    // A DateTime column takes 'YYYY-MM-DD HH:MM:SS'; drop the 'T', the
    // milliseconds and the 'Z' of the ISO string.
    // NOTE(review): the value is UTC wall-clock time; assumes the server/column
    // timezone is UTC — confirm.
    const loggedAt = new Date(stats.timestamp).toISOString().slice(0, 19).replace('T', ' ');

    // Escape backslashes before quotes; otherwise a message ending in '\'
    // would escape the closing quote of the literal.
    const errorText = stats.errors
      .join('; ')
      .replace(/\\/g, '\\\\')
      .replace(/'/g, "\\'");

    // Build the INSERT statement.
    const insertQuery = `
      INSERT INTO ${config.clickhouse.database}.sync_logs
      (timestamp, duration_ms, posts_synced, comments_synced, influencer_changes_synced,
      projects_synced, success, error_messages)
      VALUES ('${loggedAt}', ${stats.duration}, ${stats.posts_synced},
      ${stats.comments_synced}, ${stats.influencer_changes_synced},
      ${stats.projects_synced}, ${stats.success ? 1 : 0}, '${errorText}')`;

    console.log('[DEBUG] 要执行的同步统计插入语句:', insertQuery);

    // Actual execution is intentionally disabled for now (dry-run).
    // await clickhouse.query({
    //   query: insertQuery
    // });
  } catch (error) {
    console.error('Failed to record sync stats:', error);
  }
}
/**
 * Escapes a value for use inside a single-quoted ClickHouse string literal.
 *
 * Backslashes are escaped first and single quotes second: if only quotes were
 * escaped, an input ending in `\` would turn the literal's closing quote into
 * an escaped quote and break (or hijack) the surrounding statement.
 *
 * @param str Raw text; an empty or missing value yields an empty string.
 * @returns The text with `\` and `'` backslash-escaped.
 */
function escapeClickHouseString(str: string): string {
  if (!str) return '';
  return str.replace(/\\/g, '\\\\').replace(/'/g, "\\'");
}
/**
 * Syncs posts created after `lastSyncTimestamp` from PostgreSQL to the
 * ClickHouse `events` table, one `impression` event per post.
 *
 * Posts are processed in batches of `BATCH_SIZE`; a failing batch is logged and
 * skipped without aborting the remaining batches.
 *
 * NOTE: the ClickHouse INSERT is currently disabled (dry-run). The statement is
 * only built and logged, yet the batch is still counted as synced.
 *
 * @param lastSyncTimestamp Only posts with `created_at` after this time are read.
 * @returns Number of posts processed.
 * @throws Rethrows errors from the PostgreSQL query.
 */
export async function syncNewPosts(lastSyncTimestamp: string): Promise<number> {
  try {
    // Get new posts from PostgreSQL
    const query = `
      SELECT
        p.post_id,
        p.influencer_id,
        p.platform,
        p.project_id,
        p.title,
        p.description,
        p.published_at,
        p.created_at,
        i.name as influencer_name,
        i.followers_count
      FROM posts p
      JOIN influencers i ON p.influencer_id = i.influencer_id
      WHERE p.created_at > $1
      ORDER BY p.created_at
    `;
    const { rows: posts } = await pgPool.query<PostRecord>(query, [lastSyncTimestamp]);

    if (posts.length === 0) {
      console.log('No new posts to sync');
      return 0;
    }
    console.log(`Found ${posts.length} new posts to sync`);

    let syncedCount = 0;

    // Batch processing to avoid processing too much data at once
    for (let i = 0; i < posts.length; i += BATCH_SIZE) {
      const batch = posts.slice(i, i + BATCH_SIZE);
      try {
        // Build the VALUES tuples for this batch.
        const values = batch.map(post => {
          const eventId = randomUUID();
          const createdAt = new Date(post.created_at);
          const timestamp = createdAt.toISOString();
          const date = timestamp.split('T')[0];
          // `date` comes from the UTC ISO string, so the hour must be UTC too;
          // getHours() would use the server's local timezone and could belong
          // to a different calendar day.
          const hour = createdAt.getUTCHours();
          const contentType = determineContentType(post.title || '', post.description || '');
          // NOTE(review): JSON.stringify emits double-quoted items, unquoted in
          // the statement, whereas the influencer sync writes keywords as the
          // quoted string '[]' — confirm the type of the `keywords` column.
          const keywords = JSON.stringify(extractKeywords(post.title || ''));
          return `('${eventId}', '${timestamp}', '${date}', ${hour}, '', '${post.influencer_id}', '${post.post_id}', '${post.project_id || ''}', 'impression', 'exposure', '${escapeClickHouseString(post.platform)}', '${contentType}', 'approved', 'neutral', '', ${keywords}, 1.0, ${post.followers_count || 0}, 0, 0, 0, 0, '', '', '', '', '', '', '')`;
        }).join(', ');

        // Build the full INSERT statement.
        const insertQuery = `
          INSERT INTO ${config.clickhouse.database}.events
          (event_id, timestamp, date, hour, user_id, influencer_id, content_id, project_id,
          event_type, funnel_stage, platform, content_type, content_status, sentiment,
          comment_text, keywords, interaction_value, followers_count, followers_change,
          likes_count, likes_change, views_count, ip, user_agent, device_type, referrer,
          geo_country, geo_city, session_id)
          VALUES ${values}`;

        console.log(`[DEBUG] 批次 ${i / BATCH_SIZE + 1} 帖子插入语句 (前500字符): ${insertQuery.substring(0, 500)}...`);
        // Show the first tuple for inspection.
        if (batch.length > 0) {
          console.log(`[DEBUG] 第一条帖子数据值: ${values.split('),')[0]})`);
        }

        // Actual execution is intentionally disabled for now (dry-run).
        // await clickhouse.query({
        //   query: insertQuery
        // });

        syncedCount += batch.length;
        console.log(`[DEBUG] 模拟同步批次 ${batch.length} 帖子 (${syncedCount}/${posts.length})`);
      } catch (error) {
        console.error(`Error syncing post batch ${i / BATCH_SIZE + 1}:`, error);
      }
    }

    console.log(`[DEBUG] 模拟成功同步 ${syncedCount} 帖子到 ClickHouse`);
    return syncedCount;
  } catch (error) {
    console.error('Error syncing new posts:', error);
    throw error;
  }
}
/**
 * Syncs comments created after `lastSyncTimestamp` from PostgreSQL to the
 * ClickHouse `events` table, one `comment` event per comment.
 *
 * Comments are processed in batches of `BATCH_SIZE`; a failing batch is logged
 * and skipped without aborting the remaining batches.
 *
 * NOTE: the ClickHouse INSERT is currently disabled (dry-run). The statement is
 * only built and logged, yet the batch is still counted as synced.
 *
 * @param lastSyncTimestamp Only comments with `created_at` after this time are read.
 * @returns Number of comments processed.
 * @throws Rethrows errors from the PostgreSQL query.
 */
export async function syncComments(lastSyncTimestamp: string): Promise<number> {
  try {
    // Get new comments from PostgreSQL
    const query = `
      SELECT
        c.comment_id,
        c.post_id,
        c.user_id,
        c.content,
        c.sentiment_score,
        c.created_at,
        p.influencer_id,
        p.platform,
        p.project_id
      FROM comments c
      JOIN posts p ON c.post_id = p.post_id
      WHERE c.created_at > $1
      ORDER BY c.created_at
    `;
    const { rows: comments } = await pgPool.query<CommentRecord>(query, [lastSyncTimestamp]);

    if (comments.length === 0) {
      console.log('No new comments to sync');
      return 0;
    }
    console.log(`Found ${comments.length} new comments to sync`);

    let syncedCount = 0;

    // Batch processing to avoid processing too much data at once
    for (let i = 0; i < comments.length; i += BATCH_SIZE) {
      const batch = comments.slice(i, i + BATCH_SIZE);
      try {
        // Build the VALUES tuples for this batch.
        const values = batch.map(comment => {
          const eventId = randomUUID();
          const createdAt = new Date(comment.created_at);
          const timestamp = createdAt.toISOString();
          const date = timestamp.split('T')[0];
          // `date` comes from the UTC ISO string, so the hour must be UTC too;
          // getHours() would use the server's local timezone.
          const hour = createdAt.getUTCHours();
          const sentiment = determineSentiment(comment.sentiment_score || 0);
          // NOTE(review): JSON.stringify emits double-quoted items, unquoted in
          // the statement — confirm the type of the `keywords` column.
          const keywords = JSON.stringify(extractKeywords(comment.content));
          const escapedComment = escapeClickHouseString(comment.content);
          return `('${eventId}', '${timestamp}', '${date}', ${hour}, '${comment.user_id || ''}', '${comment.influencer_id}', '${comment.post_id}', '${comment.project_id || ''}', 'comment', 'consideration', '${escapeClickHouseString(comment.platform)}', 'text', 'approved', '${sentiment}', '${escapedComment}', ${keywords}, 3.0, 0, 0, 0, 0, 0, '', '', '', '', '', '', '')`;
        }).join(', ');

        // Build the full INSERT statement.
        const insertQuery = `
          INSERT INTO ${config.clickhouse.database}.events
          (event_id, timestamp, date, hour, user_id, influencer_id, content_id, project_id,
          event_type, funnel_stage, platform, content_type, content_status, sentiment,
          comment_text, keywords, interaction_value, followers_count, followers_change,
          likes_count, likes_change, views_count, ip, user_agent, device_type, referrer,
          geo_country, geo_city, session_id)
          VALUES ${values}`;

        console.log(`[DEBUG] 批次 ${i / BATCH_SIZE + 1} 评论插入语句 (前500字符): ${insertQuery.substring(0, 500)}...`);
        // Show the first tuple for inspection.
        if (batch.length > 0) {
          console.log(`[DEBUG] 第一条评论数据值: ${values.split('),')[0]})`);
        }

        // Actual execution is intentionally disabled for now (dry-run).
        // await clickhouse.query({
        //   query: insertQuery
        // });

        syncedCount += batch.length;
        console.log(`[DEBUG] 模拟同步批次 ${batch.length} 评论 (${syncedCount}/${comments.length})`);
      } catch (error) {
        console.error(`Error syncing comment batch ${i / BATCH_SIZE + 1}:`, error);
      }
    }

    console.log(`[DEBUG] 模拟成功同步 ${syncedCount} 评论到 ClickHouse`);
    return syncedCount;
  } catch (error) {
    console.error('Error syncing new comments:', error);
    throw error;
  }
}
/**
 * Syncs projects created or updated after `lastSyncTimestamp` from PostgreSQL
 * to the ClickHouse `events` table, one `project_update` event per project.
 *
 * The event timestamp is taken from the project's `created_at`, including for
 * rows that were selected because of a newer `updated_at`.
 *
 * Projects are processed in batches of `BATCH_SIZE`; a failing batch is logged
 * and skipped without aborting the remaining batches.
 *
 * NOTE: the ClickHouse INSERT is currently disabled (dry-run). The statement is
 * only built and logged, yet the batch is still counted as synced.
 *
 * @param lastSyncTimestamp Lower bound (exclusive) for `created_at` / `updated_at`.
 * @returns Number of projects processed.
 * @throws Rethrows errors from the PostgreSQL query.
 */
export async function syncProjects(lastSyncTimestamp: string): Promise<number> {
  try {
    // Get new projects and updated projects from PostgreSQL
    const query = `
      SELECT
        id,
        name,
        description,
        created_at
      FROM projects
      WHERE created_at > $1 OR updated_at > $1
      ORDER BY created_at
    `;
    const { rows: projects } = await pgPool.query<ProjectRecord>(query, [lastSyncTimestamp]);

    if (projects.length === 0) {
      console.log('No new projects to sync');
      return 0;
    }
    console.log(`Found ${projects.length} projects to sync`);

    let syncedCount = 0;

    // Batch processing
    for (let i = 0; i < projects.length; i += BATCH_SIZE) {
      const batch = projects.slice(i, i + BATCH_SIZE);
      try {
        // Build the VALUES tuples for this batch.
        const values = batch.map(project => {
          const eventId = randomUUID();
          const createdAt = new Date(project.created_at);
          const timestamp = createdAt.toISOString();
          const date = timestamp.split('T')[0];
          // `date` comes from the UTC ISO string, so the hour must be UTC too;
          // getHours() would use the server's local timezone.
          const hour = createdAt.getUTCHours();
          // NOTE(review): JSON.stringify emits double-quoted items, unquoted in
          // the statement — confirm the type of the `keywords` column.
          const keywords = JSON.stringify(extractKeywords(project.name + ' ' + (project.description || '')));
          const escapedDesc = escapeClickHouseString(project.description || '');
          return `('${eventId}', '${timestamp}', '${date}', ${hour}, '', '', '', '${project.id}', 'project_update', 'interest', 'internal', 'text', 'approved', 'neutral', '${escapedDesc}', ${keywords}, 5.0, 0, 0, 0, 0, 0, '', '', '', '', '', '', '')`;
        }).join(', ');

        // Build the full INSERT statement.
        const insertQuery = `
          INSERT INTO ${config.clickhouse.database}.events
          (event_id, timestamp, date, hour, user_id, influencer_id, content_id, project_id,
          event_type, funnel_stage, platform, content_type, content_status, sentiment,
          comment_text, keywords, interaction_value, followers_count, followers_change,
          likes_count, likes_change, views_count, ip, user_agent, device_type, referrer,
          geo_country, geo_city, session_id)
          VALUES ${values}`;

        console.log(`[DEBUG] 批次 ${i / BATCH_SIZE + 1} 项目插入语句 (前500字符): ${insertQuery.substring(0, 500)}...`);
        // Show the first tuple for inspection.
        if (batch.length > 0) {
          console.log(`[DEBUG] 第一条项目数据值: ${values.split('),')[0]})`);
        }

        // Actual execution is intentionally disabled for now (dry-run).
        // await clickhouse.query({
        //   query: insertQuery
        // });

        syncedCount += batch.length;
        console.log(`[DEBUG] 模拟同步批次 ${batch.length} 项目 (${syncedCount}/${projects.length})`);
      } catch (error) {
        console.error(`Error syncing project batch ${i / BATCH_SIZE + 1}:`, error);
      }
    }

    console.log(`[DEBUG] 模拟成功同步 ${syncedCount} 项目到 ClickHouse`);
    return syncedCount;
  } catch (error) {
    console.error('Error syncing projects:', error);
    throw error;
  }
}
/**
 * Syncs follower-count changes of influencers updated after `lastSyncTimestamp`.
 *
 * For every updated influencer the previous follower count is looked up in the
 * ClickHouse `events` table; when the count differs, one `follow` (increase) or
 * `unfollow` (decrease) event is built with the delta.
 *
 * Caveats:
 * - An influencer with no previous record is compared against 0, so its whole
 *   follower count is reported as the change.
 * - If the ClickHouse lookup itself fails, the error is logged and no events
 *   are produced for this run.
 * - NOTE: the ClickHouse INSERT is currently disabled (dry-run); the statement
 *   is only built and logged.
 *
 * @param lastSyncTimestamp Only influencers with `updated_at` after this time are read.
 * @returns Number of follower-change events produced.
 * @throws Rethrows errors from the PostgreSQL query.
 */
export async function syncInfluencerChanges(lastSyncTimestamp: string): Promise<number> {
  try {
    // Get influencers with updated metrics
    const query = `
      SELECT
        i.influencer_id,
        i.name,
        i.platform,
        i.followers_count,
        i.video_count,
        i.updated_at
      FROM influencers i
      WHERE i.updated_at > $1
      ORDER BY i.updated_at
    `;
    const { rows: influencers } = await pgPool.query<InfluencerRecord>(query, [lastSyncTimestamp]);

    if (influencers.length === 0) {
      console.log('No influencer changes to sync');
      return 0;
    }
    console.log(`Found ${influencers.length} influencer changes to sync`);

    let syncedCount = 0;
    const batchEvents: string[] = [];

    try {
      // Fetch the recorded follower counts of the affected influencers.
      const influencerIds = influencers.map(i => `'${i.influencer_id}'`).join(',');
      const result = await clickhouse.query({
        query: `
          SELECT
            influencer_id AS id,
            followers_count,
            max(timestamp) AS last_update
          FROM ${config.clickhouse.database}.events
          WHERE influencer_id IN (${influencerIds})
          AND event_type IN ('follow', 'unfollow', 'impression')
          GROUP BY influencer_id, followers_count
          ORDER BY last_update DESC
        `,
        format: 'JSONEachRow'
      });

      // The query returns one row per (influencer, followers_count) pair, so
      // keep only the most recent row per influencer for fast lookup.
      type PrevMetrics = { id: string; followers_count: number; last_update: string };
      const prevMetricsMap = new Map<string, PrevMetrics>();
      const rememberLatest = (record: PrevMetrics): void => {
        const known = prevMetricsMap.get(record.id);
        if (!known || new Date(record.last_update) > new Date(known.last_update)) {
          prevMetricsMap.set(record.id, record);
        }
      };

      try {
        if ('rows' in result) {
          // The result exposes its rows directly.
          for (const record of result.rows as PrevMetrics[]) {
            rememberLatest(record);
          }
        } else {
          // Otherwise try to obtain the rows through json().
          const rows: PrevMetrics[] = [];
          try {
            if (typeof result.json === 'function') {
              const jsonData = await result.json();
              if (Array.isArray(jsonData)) {
                rows.push(...jsonData);
              }
            } else {
              // Unknown result shape: continue with no previous metrics.
              console.log('Warning: Using fallback method to process query results');
            }
          } catch (parseError) {
            console.error('Error parsing ClickHouse result:', parseError);
          }
          for (const record of rows) {
            rememberLatest(record);
          }
        }
      } catch (e) {
        console.error('Error processing ClickHouse result:', e);
      }

      // Build one event per influencer whose follower count changed.
      for (const influencer of influencers) {
        try {
          const prevMetrics = prevMetricsMap.get(influencer.influencer_id);
          const prevFollowersCount = prevMetrics ? Number(prevMetrics.followers_count) || 0 : 0;
          const followersChange = influencer.followers_count - prevFollowersCount;

          // Only emit an event when there is an actual change.
          if (followersChange !== 0) {
            const eventId = randomUUID();
            const updatedAt = new Date(influencer.updated_at);
            const timestamp = updatedAt.toISOString();
            const date = timestamp.split('T')[0];
            // `date` comes from the UTC ISO string, so the hour must be UTC too;
            // getHours() would use the server's local timezone.
            const hour = updatedAt.getUTCHours();
            const eventType = followersChange > 0 ? 'follow' : 'unfollow';
            batchEvents.push(`('${eventId}', '${timestamp}', '${date}', ${hour}, '', '${influencer.influencer_id}', '', '', '${eventType}', 'interest', '${escapeClickHouseString(influencer.platform)}', 'text', 'approved', 'neutral', '', '[]', 2.0, ${influencer.followers_count}, ${followersChange}, 0, 0, 0, '', '', '', '', '', '', '')`);
            syncedCount++;
          }
        } catch (error) {
          console.error(`Error processing influencer ${influencer.influencer_id}:`, error);
          // Continue with the next influencer.
        }
      }
    } catch (error) {
      console.error('Error querying previous metrics:', error);
    }

    // Insert all collected events in one statement.
    if (batchEvents.length > 0) {
      try {
        const insertQuery = `
          INSERT INTO ${config.clickhouse.database}.events
          (event_id, timestamp, date, hour, user_id, influencer_id, content_id, project_id,
          event_type, funnel_stage, platform, content_type, content_status, sentiment,
          comment_text, keywords, interaction_value, followers_count, followers_change,
          likes_count, likes_change, views_count, ip, user_agent, device_type, referrer,
          geo_country, geo_city, session_id)
          VALUES ${batchEvents.join(', ')}`;

        console.log(`[DEBUG] KOL变化插入语句 (前500字符): ${insertQuery.substring(0, 500)}...`);
        // Show the first tuple for inspection.
        console.log(`[DEBUG] 第一条KOL变化数据值: ${batchEvents[0]}`);

        // Actual execution is intentionally disabled for now (dry-run).
        // await clickhouse.query({
        //   query: insertQuery
        // });

        console.log(`[DEBUG] 模拟同步 ${batchEvents.length} KOL变化`);
      } catch (error) {
        console.error(`Error syncing influencer batch:`, error);
        syncedCount = 0; // Nothing was written, so report zero synced events.
      }
    } else {
      console.log('No follower changes detected, skipping influencer sync');
    }

    console.log(`[DEBUG] 模拟成功同步 ${syncedCount} KOL变化到 ClickHouse`);
    return syncedCount;
  } catch (error) {
    console.error('Error syncing influencer changes:', error);
    throw error;
  }
}
/**
 * Runs every PostgreSQL -> ClickHouse sync step in order (posts, comments,
 * influencer changes, projects) and reports the combined outcome.
 *
 * A step that throws does not stop the others: its message is collected in
 * `errors` and `success` becomes false. After all steps the run summary is
 * handed to `recordSyncStats`.
 *
 * @param lastSyncTimestamp The timestamp of the last sync, passed to every step.
 * @returns Per-entity counts, the overall success flag, the collected error
 *          messages and the elapsed time in milliseconds.
 */
export async function syncAllData(lastSyncTimestamp: string): Promise<{
  posts: number;
  comments: number;
  influencer_changes: number;
  projects: number;
  success: boolean;
  errors: string[];
  duration: number;
}> {
  const startedAt = Date.now();
  const errors: string[] = [];
  const counts = { posts: 0, comments: 0, influencer_changes: 0, projects: 0 };
  let success = true;

  const messageOf = (err: unknown): string =>
    err instanceof Error ? err.message : 'Unknown error';

  // Steps run strictly in this order; `label` is the prefix of the error text.
  const steps: Array<{
    key: keyof typeof counts;
    label: string;
    run: (since: string) => Promise<number>;
  }> = [
    { key: 'posts', label: 'Posts', run: syncNewPosts },
    { key: 'comments', label: 'Comments', run: syncComments },
    { key: 'influencer_changes', label: 'Influencer changes', run: syncInfluencerChanges },
    { key: 'projects', label: 'Projects', run: syncProjects },
  ];

  try {
    for (const step of steps) {
      try {
        counts[step.key] = await step.run(lastSyncTimestamp);
      } catch (err: unknown) {
        errors.push(`${step.label} sync error: ${messageOf(err)}`);
        success = false;
      }
    }

    // Record sync stats
    const duration = Date.now() - startedAt;
    const summary: SyncStats = {
      success,
      timestamp: new Date().toISOString(),
      duration,
      posts_synced: counts.posts,
      comments_synced: counts.comments,
      influencer_changes_synced: counts.influencer_changes,
      projects_synced: counts.projects,
      errors,
    };
    await recordSyncStats(summary);

    return {
      posts: counts.posts,
      comments: counts.comments,
      influencer_changes: counts.influencer_changes,
      projects: counts.projects,
      success,
      errors,
      duration,
    };
  } catch (err: unknown) {
    console.error('Error in syncAllData:', err);
    return {
      posts: counts.posts,
      comments: counts.comments,
      influencer_changes: counts.influencer_changes,
      projects: counts.projects,
      success: false,
      errors: [...errors, `General sync error: ${messageOf(err)}`],
      duration: Date.now() - startedAt,
    };
  }
}
/**
 * Guesses a content type from the title and description of a post.
 *
 * Matching is a case-insensitive substring search, checked in priority order:
 * video, image, story, reel, live; anything else is `text`.
 *
 * @param title Post title.
 * @param description Optional post description.
 * @returns One of `video`, `image`, `story`, `reel`, `live` or `text`.
 */
function determineContentType(title: string, description: string = ''): string {
  const text = (title + ' ' + description).toLowerCase();

  // '短视频' (short video) contains '视频' (video), so it has to be tested
  // before the generic video check or the reel branch can never match it.
  if (text.includes('短视频')) return 'reel';

  if (text.includes('video') || text.includes('watch') || text.includes('视频')) return 'video';
  if (text.includes('image') || text.includes('photo') || text.includes('pic') || text.includes('图片')) return 'image';
  if (text.includes('story') || text.includes('故事')) return 'story';
  if (text.includes('reel')) return 'reel';
  if (text.includes('live') || text.includes('直播')) return 'live';

  // Default
  return 'text';
}
/**
 * Maps a numeric sentiment score to a label.
 *
 * Scores above 0.3 are `positive`, scores below -0.3 are `negative`, and
 * everything else (including a missing or NaN score) is `neutral`.
 */
function determineSentiment(score: number): string {
  // Falsy but not zero means the score is missing (undefined, null, NaN).
  const isMissing = score !== 0 && !score;
  if (isMissing) {
    return 'neutral';
  }
  if (score > 0.3) {
    return 'positive';
  }
  return score < -0.3 ? 'negative' : 'neutral';
}
/**
 * Extracts up to 10 distinct keywords from free text.
 *
 * The text is lower-cased, punctuation is replaced by spaces, and the remaining
 * words are kept when they are longer than two characters and not stop words.
 * Duplicates are removed before the 10-keyword limit is applied, so repeated
 * words do not use up slots.
 *
 * NOTE(review): `\w` is ASCII-only in JavaScript, so non-ASCII characters
 * (including all Chinese text) are currently replaced by spaces and the Chinese
 * stop words below never match. Handling such text needs a Unicode-aware
 * tokenizer.
 *
 * @param text Source text; an empty value yields an empty list.
 * @returns Distinct keywords in order of first appearance.
 */
function extractKeywords(text: string): string[] {
  if (!text) return [];

  // Lower-case, replace special characters by spaces, split into words.
  const words = text.toLowerCase().replace(/[^\w\s]/g, ' ').split(/\s+/);

  // Filter out common words (simple stop words list)
  const stopWords = new Set([
    'a', 'an', 'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'with',
    'about', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has',
    'had', 'do', 'does', 'did', 'i', 'you', 'he', 'she', 'it', 'we', 'they',
    'this', 'that', 'these', 'those', 'of', 'by', 'from', 'as', 'if', 'then',
    'than', 'so', 'what', 'when', 'where', 'how', 'all', 'any', 'both', 'each',
    '我', '你', '他', '她', '它', '们', '的', '和', '是', '在', '了', '有', '就',
    '都', '而', '及', '与', '这', '那', '不', '但', '如', '要', '可以', '会'
  ]);

  const candidates = words.filter(word => word.length > 2 && !stopWords.has(word));

  // De-duplicate first, then cap at 10 keywords.
  return [...new Set(candidates)].slice(0, 10);
}

View File

@@ -27,60 +27,5 @@ const createClickHouseClient = () => {
const clickhouse = createClickHouseClient(); const clickhouse = createClickHouseClient();
// Initialize ClickHouse database and tables
/**
 * Creates the configured ClickHouse database and the `view_events`,
 * `like_events` and `follower_events` tables if they do not exist yet.
 *
 * Any failure is caught and logged; nothing is rethrown to the caller.
 */
export const initClickHouse = async () => {
try {
// Create database if not exists
await clickhouse.query({
query: `CREATE DATABASE IF NOT EXISTS ${config.clickhouse.database}`,
});
// Create tables for tracking events
// view_events: one row per content view, partitioned by month.
await clickhouse.query({
query: `
CREATE TABLE IF NOT EXISTS ${config.clickhouse.database}.view_events (
user_id String,
content_id String,
timestamp DateTime DEFAULT now(),
ip String,
user_agent String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, content_id, timestamp)
`,
});
// like_events: like / unlike actions, partitioned by month.
await clickhouse.query({
query: `
CREATE TABLE IF NOT EXISTS ${config.clickhouse.database}.like_events (
user_id String,
content_id String,
timestamp DateTime DEFAULT now(),
action Enum('like' = 1, 'unlike' = 2)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, content_id, timestamp)
`,
});
// follower_events: follow / unfollow actions, partitioned by month.
await clickhouse.query({
query: `
CREATE TABLE IF NOT EXISTS ${config.clickhouse.database}.follower_events (
follower_id String,
followed_id String,
timestamp DateTime DEFAULT now(),
action Enum('follow' = 1, 'unfollow' = 2)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (follower_id, followed_id, timestamp)
`,
});
console.log('ClickHouse database and tables initialized');
} catch (error) {
// Initialization is best-effort: log the failure and return normally.
console.error('Error initializing ClickHouse:', error);
console.log('Continuing with limited functionality...');
}
};
export default clickhouse; export default clickhouse;

View File

@@ -532,29 +532,3 @@ export const checkDatabaseConnection = async () => {
return false; return false;
} }
}; };
/**
* Initializes the database. This function now serves only as the entry point
* for manual initialization; the actual initialization is meant to run only
* when explicitly invoked through the admin API.
*
* Runs the Supabase function setup, the Supabase table setup and the
* ClickHouse table setup in that order.
*
* Returns true when all three steps complete, false when any of them throws
* (the error is logged, not rethrown).
*/
export const initDatabase = async () => {
try {
console.log('开始数据库初始化...');
console.log('警告: 此操作将修改数据库结构,请确保您知道自己在做什么');
// Initialize Supabase functions
await initSupabaseFunctions();
// Initialize Supabase tables
await initSupabaseTables();
// Initialize ClickHouse tables
await initClickHouseTables();
console.log('数据库初始化完成');
return true;
} catch (error) {
console.error('数据库初始化失败:', error);
return false;
}
};

View File

@@ -2,12 +2,14 @@ import { Queue, Worker } from 'bullmq';
import supabase from './supabase'; import supabase from './supabase';
import clickhouse from './clickhouse'; import clickhouse from './clickhouse';
import { getRedisClient } from './redis'; import { getRedisClient } from './redis';
import { syncAllData } from '../services/syncService';
interface ScheduledCollectionData { interface ScheduledCollectionData {
type: 'influencer_metrics' | 'post_metrics'; type: 'influencer_metrics' | 'post_metrics' | 'data_sync';
influencer_id?: string; influencer_id?: string;
post_id?: string; post_id?: string;
project_id?: string; project_id?: string;
last_sync_timestamp?: string;
} }
// Create a mock scheduler if BullMQ doesn't export QueueScheduler // Create a mock scheduler if BullMQ doesn't export QueueScheduler
@@ -53,13 +55,15 @@ export const initScheduledTaskWorkers = () => {
'scheduled-data-collection', 'scheduled-data-collection',
async (job) => { async (job) => {
console.log(`Processing scheduled task: ${job.id}`, job.data); console.log(`Processing scheduled task: ${job.id}`, job.data);
const { type, influencer_id, post_id, project_id } = job.data as ScheduledCollectionData; const { type, influencer_id, post_id, project_id, last_sync_timestamp } = job.data as ScheduledCollectionData;
try { try {
if (type === 'influencer_metrics') { if (type === 'influencer_metrics') {
await collectInfluencerMetrics(influencer_id); await collectInfluencerMetrics(influencer_id);
} else if (type === 'post_metrics') { } else if (type === 'post_metrics') {
await collectPostMetrics(post_id); await collectPostMetrics(post_id);
} else if (type === 'data_sync') {
await syncPostgresToClickhouse(last_sync_timestamp);
} }
console.log(`Successfully completed scheduled task: ${job.id}`); console.log(`Successfully completed scheduled task: ${job.id}`);
@@ -404,3 +408,80 @@ async function collectPostMetrics(postId?: string) {
changes changes
}; };
} }
/**
 * Registers the repeatable job that syncs PostgreSQL data to ClickHouse.
 *
 * An already registered job with the same name is removed first, so calling
 * this again replaces the schedule rather than duplicating it.
 *
 * NOTE(review): `last_sync_timestamp` is fixed to the moment of scheduling and
 * stored in the job data. Presumably every repetition receives that same value,
 * so each run would re-read everything since scheduling — confirm against the
 * queue's repeatable-job semantics and consider persisting the real last sync time.
 *
 * @param cronExpression The cron expression for scheduling (default: every 15 minutes)
 * @returns The job returned by the queue.
 */
export const scheduleDatabaseSync = async (
  cronExpression: string = '*/15 * * * *' // Default: Every 15 minutes
) => {
  const jobName = 'postgres-to-clickhouse-sync';
  const queue = await createScheduledTaskQueue();

  // The scheduling time doubles as the initial "last sync" time.
  const scheduledAt = new Date().toISOString();

  // Drop a previously registered job of the same name, if there is one.
  const registered = await queue.getRepeatableJobs();
  const stale = registered.find(candidate => candidate.name === jobName);
  if (stale) {
    await queue.removeRepeatableByKey(stale.key);
    console.log(`Removed existing sync job: ${jobName}`);
  }

  const payload = {
    type: 'data_sync',
    last_sync_timestamp: scheduledAt,
  };
  const options = {
    repeat: {
      pattern: cronExpression,
    },
    removeOnComplete: {
      age: 24 * 3600, // Keep completed jobs for 24 hours
      count: 100, // Keep at most 100 jobs
    },
    removeOnFail: false, // Keep failed jobs so failures can be inspected
  };

  // Register the new repeatable job.
  const job = await queue.add(jobName, payload, options);
  console.log(`Scheduled PostgreSQL to ClickHouse sync job: ${job.id}, pattern: ${cronExpression}`);
  return job;
};
/**
 * Syncs data from PostgreSQL to ClickHouse by delegating to `syncAllData`.
 *
 * @param lastSyncTimestamp The timestamp of the last sync; defaults to one hour
 *        before now when omitted.
 * @returns Counts of synced posts, comments and influencer changes.
 * @throws Rethrows any error raised by `syncAllData`.
 */
async function syncPostgresToClickhouse(lastSyncTimestamp?: string): Promise<{
  posts: number;
  comments: number;
  influencerChanges: number;
}> {
  // If no last sync timestamp provided, use a timestamp from 1 hour ago.
  // Done before logging so the log shows the effective start of the window
  // instead of "undefined".
  if (!lastSyncTimestamp) {
    const oneHourAgo = new Date();
    oneHourAgo.setHours(oneHourAgo.getHours() - 1);
    lastSyncTimestamp = oneHourAgo.toISOString();
  }

  console.log(`Starting PostgreSQL to ClickHouse sync from timestamp: ${lastSyncTimestamp}`);

  try {
    // Sync all data
    const result = await syncAllData(lastSyncTimestamp);
    console.log('PostgreSQL to ClickHouse sync completed:', result);

    // syncAllData reports `influencer_changes` (snake_case); map it to the
    // `influencerChanges` field this function declares in its return type.
    return {
      posts: result.posts,
      comments: result.comments,
      influencerChanges: result.influencer_changes,
    };
  } catch (error) {
    console.error('Error in PostgreSQL to ClickHouse sync:', error);
    throw error;
  }
}