Files
promote/backend/dist/utils/queue.js
2025-03-07 18:04:27 +08:00

159 lines
5.7 KiB
JavaScript

"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.addNotificationJob = exports.addAnalyticsJob = exports.initWorkers = exports.QUEUE_NAMES = void 0;
const bullmq_1 = require("bullmq");
const config_1 = __importDefault(require("../config"));
// Define queue names
exports.QUEUE_NAMES = {
ANALYTICS: 'analytics',
NOTIFICATIONS: 'notifications',
};
// Redis connection settings handed to every queue and worker in this module.
// The values come from the `bull.redis` section of the application config.
const bullRedisSettings = config_1.default.bull.redis;
const redisOptions = {
    host: bullRedisSettings.host,
    port: bullRedisSettings.port,
    password: bullRedisSettings.password,
};
// Queue instances used by the job-adding helpers in this module. They hold
// either real BullMQ queues or, when construction throws, console-logging
// stand-ins that expose the same `add` / `close` surface.
let analyticsQueue;
let notificationsQueue;
try {
    // Build a fresh options object per queue so the two instances never share
    // a mutable literal. Jobs default to 3 attempts with exponential backoff
    // and a base delay of 1000.
    const buildQueueOptions = () => ({
        connection: redisOptions,
        defaultJobOptions: {
            attempts: 3,
            backoff: {
                type: 'exponential',
                delay: 1000,
            },
        },
    });
    analyticsQueue = new bullmq_1.Queue(exports.QUEUE_NAMES.ANALYTICS, buildQueueOptions());
    notificationsQueue = new bullmq_1.Queue(exports.QUEUE_NAMES.NOTIFICATIONS, buildQueueOptions());
    // BullMQ reports Redis connection problems through the 'error' event
    // rather than by throwing from the constructor, so the catch below does
    // not observe them. Log them here so they are attributable to a queue.
    analyticsQueue.on('error', (err) => {
        console.error('Analytics queue error:', err);
    });
    notificationsQueue.on('error', (err) => {
        console.error('Notifications queue error:', err);
    });
}
catch (error) {
    console.error('Error initializing BullMQ queues:', error);
    // Create mock queues for development.
    // NOTE(review): this fallback is not gated on the environment; a job added
    // to a mock queue is only logged and is never processed.
    analyticsQueue = {
        add: async (name, data) => {
            console.log(`Mock analytics job added: ${name}`, data);
            return { id: 'mock-job-id' };
        },
        close: async () => console.log('Mock analytics queue closed'),
    };
    notificationsQueue = {
        add: async (name, data) => {
            console.log(`Mock notification job added: ${name}`, data);
            return { id: 'mock-job-id' };
        },
        close: async () => console.log('Mock notifications queue closed'),
    };
}
/**
 * Start the BullMQ workers that consume the analytics and notifications queues.
 *
 * Each worker logs the job it picks up and dispatches on `job.data.type`; the
 * per-type handlers are currently log-only. Every worker gets `completed`,
 * `failed` and `error` listeners so that job outcomes and worker-level errors
 * (for example Redis connection problems) are logged.
 *
 * @returns {{ analyticsWorker: object, notificationsWorker: object }} The two
 *   workers, or mock objects exposing only an async `close()` when worker
 *   construction throws.
 */
const initWorkers = () => {
    // Wire the log-only lifecycle listeners shared by both workers.
    const attachWorkerLogging = (worker, label) => {
        worker.on('completed', (job) => {
            console.log(`${label} job ${job.id} completed`);
        });
        // `job` is read defensively because it may be absent on a failure.
        worker.on('failed', (job, err) => {
            console.error(`${label} job ${job?.id} failed with error ${err.message}`);
        });
        // BullMQ's documentation recommends always attaching an 'error'
        // listener to a worker; without one, emitted errors go unlogged.
        worker.on('error', (err) => {
            console.error(`${label} worker error:`, err);
        });
    };
    try {
        // Analytics worker
        const analyticsWorker = new bullmq_1.Worker(exports.QUEUE_NAMES.ANALYTICS, async (job) => {
            console.log(`Processing analytics job ${job.id}`);
            const { type, data } = job.data;
            switch (type) {
                case 'process_views':
                    // Process view analytics
                    console.log('Processing view analytics', data);
                    break;
                case 'process_likes':
                    // Process like analytics
                    console.log('Processing like analytics', data);
                    break;
                case 'process_followers':
                    // Process follower analytics
                    console.log('Processing follower analytics', data);
                    break;
                default:
                    console.log(`Unknown analytics job type: ${type}`);
            }
        }, { connection: redisOptions });
        // Notifications worker
        const notificationsWorker = new bullmq_1.Worker(exports.QUEUE_NAMES.NOTIFICATIONS, async (job) => {
            console.log(`Processing notification job ${job.id}`);
            const { type, data } = job.data;
            switch (type) {
                case 'new_follower':
                    // Send new follower notification
                    console.log('Sending new follower notification', data);
                    break;
                case 'new_like':
                    // Send new like notification
                    console.log('Sending new like notification', data);
                    break;
                default:
                    console.log(`Unknown notification job type: ${type}`);
            }
        }, { connection: redisOptions });
        // Handle worker events
        attachWorkerLogging(analyticsWorker, 'Analytics');
        attachWorkerLogging(notificationsWorker, 'Notification');
        return {
            analyticsWorker,
            notificationsWorker,
        };
    }
    catch (error) {
        console.error('Error initializing BullMQ workers:', error);
        // Return mock workers so callers can still call close() on shutdown.
        return {
            analyticsWorker: {
                close: async () => console.log('Mock analytics worker closed'),
            },
            notificationsWorker: {
                close: async () => console.log('Mock notifications worker closed'),
            },
        };
    }
};
exports.initWorkers = initWorkers;
/**
 * Enqueue a job on the analytics queue.
 *
 * The job is named after `type` and carries `{ type, data }` as its payload.
 * This is best-effort: a failure to enqueue is logged and reported as `null`
 * instead of being rethrown.
 *
 * @param {string} type - Job type; used as both the job name and `payload.type`.
 * @param {*} data - Job-specific data stored under `payload.data`.
 * @param {object} [options={}] - Options forwarded unchanged to `analyticsQueue.add`.
 * @returns {Promise<*>} Whatever `analyticsQueue.add` resolves to, or `null` on failure.
 */
const addAnalyticsJob = async (type, data, options = {}) => {
    let queuedJob;
    try {
        const payload = { type, data };
        queuedJob = await analyticsQueue.add(type, payload, options);
    }
    catch (enqueueError) {
        console.error('Error adding analytics job:', enqueueError);
        console.log('Job details:', { type, data });
        queuedJob = null;
    }
    return queuedJob;
};
exports.addAnalyticsJob = addAnalyticsJob;
/**
 * Enqueue a job on the notifications queue.
 *
 * The job is named after `type` and carries `{ type, data }` as its payload.
 * This is best-effort: a failure to enqueue is logged and reported as `null`
 * instead of being rethrown.
 *
 * @param {string} type - Job type; used as both the job name and `payload.type`.
 * @param {*} data - Job-specific data stored under `payload.data`.
 * @param {object} [options={}] - Options forwarded unchanged to `notificationsQueue.add`.
 * @returns {Promise<*>} Whatever `notificationsQueue.add` resolves to, or `null` on failure.
 */
const addNotificationJob = async (type, data, options = {}) => {
    let queuedJob;
    try {
        const payload = { type, data };
        queuedJob = await notificationsQueue.add(type, payload, options);
    }
    catch (enqueueError) {
        console.error('Error adding notification job:', enqueueError);
        console.log('Job details:', { type, data });
        queuedJob = null;
    }
    return queuedJob;
};
exports.addNotificationJob = addNotificationJob;