import { Queue, Worker } from 'bullmq';
import supabase from './supabase';
import clickhouse from './clickhouse';

interface ScheduledCollectionData {
  type: 'influencer_metrics' | 'post_metrics';
  influencer_id?: string;
  post_id?: string;
  project_id?: string;
  scheduled_at?: string;
}

// Shared Redis connection options for the queue and its workers
const connection = {
  host: process.env.BULL_REDIS_HOST || 'localhost',
  port: parseInt(process.env.BULL_REDIS_PORT || '6379', 10),
  // Avoid sending an AUTH command when no password is configured
  password: process.env.BULL_REDIS_PASSWORD || undefined,
};

// Create (or reuse) the scheduled collection queue.
// The instance is cached so repeated calls don't open new Redis connections.
// Note: BullMQ v2+ no longer requires a separate QueueScheduler instance;
// delayed and repeatable jobs are handled by the workers themselves.
let scheduledCollectionQueue: Queue | null = null;

const createScheduledTaskQueue = async () => {
  if (scheduledCollectionQueue) {
    return scheduledCollectionQueue;
  }

  const queueOptions = {
    connection,
    defaultJobOptions: {
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 5000,
      },
    },
  };

  scheduledCollectionQueue = new Queue('scheduled-data-collection', queueOptions);
  return scheduledCollectionQueue;
};

// Initialize scheduled collection workers
export const initScheduledTaskWorkers = () => {
  const worker = new Worker(
    'scheduled-data-collection',
    async (job) => {
      console.log(`Processing scheduled task: ${job.id}`, job.data);

      const { type, influencer_id, post_id } = job.data as ScheduledCollectionData;

      try {
        if (type === 'influencer_metrics') {
          await collectInfluencerMetrics(influencer_id);
        } else if (type === 'post_metrics') {
          await collectPostMetrics(post_id);
        }

        console.log(`Successfully completed scheduled task: ${job.id}`);
        return { success: true, timestamp: new Date().toISOString() };
      } catch (error) {
        console.error(`Error processing scheduled task ${job.id}:`, error);
        throw error;
      }
    },
    {
      connection,
      concurrency: 5,
    }
  );

  worker.on('completed', (job) => {
    console.log(`Scheduled task completed: ${job.id}`);
  });

  worker.on('failed', (job, err) => {
    console.error(`Scheduled task failed: ${job?.id}`, err);
  });

  return worker;
};

// Schedule data collection jobs
export const scheduleInfluencerCollection = async (
  influencerId: string,
  cronExpression: string = '0 0 * * *' // Default: every day at midnight
) => {
  const queue = await createScheduledTaskQueue();

  await queue.add(
    `influencer-collection-${influencerId}`,
    {
      type: 'influencer_metrics',
      influencer_id: influencerId,
      scheduled_at: new Date().toISOString(),
    },
    {
      jobId: `influencer-${influencerId}-${Date.now()}`,
      repeat: {
        pattern: cronExpression,
      },
    }
  );

  return true;
};

export const schedulePostCollection = async (
  postId: string,
  cronExpression: string = '0 0 * * *' // Default: every day at midnight
) => {
  const queue = await createScheduledTaskQueue();

  await queue.add(
    `post-collection-${postId}`,
    {
      type: 'post_metrics',
      post_id: postId,
      scheduled_at: new Date().toISOString(),
    },
    {
      jobId: `post-${postId}-${Date.now()}`,
      repeat: {
        pattern: cronExpression,
      },
    }
  );

  return true;
};

// Remove a scheduled job. Note: removeRepeatableByKey expects the repeat key
// returned by getRepeatableJobs() (the job's `key`), not a job id.
export const removeScheduledJob = async (repeatJobKey: string) => {
  const queue = await createScheduledTaskQueue();
  await queue.removeRepeatableByKey(repeatJobKey);
  return true;
};
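// Usage sketch (illustrative, not part of this module): wire the worker up
// once at application startup, then register repeatable jobs as influencers
// and posts are onboarded. The import path is an assumption.
//
//   import {
//     initScheduledTaskWorkers,
//     scheduleInfluencerCollection,
//     schedulePostCollection,
//   } from './scheduler';
//
//   const worker = initScheduledTaskWorkers();
//   await scheduleInfluencerCollection('inf_123', '0 */6 * * *'); // every 6 hours
//   await schedulePostCollection('post_456'); // daily at midnight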
// Get all scheduled jobs
export const getScheduledJobs = async () => {
  const queue = await createScheduledTaskQueue();
  const repeatableJobs = await queue.getRepeatableJobs();
  return repeatableJobs;
};

// Implementation of collection functions.
// These functions would typically call platform APIs or scrape data.
async function collectInfluencerMetrics(influencerId?: string) {
  if (!influencerId) {
    throw new Error('Influencer ID is required');
  }

  // Get influencer data from Supabase in a single query; the current counts
  // are needed below to compute deltas
  const { data: influencer, error } = await supabase
    .from('influencers')
    .select('influencer_id, name, platform, external_id, followers_count, video_count')
    .eq('influencer_id', influencerId)
    .single();

  if (error || !influencer) {
    throw new Error(`Failed to find influencer: ${error?.message}`);
  }

  // Here you would integrate with platform APIs to get updated metrics.
  // This placeholder simulates collected metrics; in a real scenario these
  // values would come from the platform APIs.
  const simulatedMetrics = {
    followers_count: Math.floor(50000 + Math.random() * 1000),
    video_count: Math.floor(100 + Math.random() * 5),
    views_count: Math.floor(1000000 + Math.random() * 50000),
    likes_count: Math.floor(500000 + Math.random() * 20000),
  };

  // Calculate changes against the currently stored metrics
  const followerChange =
    (simulatedMetrics.followers_count || 0) - (influencer.followers_count || 0);
  const videoChange =
    (simulatedMetrics.video_count || 0) - (influencer.video_count || 0);

  // Update Supabase
  const { error: updateError } = await supabase
    .from('influencers')
    .update(simulatedMetrics)
    .eq('influencer_id', influencerId);

  if (updateError) {
    throw new Error(`Failed to update influencer metrics: ${updateError.message}`);
  }

  // Record change events in ClickHouse, one per non-zero metric change
  const timestamp = new Date().toISOString();
  const eventPromises: Promise<unknown>[] = [];

  if (followerChange !== 0) {
    eventPromises.push(
      clickhouse.query({
        query: `
          INSERT INTO promote.events (
            event_type, influencer_id, timestamp,
            metric_name, metric_value, metric_total, recorded_by
          ) VALUES (?, ?, ?, ?, ?, ?, ?)
        `,
        values: [
          'followers_count_change',
          influencerId,
          timestamp,
          'followers_count',
          followerChange,
          simulatedMetrics.followers_count,
          'system', // Recorded by the system scheduler
        ],
      })
    );
  }

  if (videoChange !== 0) {
    eventPromises.push(
      clickhouse.query({
        query: `
          INSERT INTO promote.events (
            event_type, influencer_id, timestamp,
            metric_name, metric_value, metric_total, recorded_by
          ) VALUES (?, ?, ?, ?, ?, ?, ?)
        `,
        values: [
          'video_count_change',
          influencerId,
          timestamp,
          'video_count',
          videoChange,
          simulatedMetrics.video_count,
          'system', // Recorded by the system scheduler
        ],
      })
    );
  }

  await Promise.all(eventPromises);

  return {
    influencer_id: influencerId,
    timestamp,
    metrics: simulatedMetrics,
    changes: {
      followers: followerChange,
      videos: videoChange,
    },
  };
}
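// Example of reading the recorded deltas back out of ClickHouse, e.g. for a
// follower growth chart. Table and column names follow the INSERTs in this
// file; the `{name:Type}` placeholders are ClickHouse's server-side parameter
// binding, and how parameters are actually passed depends on the ./clickhouse
// wrapper, so treat this as a sketch:
//
//   SELECT toDate(timestamp) AS day, sum(metric_value) AS follower_delta
//   FROM promote.events
//   WHERE influencer_id = {influencerId:String}
//     AND metric_name = 'followers_count'
//   GROUP BY day
//   ORDER BY day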
async function collectPostMetrics(postId?: string) {
  if (!postId) {
    throw new Error('Post ID is required');
  }

  // Get post data from Supabase in a single query; the current counts are
  // needed below to compute deltas
  const { data: post, error } = await supabase
    .from('posts')
    .select(
      'post_id, influencer_id, platform, post_url, title, views_count, likes_count, comments_count, shares_count'
    )
    .eq('post_id', postId)
    .single();

  if (error || !post) {
    throw new Error(`Failed to find post: ${error?.message}`);
  }

  // Here you would integrate with platform APIs to get updated metrics.
  // This placeholder simulates collected metrics; in a real scenario these
  // values would come from the platform APIs.
  const simulatedMetrics = {
    views_count: Math.floor(10000 + Math.random() * 5000),
    likes_count: Math.floor(5000 + Math.random() * 1000),
    comments_count: Math.floor(200 + Math.random() * 50),
    shares_count: Math.floor(100 + Math.random() * 20),
  };

  // Calculate changes against the currently stored metrics
  const viewsChange = (simulatedMetrics.views_count || 0) - (post.views_count || 0);
  const likesChange = (simulatedMetrics.likes_count || 0) - (post.likes_count || 0);
  const commentsChange =
    (simulatedMetrics.comments_count || 0) - (post.comments_count || 0);
  const sharesChange = (simulatedMetrics.shares_count || 0) - (post.shares_count || 0);

  // Update Supabase
  const { error: updateError } = await supabase
    .from('posts')
    .update(simulatedMetrics)
    .eq('post_id', postId);

  if (updateError) {
    throw new Error(`Failed to update post metrics: ${updateError.message}`);
  }

  // Record change events in ClickHouse, one per non-zero metric change
  const timestamp = new Date().toISOString();
  const eventPromises: Promise<unknown>[] = [];

  interface MetricChanges {
    views: number;
    likes: number;
    comments: number;
    shares: number;
  }

  const changes: MetricChanges = {
    views: viewsChange,
    likes: likesChange,
    comments: commentsChange,
    shares: sharesChange,
  };

  const metricsMap = {
    views: simulatedMetrics.views_count,
    likes: simulatedMetrics.likes_count,
    comments: simulatedMetrics.comments_count,
    shares: simulatedMetrics.shares_count,
  };

  for (const [key, value] of Object.entries(changes)) {
    if (value !== 0) {
      const metricName = `${key}_count`;
      const metricTotal = metricsMap[key as keyof typeof metricsMap];

      eventPromises.push(
        clickhouse.query({
          query: `
            INSERT INTO promote.events (
              event_type, post_id, influencer_id, timestamp,
              metric_name, metric_value, metric_total, recorded_by
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
          `,
          values: [
            `post_${metricName}_change`,
            postId,
            post.influencer_id,
            timestamp,
            metricName,
            value,
            metricTotal,
            'system', // Recorded by the system scheduler
          ],
        })
      );
    }
  }

  await Promise.all(eventPromises);

  return {
    post_id: postId,
    timestamp,
    metrics: simulatedMetrics,
    changes,
  };
}
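// To replace the simulated metrics above with real data, one option is to put
// each platform client behind a small fetcher interface and inject it into the
// collection functions. This is a sketch only: no platform client exists in
// this module, and the return shape simply mirrors the simulated metric
// objects used above.

interface InfluencerMetricsFetcher {
  // `externalId` is the platform-side id stored on the influencers row
  fetchInfluencerMetrics(externalId: string): Promise<{
    followers_count: number;
    video_count: number;
    views_count: number;
    likes_count: number;
  }>;
}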