sync events done

This commit is contained in:
2025-03-12 23:53:36 +08:00
parent a25478e738
commit e179b9b6db
7 changed files with 599 additions and 42 deletions

View File

@@ -1,5 +1,7 @@
DROP TABLE IF EXISTS promote.sync_logs;
CREATE TABLE IF NOT EXISTS promote.sync_logs (
timestamp DateTime,
timestamp DateTime DEFAULT now(),
duration_ms UInt64,
posts_synced UInt32,
comments_synced UInt32,

View File

@@ -1,8 +1,16 @@
import { randomUUID } from 'crypto';
import { Client } from 'pg';
import clickhouse from '../utils/clickhouse';
import { randomUUID } from 'crypto';
// 数据库连接信息
const PG_CONNECTION_STRING = process.env.DATABASE_URL || '';
/**
* 简单的同步函数,只插入一条测试数据到ClickHouse
* 从PostgreSQL数据库同步数据到ClickHouse
* 同步三种事件类型:
* 1. 新增Posts
* 2. 新增Comments
* 3. Influencer变化
*/
export async function syncAllData(fromTimestamp: string): Promise<{
success: boolean;
@@ -15,53 +23,413 @@ export async function syncAllData(fromTimestamp: string): Promise<{
}> {
console.log(`开始同步数据,时间范围: ${fromTimestamp} - 现在`);
const errors: string[] = [];
const startTime = Date.now();
// 创建PostgreSQL客户端
const pgClient = new Client({
connectionString: PG_CONNECTION_STRING
});
try {
// 使用insert方法并仅提供必要字段让ClickHouse为其他字段使用默认值
await clickhouse.insert({
table: 'events',
values: [{
// 让ClickHouse自动生成event_id、timestamp、date和hour
user_id: 'test-user-123',
influencer_id: 'influencer-456',
content_id: 'content-789',
project_id: 'project-abc',
event_type: 'comment',
funnel_stage: 'consideration',
platform: 'instagram',
content_type: 'text',
content_status: 'approved',
sentiment: 'positive',
comment_text: '测试数据 - ClickHouse同步测试'
}],
format: 'JSONEachRow' // 使用JSONEachRow格式
});
// 连接到PostgreSQL
await pgClient.connect();
console.log('PostgreSQL连接成功');
console.log('数据插入成功');
// 同步各类数据
const postsCount = await syncPosts(pgClient, fromTimestamp);
const commentsCount = await syncComments(pgClient, fromTimestamp);
const influencerChangesCount = await syncInfluencerChanges(pgClient, fromTimestamp);
const projectsCount = await syncProjects(pgClient, fromTimestamp);
// 只计算了一条评论
const comments = 1;
// 关闭PostgreSQL连接
await pgClient.end();
const duration = (Date.now() - startTime) / 1000;
// 记录同步日志到ClickHouse
await logSyncResults(duration, postsCount, commentsCount, influencerChangesCount, projectsCount, errors.length === 0, errors);
return {
success: true,
message: '测试数据插入成功',
comments,
posts: 0,
influencer_changes: 0,
projects: 0,
success: errors.length === 0,
message: errors.length === 0 ? '同步成功' : '同步过程中有错误',
posts: postsCount,
comments: commentsCount,
influencer_changes: influencerChangesCount,
projects: projectsCount,
errors
};
} catch (err: any) {
console.error('数据插入失败:', err.message);
console.error('同步过程中发生错误:', err.message);
errors.push(err.message);
// 确保关闭PostgreSQL连接
try {
await pgClient.end();
} catch (e) {
console.error('关闭PostgreSQL连接失败:', e);
}
return {
success: false,
message: `插入失败: ${err.message}`,
comments: 0,
message: `同步失败: ${err.message}`,
posts: 0,
comments: 0,
influencer_changes: 0,
projects: 0,
errors
};
}
}
/**
 * Copies recently created posts from PostgreSQL into the ClickHouse `events` table.
 *
 * Each row of `posts` whose `created_at` is at or after `fromTimestamp` becomes one
 * `impression` event; all events are written with a single batch insert.
 *
 * @param pgClient - PostgreSQL client used to run the query. It is neither
 *   connected nor closed here.
 * @param fromTimestamp - Inclusive lower bound compared against `posts.created_at`.
 * @returns The number of events inserted, or 0 when the query returns no rows.
 * @throws Rethrows any error from the query or the insert after logging it.
 */
async function syncPosts(pgClient: Client, fromTimestamp: string): Promise<number> {
  console.log('开始同步Posts数据...');

  try {
    // Posts created inside the requested window, oldest first.
    const newPostsSql = `
SELECT
p.post_id,
p.influencer_id,
p.platform,
p.project_id,
p.title,
p.description,
p.published_at,
p.created_at
FROM
posts p
WHERE
p.created_at >= $1
ORDER BY
p.created_at ASC
`;
    const { rows } = await pgClient.query(newPostsSql, [fromTimestamp]);

    if (rows.length === 0) {
      console.log('没有新的Posts数据需要同步');
      return 0;
    }

    // One exposure-stage "impression" event per new post.
    const events = rows.map(row => ({
      user_id: 'system',
      influencer_id: row.influencer_id,
      content_id: row.post_id,
      project_id: row.project_id || '',
      event_type: 'impression',
      funnel_stage: 'exposure',
      platform: row.platform || 'unknown',
      content_type: 'video', // fixed default; not derived from the post row
      content_status: 'approved',
      sentiment: 'neutral',
      comment_text: row.title || row.description || '',
      interaction_value: 1.0,
      recorded_by: 'system'
    }));

    // Single batch insert for the whole window.
    await clickhouse.insert({
      table: 'events',
      values: events,
      format: 'JSONEachRow'
    });

    const inserted = events.length;
    console.log(`成功同步 ${inserted} 条新增Posts数据`);
    return inserted;
  } catch (err: any) {
    console.error('同步Posts数据失败:', err.message);
    throw err;
  }
}
/**
 * Copies recently created comments from PostgreSQL into the ClickHouse `events` table.
 *
 * Comments are LEFT JOINed to their post to pick up influencer, project and
 * platform. A comment without a matching post is still synced, using the
 * empty-string / 'unknown' fallbacks below.
 *
 * @param pgClient - PostgreSQL client used to run the query. It is neither
 *   connected nor closed here.
 * @param fromTimestamp - Inclusive lower bound compared against `comments.created_at`.
 * @returns The number of events inserted, or 0 when the query returns no rows.
 * @throws Rethrows any error from the query or the insert after logging it.
 */
async function syncComments(pgClient: Client, fromTimestamp: string): Promise<number> {
  console.log('开始同步Comments数据...');

  try {
    // Comments created inside the requested window, oldest first.
    const newCommentsSql = `
SELECT
c.comment_id,
c.post_id,
c.user_id,
c.content,
c.sentiment_score,
c.created_at,
p.influencer_id,
p.project_id,
p.platform
FROM
comments c
LEFT JOIN
posts p ON c.post_id = p.post_id
WHERE
c.created_at >= $1
ORDER BY
c.created_at ASC
`;
    const { rows } = await pgClient.query(newCommentsSql, [fromTimestamp]);

    if (rows.length === 0) {
      console.log('没有新的Comments数据需要同步');
      return 0;
    }

    const events = rows.map(row => {
      // Score above 0.5 is positive, below -0.5 is negative, anything else
      // (including a missing score) is neutral.
      const sentiment =
        row.sentiment_score > 0.5
          ? 'positive'
          : row.sentiment_score < -0.5
            ? 'negative'
            : 'neutral';

      return {
        user_id: row.user_id || 'anonymous',
        influencer_id: row.influencer_id || '',
        content_id: row.post_id || '',
        project_id: row.project_id || '',
        event_type: 'comment',
        funnel_stage: 'consideration', // comments are recorded as consideration-stage events
        platform: row.platform || 'unknown',
        content_type: 'text',
        content_status: 'approved',
        sentiment,
        comment_text: row.content || '',
        interaction_value: 3.0, // comments are weighted higher than the 1.0 used for other events
        recorded_by: 'system'
      };
    });

    // Single batch insert for the whole window.
    await clickhouse.insert({
      table: 'events',
      values: events,
      format: 'JSONEachRow'
    });

    const inserted = events.length;
    console.log(`成功同步 ${inserted} 条新增Comments数据`);
    return inserted;
  } catch (err: any) {
    console.error('同步Comments数据失败:', err.message);
    throw err;
  }
}
/**
 * Copies recently updated influencers from PostgreSQL into the ClickHouse `events` table.
 *
 * Each row of `influencers` whose `updated_at` is at or after `fromTimestamp`
 * becomes one `follow` event carrying the current follower count.
 *
 * @param pgClient - PostgreSQL client used to run the query. It is neither
 *   connected nor closed here.
 * @param fromTimestamp - Inclusive lower bound compared against `influencers.updated_at`.
 * @returns The number of events inserted, or 0 when the query returns no rows.
 * @throws Rethrows any error from the query or the insert after logging it.
 */
async function syncInfluencerChanges(pgClient: Client, fromTimestamp: string): Promise<number> {
  console.log('开始同步Influencer变化数据...');

  try {
    // Influencers touched inside the requested window, oldest update first.
    const updatedInfluencersSql = `
SELECT
i.influencer_id,
i.name,
i.platform,
i.followers_count,
i.video_count,
i.updated_at
FROM
influencers i
WHERE
i.updated_at >= $1
ORDER BY
i.updated_at ASC
`;
    const { rows } = await pgClient.query(updatedInfluencersSql, [fromTimestamp]);

    if (rows.length === 0) {
      console.log('没有Influencer变化数据需要同步');
      return 0;
    }

    // One interest-stage "follow" event per updated influencer.
    const events = rows.map(row => ({
      user_id: 'system',
      influencer_id: row.influencer_id,
      content_id: '',
      project_id: '',
      event_type: 'follow',
      funnel_stage: 'interest',
      platform: row.platform || 'unknown',
      content_type: 'text',
      content_status: 'approved',
      sentiment: 'neutral',
      comment_text: '',
      followers_count: row.followers_count || 0,
      followers_change: 0, // placeholder: no delta against earlier counts is computed here
      interaction_value: 1.0,
      recorded_by: 'system'
    }));

    // Single batch insert for the whole window.
    await clickhouse.insert({
      table: 'events',
      values: events,
      format: 'JSONEachRow'
    });

    const inserted = events.length;
    console.log(`成功同步 ${inserted} 条Influencer变化数据`);
    return inserted;
  } catch (err: any) {
    console.error('同步Influencer变化数据失败:', err.message);
    throw err;
  }
}
/**
 * Copies recently created projects from PostgreSQL into the ClickHouse `events` table.
 *
 * Each row of `projects` whose `created_at` is at or after `fromTimestamp`
 * becomes one `impression` event on the 'internal' platform.
 *
 * @param pgClient - PostgreSQL client used to run the query. It is neither
 *   connected nor closed here.
 * @param fromTimestamp - Inclusive lower bound compared against `projects.created_at`.
 * @returns The number of events inserted, or 0 when the query returns no rows.
 * @throws Rethrows any error from the query or the insert after logging it.
 */
async function syncProjects(pgClient: Client, fromTimestamp: string): Promise<number> {
  console.log('开始同步Projects数据...');

  try {
    // Projects created inside the requested window, oldest first.
    const newProjectsSql = `
SELECT
p.id,
p.name,
p.description,
p.created_by,
p.created_at
FROM
projects p
WHERE
p.created_at >= $1
ORDER BY
p.created_at ASC
`;
    const { rows } = await pgClient.query(newProjectsSql, [fromTimestamp]);

    if (rows.length === 0) {
      console.log('没有新的Projects数据需要同步');
      return 0;
    }

    // One exposure-stage "impression" event per new project, attributed to its creator.
    const events = rows.map(row => ({
      user_id: row.created_by || 'system',
      influencer_id: '',
      content_id: '',
      project_id: row.id,
      event_type: 'impression',
      funnel_stage: 'exposure',
      platform: 'internal',
      content_type: 'text',
      content_status: 'approved',
      sentiment: 'neutral',
      comment_text: row.name || '',
      interaction_value: 1.0,
      recorded_by: 'system'
    }));

    // Single batch insert for the whole window.
    await clickhouse.insert({
      table: 'events',
      values: events,
      format: 'JSONEachRow'
    });

    const inserted = events.length;
    console.log(`成功同步 ${inserted} 条新增Projects数据`);
    return inserted;
  } catch (err: any) {
    console.error('同步Projects数据失败:', err.message);
    throw err;
  }
}
/**
 * Writes one summary row for a sync run into the ClickHouse `sync_logs` table.
 *
 * This is best-effort: a failed insert is logged to the console and never
 * propagated to the caller.
 *
 * @param duration - Run duration in seconds; stored as rounded milliseconds.
 * @param postsCount - Number of post events synced.
 * @param commentsCount - Number of comment events synced.
 * @param influencerChangesCount - Number of influencer-change events synced.
 * @param projectsCount - Number of project events synced.
 * @param success - Whether the run finished without errors; stored as 1 or 0.
 * @param errors - Error messages collected during the run; stored joined by '; '.
 */
async function logSyncResults(
  duration: number,
  postsCount: number,
  commentsCount: number,
  influencerChangesCount: number,
  projectsCount: number,
  success: boolean,
  errors: string[]
): Promise<void> {
  try {
    // No `timestamp` field is sent; the row relies on the column default.
    const logRow = {
      duration_ms: Math.round(duration * 1000),
      posts_synced: postsCount,
      comments_synced: commentsCount,
      influencer_changes_synced: influencerChangesCount,
      projects_synced: projectsCount,
      success: Number(success),
      error_messages: errors.join('; ')
    };

    await clickhouse.insert({
      table: 'sync_logs',
      values: [logRow],
      format: 'JSONEachRow'
    });
    console.log('同步日志记录成功');
  } catch (err: any) {
    console.error('记录同步日志失败:', err.message);
  }
}
/**
 * Entry point for a scheduler (for example node-cron) to run one sync pass.
 *
 * Syncs everything from one calendar day before "now" (local-time `setDate`
 * arithmetic, serialized as an ISO string) up to the present. It never
 * rejects: both a failed result and a thrown error are only logged.
 *
 * NOTE(review): no stored "last sync" time is consulted; the window always
 * starts one day back. If this runs more often than once a day, the same
 * rows fall into several windows and would be inserted again — confirm
 * whether the ClickHouse side deduplicates.
 */
export async function scheduledSync(): Promise<void> {
  try {
    // Start of the window: same local wall-clock time on the previous day.
    const windowStart = new Date();
    windowStart.setDate(windowStart.getDate() - 1);
    const fromTimestamp = windowStart.toISOString();

    console.log(`执行计划同步任务,同步时间范围: ${fromTimestamp} - 现在`);
    const outcome = await syncAllData(fromTimestamp);

    if (!outcome.success) {
      console.error(`计划同步任务有错误:`, outcome.errors);
      return;
    }

    console.log(`计划同步任务成功完成,已同步: 帖子(${outcome.posts}), 评论(${outcome.comments}), KOL变化(${outcome.influencer_changes}), 项目(${outcome.projects})`);
  } catch (err: any) {
    console.error('计划同步任务失败:', err.message);
  }
}

View File

@@ -459,11 +459,7 @@ export const scheduleDatabaseSync = async (
* Syncs data from PostgreSQL to ClickHouse
* @param lastSyncTimestamp The timestamp of the last sync
*/
async function syncPostgresToClickhouse(lastSyncTimestamp?: string): Promise<{
posts: number;
comments: number;
influencerChanges: number;
}> {
async function syncPostgresToClickhouse(lastSyncTimestamp?: string): Promise<any> {
console.log(`Starting PostgreSQL to ClickHouse sync from timestamp: ${lastSyncTimestamp}`);
// If no last sync timestamp provided, use a timestamp from 1 hour ago
@@ -474,12 +470,28 @@ async function syncPostgresToClickhouse(lastSyncTimestamp?: string): Promise<{
}
try {
// Sync all data
// Use the enhanced syncAllData function from syncService
const result = await syncAllData(lastSyncTimestamp);
console.log('PostgreSQL to ClickHouse sync completed:', result);
console.log('PostgreSQL to ClickHouse sync completed:', {
success: result.success,
posts: result.posts,
comments: result.comments,
influencer_changes: result.influencer_changes,
projects: result.projects,
errors: result.errors.length > 0 ? result.errors : 'None'
});
return result;
if (!result.success) {
throw new Error(`Sync failed with errors: ${result.errors.join('; ')}`);
}
return {
posts: result.posts || 0,
comments: result.comments || 0,
influencerChanges: result.influencer_changes || 0,
projects: result.projects || 0
};
} catch (error) {
console.error('Error in PostgreSQL to ClickHouse sync:', error);
throw error;

View File

@@ -0,0 +1,52 @@
/**
* 任务调度器,用于定期执行数据同步任务
*/
import { scheduledSync } from '../services/syncService';
// Interval between scheduled sync runs, in milliseconds
const SYNC_INTERVAL = 1000 * 60 * 60 * 6; // run a sync every 6 hours
// Handle of the active interval timer; null while no scheduled task is running
let syncIntervalId: NodeJS.Timeout | null = null;
/**
 * Starts the periodic data sync.
 *
 * Any timer started earlier is stopped first. One sync is then kicked off
 * immediately without being awaited, and a repeating timer is registered to
 * run further syncs every `SYNC_INTERVAL` milliseconds. Rejections from a
 * sync run are logged, never thrown.
 */
export function startScheduledTasks(): void {
  // Drop any existing timer so only the one created below stays registered.
  stopScheduledTasks();

  const intervalHours = SYNC_INTERVAL / (1000 * 60 * 60);
  console.log('启动数据同步定时任务,间隔时间:', intervalHours, '小时');

  // Fire-and-forget wrapper: run one sync and log a rejection under the given label.
  const runSync = (failureLabel: string): void => {
    scheduledSync().catch(err => {
      console.error(failureLabel, err);
    });
  };

  // Initial run, started right away.
  console.log('执行初始数据同步...');
  runSync('初始同步任务失败:');

  // Recurring runs.
  syncIntervalId = setInterval(() => {
    console.log('执行定时数据同步任务...');
    runSync('定时同步任务失败:');
  }, SYNC_INTERVAL);

  console.log('数据同步定时任务已启动');
}
/**
 * Stops the periodic data sync if one is active.
 *
 * Clears the interval timer and resets the stored handle to null. Does
 * nothing, and logs nothing, when no timer is registered.
 */
export function stopScheduledTasks() {
  if (!syncIntervalId) {
    return;
  }

  clearInterval(syncIntervalId);
  syncIntervalId = null;
  console.log('数据同步定时任务已停止');
}
/**
 * Reports whether a sync interval timer is currently registered.
 *
 * @returns `true` while the stored timer handle is non-null, otherwise `false`.
 */
export function isSchedulerRunning(): boolean {
  const hasActiveTimer = syncIntervalId !== null;
  return hasActiveTimer;
}