Compare commits
2 Commits
b9c2828e54
...
53e1611670
| Author | SHA1 | Date | |
|---|---|---|---|
| 53e1611670 | |||
| 6025641ab1 |
@@ -306,6 +306,8 @@ function AnalyticsContent() {
|
||||
const [geoData, setGeoData] = useState<GeoData[]>([]);
|
||||
const [deviceData, setDeviceData] = useState<DeviceAnalyticsType | null>(null);
|
||||
const [events, setEvents] = useState<Event[]>([]);
|
||||
const [isRefreshing, setIsRefreshing] = useState(false); // New state to track auto-refresh
|
||||
const [lastRefreshed, setLastRefreshed] = useState<Date | null>(null); // Track when data was last refreshed
|
||||
|
||||
// 添加 Snackbar 状态
|
||||
const [isSnackbarOpen, setIsSnackbarOpen] = useState(false);
|
||||
@@ -449,12 +451,133 @@ function AnalyticsContent() {
|
||||
setError(err instanceof Error ? err.message : 'An error occurred while fetching data');
|
||||
} finally {
|
||||
setLoading(false);
|
||||
setIsRefreshing(false); // Reset refreshing state
|
||||
setLastRefreshed(new Date()); // Update last refreshed timestamp
|
||||
}
|
||||
};
|
||||
|
||||
fetchData();
|
||||
}, [dateRange, selectedTeamIds, selectedProjectIds, selectedTagNames, selectedSubpath, currentPage, pageSize, selectedShortUrl, shouldFetchData]);
|
||||
|
||||
// Add auto-refresh functionality
|
||||
useEffect(() => {
|
||||
if (!shouldFetchData) return; // Don't set up refresh until initial data load is triggered
|
||||
|
||||
// Function to trigger a refresh of data
|
||||
const refreshData = () => {
|
||||
console.log('Auto-refreshing analytics data...');
|
||||
// Only refresh if not already loading or refreshing
|
||||
if (!loading && !isRefreshing) {
|
||||
setIsRefreshing(true);
|
||||
|
||||
// Create a new fetch function instead of reusing the effect's fetchData
|
||||
const fetchRefreshedData = async () => {
|
||||
try {
|
||||
const startTime = format(dateRange.from, "yyyy-MM-dd'T'HH:mm:ss'Z'");
|
||||
const endTime = format(dateRange.to, "yyyy-MM-dd'T'HH:mm:ss'Z'");
|
||||
|
||||
// 构建基础URL和查询参数
|
||||
const baseUrl = '/api/events';
|
||||
const params = new URLSearchParams({
|
||||
startTime,
|
||||
endTime,
|
||||
page: currentPage.toString(),
|
||||
pageSize: pageSize.toString()
|
||||
});
|
||||
|
||||
// Duplicate the parameters logic from the main fetch effect
|
||||
if (selectedShortUrl && selectedShortUrl.externalId) {
|
||||
params.append('linkId', selectedShortUrl.externalId);
|
||||
} else {
|
||||
const savedExternalId = sessionStorage.getItem('current_shorturl_external_id');
|
||||
if (savedExternalId) {
|
||||
params.append('linkId', savedExternalId);
|
||||
}
|
||||
}
|
||||
|
||||
if (selectedTeamIds.length > 0) {
|
||||
selectedTeamIds.forEach(teamId => {
|
||||
params.append('teamId', teamId);
|
||||
});
|
||||
}
|
||||
|
||||
if (selectedProjectIds.length > 0) {
|
||||
selectedProjectIds.forEach(projectId => {
|
||||
params.append('projectId', projectId);
|
||||
});
|
||||
}
|
||||
|
||||
if (selectedTagNames.length > 0) {
|
||||
selectedTagNames.forEach(tagName => {
|
||||
params.append('tagName', tagName);
|
||||
});
|
||||
}
|
||||
|
||||
if (selectedSubpath) {
|
||||
params.append('subpath', selectedSubpath);
|
||||
}
|
||||
|
||||
// Build all URLs with the same parameters
|
||||
const summaryUrl = `${baseUrl}/summary?${params.toString()}`;
|
||||
const timeSeriesUrl = `${baseUrl}/time-series?${params.toString()}`;
|
||||
const geoUrl = `${baseUrl}/geo?${params.toString()}`;
|
||||
const devicesUrl = `${baseUrl}/devices?${params.toString()}`;
|
||||
const eventsUrl = `${baseUrl}?${params.toString()}`;
|
||||
|
||||
// Parallel requests for all data
|
||||
const [summaryRes, timeSeriesRes, geoRes, deviceRes, eventsRes] = await Promise.all([
|
||||
fetch(summaryUrl),
|
||||
fetch(timeSeriesUrl),
|
||||
fetch(geoUrl),
|
||||
fetch(devicesUrl),
|
||||
fetch(eventsUrl)
|
||||
]);
|
||||
|
||||
const [summaryData, timeSeriesData, geoData, deviceData, eventsData] = await Promise.all([
|
||||
summaryRes.json(),
|
||||
timeSeriesRes.json(),
|
||||
geoRes.json(),
|
||||
deviceRes.json(),
|
||||
eventsRes.json()
|
||||
]);
|
||||
|
||||
// Update state with fresh data
|
||||
if (summaryRes.ok) setSummary(summaryData.data);
|
||||
if (timeSeriesRes.ok) setTimeSeriesData(timeSeriesData.data);
|
||||
if (geoRes.ok) setGeoData(geoData.data);
|
||||
if (deviceRes.ok) setDeviceData(deviceData.data);
|
||||
if (eventsRes.ok) {
|
||||
setEvents(eventsData.data || []);
|
||||
// Update pagination info
|
||||
if (eventsData.meta) {
|
||||
const totalCount = parseInt(String(eventsData.meta.total), 10);
|
||||
if (!isNaN(totalCount)) {
|
||||
setTotalEvents(totalCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Auto-refresh error:', err);
|
||||
// Don't show errors during auto-refresh to avoid disrupting the UI
|
||||
} finally {
|
||||
setIsRefreshing(false);
|
||||
setLastRefreshed(new Date()); // Update last refreshed timestamp
|
||||
}
|
||||
};
|
||||
|
||||
fetchRefreshedData();
|
||||
}
|
||||
};
|
||||
|
||||
// Set up the interval for auto-refresh every 30 seconds
|
||||
const intervalId = setInterval(refreshData, 30000);
|
||||
|
||||
// Clean up the interval when the component unmounts
|
||||
return () => {
|
||||
clearInterval(intervalId);
|
||||
};
|
||||
}, [shouldFetchData, loading, isRefreshing, dateRange, selectedTeamIds, selectedProjectIds, selectedTagNames, selectedSubpath, currentPage, pageSize, selectedShortUrl]);
|
||||
|
||||
// Function to clear the shorturl filter
|
||||
const handleClearShortUrlFilter = () => {
|
||||
// 先清除 store 中的数据
|
||||
@@ -499,7 +622,7 @@ function AnalyticsContent() {
|
||||
setCurrentPage(1);
|
||||
};
|
||||
|
||||
if (loading) {
|
||||
if (loading && !isRefreshing) {
|
||||
return (
|
||||
<div className="flex items-center justify-center min-h-screen">
|
||||
<div className="animate-spin rounded-full h-12 w-12 border-t-2 border-b-2 border-blue-500" />
|
||||
@@ -532,8 +655,24 @@ function AnalyticsContent() {
|
||||
)}
|
||||
|
||||
<div className="flex justify-between items-center mb-8">
|
||||
<div>
|
||||
<h1 className="text-2xl font-bold text-gray-900">Analytics Dashboard</h1>
|
||||
{lastRefreshed && (
|
||||
<div className="text-xs text-gray-500 mt-1">
|
||||
Last updated: {format(lastRefreshed, 'MMM d, yyyy HH:mm:ss')}
|
||||
{isRefreshing ? ' · Refreshing...' : ' · Auto-refreshes every 30 seconds'}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
<div className="flex flex-col gap-4 md:flex-row md:items-center">
|
||||
{/* Show refresh indicator */}
|
||||
{isRefreshing && (
|
||||
<div className="flex items-center text-blue-600 text-sm">
|
||||
<div className="animate-spin rounded-full h-4 w-4 border-t-2 border-b-2 border-blue-500 mr-2" />
|
||||
<span>Refreshing data...</span>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{/* 如果有选定的 shorturl,可以显示一个提示,显示更多详细信息 */}
|
||||
{selectedShortUrl && (
|
||||
<div className="bg-blue-100 text-blue-800 px-3 py-2 rounded-md text-sm flex flex-col">
|
||||
|
||||
1
scripts/db/sql/clickhouse/truncate_events.sh
Normal file
1
scripts/db/sql/clickhouse/truncate_events.sh
Normal file
@@ -0,0 +1 @@
|
||||
./ch-query.sh -q "TRUNCATE TABLE shorturl_analytics.events"
|
||||
522
windmill/sync_mongo_short_to_postgres_short_url_shorturl.ts
Normal file
522
windmill/sync_mongo_short_to_postgres_short_url_shorturl.ts
Normal file
@@ -0,0 +1,522 @@
|
||||
// 从MongoDB的main.short表同步数据到PostgreSQL的short_url.shorturl表
|
||||
import { getVariable, setVariable, getResource } from "npm:windmill-client@1";
|
||||
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
|
||||
import { Client } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
|
||||
|
||||
interface MongoConfig {
|
||||
host: string;
|
||||
port: string;
|
||||
db: string;
|
||||
username: string;
|
||||
password: string;
|
||||
}
|
||||
|
||||
interface PostgresConfig {
|
||||
host: string;
|
||||
port: number;
|
||||
database: string;
|
||||
user: string;
|
||||
password: string;
|
||||
schema: string;
|
||||
}
|
||||
|
||||
// 扩展ShortRecord接口以包含更多可能的字段
|
||||
interface ShortRecord {
|
||||
_id: ObjectId;
|
||||
origin: string;
|
||||
slug: string;
|
||||
domain: string | null;
|
||||
createTime: number | { $numberLong: string } | string;
|
||||
// 可选字段
|
||||
expiredAt?: number | { $numberLong: string } | string | null;
|
||||
expiredUrl?: string | null;
|
||||
password?: string | null;
|
||||
image?: string | null;
|
||||
title?: string | null;
|
||||
description?: string | null;
|
||||
}
|
||||
|
||||
interface SyncState {
|
||||
last_sync_time: number;
|
||||
records_synced: number;
|
||||
last_sync_id?: string;
|
||||
}
|
||||
|
||||
// 同步状态键名
|
||||
const SYNC_STATE_KEY = "f/limq/mongo_short_to_postgres_shorturl_shorturl_state";
|
||||
|
||||
export async function main(
|
||||
batch_size = 1000,
|
||||
max_records = 9999999,
|
||||
timeout_minutes = 60,
|
||||
skip_duplicate_check = false,
|
||||
force_insert = false,
|
||||
reset_sync_state = false,
|
||||
postgres_schema = "short_url", // 添加schema参数,允许运行时指定
|
||||
postgres_database = "postgres", // 添加数据库名称参数,默认为postgres
|
||||
domain = "upj.to" // 添加domain参数,允许用户指定域名
|
||||
) {
|
||||
const logWithTimestamp = (message: string) => {
|
||||
const now = new Date();
|
||||
console.log(`[${now.toISOString()}] ${message}`);
|
||||
};
|
||||
|
||||
logWithTimestamp("开始执行MongoDB到PostgreSQL的同步任务");
|
||||
logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`);
|
||||
logWithTimestamp(`使用域名: ${domain}`);
|
||||
if (skip_duplicate_check) {
|
||||
logWithTimestamp("⚠️ 警告: 已启用跳过重复检查模式,不会检查记录是否已存在");
|
||||
}
|
||||
if (force_insert) {
|
||||
logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
|
||||
}
|
||||
if (reset_sync_state) {
|
||||
logWithTimestamp("⚠️ 警告: 已启用重置同步状态,将从头开始同步数据");
|
||||
}
|
||||
|
||||
// 设置超时
|
||||
const startTime = Date.now();
|
||||
const timeoutMs = timeout_minutes * 60 * 1000;
|
||||
|
||||
// 检查是否超时
|
||||
const checkTimeout = () => {
|
||||
if (Date.now() - startTime > timeoutMs) {
|
||||
logWithTimestamp(`运行时间超过${timeout_minutes}分钟,暂停执行`);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
// 日期解析函数,处理不同格式的日期
|
||||
const parseDate = (dateValue: any): Date | null => {
|
||||
if (!dateValue) return null;
|
||||
|
||||
// 处理 MongoDB $numberLong 格式
|
||||
if (dateValue.$numberLong) {
|
||||
return new Date(Number(dateValue.$numberLong));
|
||||
}
|
||||
|
||||
// 处理普通时间戳
|
||||
if (typeof dateValue === 'number') {
|
||||
return new Date(dateValue);
|
||||
}
|
||||
|
||||
// 处理 ISO 字符串格式
|
||||
if (typeof dateValue === 'string') {
|
||||
const date = new Date(dateValue);
|
||||
return isNaN(date.getTime()) ? null : date;
|
||||
}
|
||||
|
||||
return null;
|
||||
};
|
||||
|
||||
// 获取MongoDB和PostgreSQL的连接信息
|
||||
let mongoConfig: MongoConfig;
|
||||
let postgresConfig: PostgresConfig;
|
||||
|
||||
try {
|
||||
const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
|
||||
if (typeof rawMongoConfig === "string") {
|
||||
try {
|
||||
mongoConfig = JSON.parse(rawMongoConfig);
|
||||
} catch (e) {
|
||||
console.error("MongoDB配置解析失败:", e);
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
mongoConfig = rawMongoConfig as MongoConfig;
|
||||
}
|
||||
|
||||
// 使用getResource获取PostgreSQL资源
|
||||
try {
|
||||
logWithTimestamp("正在获取PostgreSQL资源...");
|
||||
const resourceConfig = await getResource("f/limq/production_supabase");
|
||||
|
||||
// 将resource转换为PostgresConfig
|
||||
postgresConfig = {
|
||||
host: resourceConfig.host || "",
|
||||
port: Number(resourceConfig.port) || 5432,
|
||||
user: resourceConfig.user || "",
|
||||
password: resourceConfig.password || "",
|
||||
database: resourceConfig.database || postgres_database, // 使用提供的数据库名称作为备选
|
||||
schema: resourceConfig.schema || postgres_schema // 使用提供的schema作为备选
|
||||
};
|
||||
|
||||
// 检查并记录配置信息
|
||||
if (!postgresConfig.database || postgresConfig.database === "undefined") {
|
||||
postgresConfig.database = postgres_database;
|
||||
logWithTimestamp(`数据库名称未指定或为"undefined",使用提供的值: ${postgresConfig.database}`);
|
||||
}
|
||||
|
||||
if (!postgresConfig.schema || postgresConfig.schema === "undefined") {
|
||||
postgresConfig.schema = postgres_schema;
|
||||
logWithTimestamp(`Schema未指定或为"undefined",使用提供的值: ${postgresConfig.schema}`);
|
||||
}
|
||||
|
||||
logWithTimestamp(`PostgreSQL配置: 数据库=${postgresConfig.database}, Schema=${postgresConfig.schema}`);
|
||||
} catch (e) {
|
||||
console.error("获取PostgreSQL资源失败:", e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
console.log("MongoDB配置:", JSON.stringify({
|
||||
...mongoConfig,
|
||||
password: "****" // 隐藏密码
|
||||
}));
|
||||
console.log("PostgreSQL配置:", JSON.stringify({
|
||||
...postgresConfig,
|
||||
password: "****" // 隐藏密码
|
||||
}));
|
||||
} catch (error) {
|
||||
console.error("获取配置失败:", error);
|
||||
throw error;
|
||||
}
|
||||
|
||||
// 获取上次同步状态
|
||||
let lastSyncState: SyncState | null = null;
|
||||
if (!reset_sync_state) {
|
||||
try {
|
||||
const rawSyncState = await getVariable(SYNC_STATE_KEY);
|
||||
if (rawSyncState) {
|
||||
if (typeof rawSyncState === "string") {
|
||||
try {
|
||||
lastSyncState = JSON.parse(rawSyncState);
|
||||
} catch (e) {
|
||||
logWithTimestamp(`解析上次同步状态失败: ${e}, 将从头开始同步`);
|
||||
}
|
||||
} else {
|
||||
lastSyncState = rawSyncState as SyncState;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logWithTimestamp(`获取上次同步状态失败: ${error}, 将从头开始同步`);
|
||||
}
|
||||
}
|
||||
|
||||
if (lastSyncState) {
|
||||
logWithTimestamp(`找到上次同步状态: 最后同步时间 ${new Date(lastSyncState.last_sync_time).toISOString()}, 已同步记录数 ${lastSyncState.records_synced}`);
|
||||
if (lastSyncState.last_sync_id) {
|
||||
logWithTimestamp(`最后同步ID: ${lastSyncState.last_sync_id}`);
|
||||
}
|
||||
} else {
|
||||
logWithTimestamp("没有找到上次同步状态,将从头开始同步");
|
||||
}
|
||||
|
||||
// 构建MongoDB连接URL
|
||||
let mongoUrl = "mongodb://";
|
||||
if (mongoConfig.username && mongoConfig.password) {
|
||||
mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
|
||||
}
|
||||
mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
|
||||
|
||||
console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);
|
||||
|
||||
// 构建PostgreSQL连接URL
|
||||
const pgConnectionString = `postgres://${postgresConfig.user}:${postgresConfig.password}@${postgresConfig.host}:${postgresConfig.port}/${postgresConfig.database}`;
|
||||
console.log(`PostgreSQL连接URL: ${pgConnectionString.replace(/:[^:]*@/, ":****@")}`);
|
||||
|
||||
// 连接MongoDB
|
||||
const mongoClient = new MongoClient();
|
||||
let pgClient: Client | null = null;
|
||||
|
||||
try {
|
||||
await mongoClient.connect(mongoUrl);
|
||||
logWithTimestamp("MongoDB连接成功");
|
||||
|
||||
// 连接PostgreSQL
|
||||
pgClient = new Client(pgConnectionString);
|
||||
await pgClient.connect();
|
||||
logWithTimestamp("PostgreSQL连接成功");
|
||||
|
||||
// 确认PostgreSQL schema存在
|
||||
try {
|
||||
await pgClient.queryArray(`SELECT 1 FROM information_schema.schemata WHERE schema_name = '${postgresConfig.schema}'`);
|
||||
logWithTimestamp(`PostgreSQL schema '${postgresConfig.schema}' 已确认存在`);
|
||||
} catch (error) {
|
||||
logWithTimestamp(`检查PostgreSQL schema失败: ${error}`);
|
||||
throw new Error(`Schema '${postgresConfig.schema}' 可能不存在`);
|
||||
}
|
||||
|
||||
const db = mongoClient.database(mongoConfig.db);
|
||||
const shortCollection = db.collection<ShortRecord>("short");
|
||||
|
||||
// 构建查询条件,根据上次同步状态获取新记录
|
||||
const query: Record<string, unknown> = {};
|
||||
|
||||
// 如果有上次同步状态,则只获取更新的记录
|
||||
if (lastSyncState && lastSyncState.last_sync_time) {
|
||||
// 使用上次同步时间作为过滤条件
|
||||
query.createTime = { $gt: lastSyncState.last_sync_time };
|
||||
logWithTimestamp(`将只同步createTime > ${lastSyncState.last_sync_time} (${new Date(lastSyncState.last_sync_time).toISOString()}) 的记录`);
|
||||
}
|
||||
|
||||
// 计算总记录数
|
||||
const totalRecords = await shortCollection.countDocuments(query);
|
||||
logWithTimestamp(`找到 ${totalRecords} 条新记录需要同步`);
|
||||
|
||||
// 限制此次处理的记录数量
|
||||
const recordsToProcess = Math.min(totalRecords, max_records);
|
||||
logWithTimestamp(`本次将处理 ${recordsToProcess} 条记录`);
|
||||
|
||||
if (totalRecords === 0) {
|
||||
logWithTimestamp("没有新记录需要同步,任务完成");
|
||||
return {
|
||||
success: true,
|
||||
records_synced: 0,
|
||||
message: "没有新记录需要同步"
|
||||
};
|
||||
}
|
||||
|
||||
// 检查记录是否已经存在于PostgreSQL中
|
||||
const checkExistingRecords = async (records: ShortRecord[]): Promise<ShortRecord[]> => {
|
||||
if (records.length === 0) return [];
|
||||
|
||||
// 如果跳过重复检查或强制插入,则直接返回所有记录
|
||||
if (skip_duplicate_check || force_insert) {
|
||||
logWithTimestamp(`已跳过重复检查,准备处理所有 ${records.length} 条记录`);
|
||||
return records;
|
||||
}
|
||||
|
||||
logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于PostgreSQL中...`);
|
||||
|
||||
try {
|
||||
// 提取所有记录的slugs
|
||||
const slugs = records.map(record => record.slug);
|
||||
|
||||
// 查询PostgreSQL中是否已存在这些slugs
|
||||
const result = await pgClient!.queryArray(`
|
||||
SELECT slug FROM ${postgresConfig.schema}.shorturl
|
||||
WHERE slug = ANY($1::text[])
|
||||
`, [slugs]);
|
||||
|
||||
// 将已存在的slugs加入到集合中
|
||||
const existingSlugs = new Set<string>();
|
||||
for (const row of result.rows) {
|
||||
existingSlugs.add(row[0] as string);
|
||||
}
|
||||
|
||||
logWithTimestamp(`检测到 ${existingSlugs.size} 条记录已存在于PostgreSQL中`);
|
||||
|
||||
// 过滤出不存在的记录
|
||||
const newRecords = records.filter(record => !existingSlugs.has(record.slug));
|
||||
logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
|
||||
|
||||
return newRecords;
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
logWithTimestamp(`PostgreSQL查询出错: ${error.message}`);
|
||||
if (skip_duplicate_check) {
|
||||
logWithTimestamp("已启用跳过重复检查,将继续处理所有记录");
|
||||
return records;
|
||||
} else {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// 处理记录的函数
|
||||
const processRecords = async (records: ShortRecord[]) => {
|
||||
if (records.length === 0) return 0;
|
||||
|
||||
logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`);
|
||||
|
||||
// 检查记录是否已存在
|
||||
let newRecords;
|
||||
try {
|
||||
newRecords = await checkExistingRecords(records);
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
logWithTimestamp(`检查记录是否存在时出错: ${error.message}`);
|
||||
if (!skip_duplicate_check && !force_insert) {
|
||||
throw error;
|
||||
}
|
||||
// 如果跳过检查或强制插入,则使用所有记录
|
||||
logWithTimestamp("将使用所有记录进行处理");
|
||||
newRecords = records;
|
||||
}
|
||||
|
||||
if (newRecords.length === 0) {
|
||||
logWithTimestamp("所有记录都已存在,跳过处理");
|
||||
return 0;
|
||||
}
|
||||
|
||||
logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`);
|
||||
|
||||
// 批量插入PostgreSQL
|
||||
try {
|
||||
// 开始事务
|
||||
await pgClient!.queryArray('BEGIN');
|
||||
|
||||
let insertedCount = 0;
|
||||
|
||||
// 由于参数可能很多,按小批次处理
|
||||
const smallBatchSize = 100;
|
||||
for (let i = 0; i < newRecords.length; i += smallBatchSize) {
|
||||
const batchRecords = newRecords.slice(i, i + smallBatchSize);
|
||||
|
||||
// 构造批量插入语句
|
||||
const placeholders = [];
|
||||
const values = [];
|
||||
let valueIndex = 1;
|
||||
|
||||
for (const record of batchRecords) {
|
||||
// 参考提供的字段处理方式处理数据
|
||||
const createdAt = parseDate(record.createTime);
|
||||
const updatedAt = createdAt; // 设置更新时间等于创建时间
|
||||
const fullShortUrl = `${domain}/${record.slug}`;
|
||||
|
||||
placeholders.push(`($${valueIndex}, $${valueIndex+1}, $${valueIndex+2}, $${valueIndex+3}, $${valueIndex+4}, $${valueIndex+5}, $${valueIndex+6}, $${valueIndex+7}, $${valueIndex+8}, $${valueIndex+9}, $${valueIndex+10}, $${valueIndex+11}, $${valueIndex+12})`);
|
||||
|
||||
values.push(
|
||||
record._id.toString(), // id
|
||||
record.slug, // slug
|
||||
domain, // domain (使用提供的域名)
|
||||
record.slug, // name (使用slug作为name)
|
||||
record.slug, // title (使用slug作为title)
|
||||
record.origin || '', // origin
|
||||
createdAt, // created_at
|
||||
updatedAt, // updated_at
|
||||
fullShortUrl, // full_short_url
|
||||
record.image || null, // image
|
||||
record.description || null, // description
|
||||
record.expiredUrl || null, // expired_url
|
||||
parseDate(record.expiredAt) // expired_at
|
||||
);
|
||||
|
||||
valueIndex += 13;
|
||||
}
|
||||
|
||||
const query = `
|
||||
INSERT INTO ${postgresConfig.schema}.shorturl
|
||||
(id, slug, domain, name, title, origin, created_at, updated_at, full_short_url, image, description, expired_url, expired_at)
|
||||
VALUES ${placeholders.join(', ')}
|
||||
`;
|
||||
|
||||
await pgClient!.queryArray(query, values);
|
||||
insertedCount += batchRecords.length;
|
||||
logWithTimestamp(`已插入 ${insertedCount}/${newRecords.length} 条记录`);
|
||||
}
|
||||
|
||||
// 提交事务
|
||||
await pgClient!.queryArray('COMMIT');
|
||||
|
||||
logWithTimestamp(`成功插入 ${insertedCount} 条记录到PostgreSQL`);
|
||||
return insertedCount;
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
// 发生错误,回滚事务
|
||||
await pgClient!.queryArray('ROLLBACK');
|
||||
logWithTimestamp(`向PostgreSQL插入数据失败: ${error.message}`);
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
|
||||
// 批量处理记录
|
||||
let processedRecords = 0;
|
||||
let totalBatchRecords = 0;
|
||||
let lastSyncTime = 0;
|
||||
let lastSyncId = "";
|
||||
|
||||
for (let page = 0; processedRecords < recordsToProcess; page++) {
|
||||
// 检查超时
|
||||
if (checkTimeout()) {
|
||||
logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`);
|
||||
break;
|
||||
}
|
||||
|
||||
// 每批次都输出进度
|
||||
logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
|
||||
|
||||
logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`);
|
||||
const records = await shortCollection.find(
|
||||
query,
|
||||
{
|
||||
sort: { createTime: 1 },
|
||||
skip: page * batch_size,
|
||||
limit: batch_size
|
||||
}
|
||||
).toArray();
|
||||
|
||||
if (records.length === 0) {
|
||||
logWithTimestamp("没有找到更多数据,同步结束");
|
||||
break;
|
||||
}
|
||||
|
||||
// 找到数据,开始处理
|
||||
logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);
|
||||
|
||||
// 输出当前批次的部分数据信息
|
||||
if (records.length > 0) {
|
||||
logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, slug=${records[0].slug}, 时间=${new Date(typeof records[0].createTime === 'number' ? records[0].createTime : 0).toISOString()}`);
|
||||
if (records.length > 1) {
|
||||
const lastRec = records[records.length-1];
|
||||
logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${lastRec._id}, slug=${lastRec.slug}, 时间=${new Date(typeof lastRec.createTime === 'number' ? lastRec.createTime : 0).toISOString()}`);
|
||||
}
|
||||
}
|
||||
|
||||
const batchSize = await processRecords(records);
|
||||
processedRecords += records.length;
|
||||
totalBatchRecords += batchSize;
|
||||
|
||||
// 更新最后处理的记录时间和ID
|
||||
if (records.length > 0) {
|
||||
const lastRecord = records[records.length - 1];
|
||||
// 提取数字时间戳
|
||||
let lastCreateTime = 0;
|
||||
if (typeof lastRecord.createTime === 'number') {
|
||||
lastCreateTime = lastRecord.createTime;
|
||||
} else if (lastRecord.createTime && lastRecord.createTime.$numberLong) {
|
||||
lastCreateTime = Number(lastRecord.createTime.$numberLong);
|
||||
}
|
||||
|
||||
lastSyncTime = Math.max(lastSyncTime, lastCreateTime);
|
||||
lastSyncId = lastRecord._id.toString();
|
||||
}
|
||||
|
||||
logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
|
||||
}
|
||||
|
||||
// 更新同步状态
|
||||
if (processedRecords > 0 && lastSyncTime > 0) {
|
||||
// 创建新的同步状态
|
||||
const newSyncState: SyncState = {
|
||||
last_sync_time: lastSyncTime,
|
||||
records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + totalBatchRecords,
|
||||
last_sync_id: lastSyncId
|
||||
};
|
||||
|
||||
try {
|
||||
// 保存同步状态
|
||||
await setVariable(SYNC_STATE_KEY, newSyncState);
|
||||
logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`);
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
logWithTimestamp(`更新同步状态失败: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
records_processed: processedRecords,
|
||||
records_synced: totalBatchRecords,
|
||||
last_sync_time: lastSyncTime > 0 ? new Date(lastSyncTime).toISOString() : null,
|
||||
message: "数据同步完成"
|
||||
};
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
console.error("同步过程中发生错误:", error);
|
||||
return {
|
||||
success: false,
|
||||
error: error.message,
|
||||
stack: error.stack
|
||||
};
|
||||
} finally {
|
||||
// 关闭连接
|
||||
if (pgClient) {
|
||||
await pgClient.end();
|
||||
logWithTimestamp("PostgreSQL连接已关闭");
|
||||
}
|
||||
await mongoClient.close();
|
||||
logWithTimestamp("MongoDB连接已关闭");
|
||||
}
|
||||
}
|
||||
@@ -39,13 +39,17 @@ interface SyncState {
|
||||
last_sync_id?: string;
|
||||
}
|
||||
|
||||
// 同步状态键名
|
||||
const SYNC_STATE_KEY = "f/shorturl_analytics/mongo_sync_state";
|
||||
|
||||
export async function main(
|
||||
batch_size = 1000,
|
||||
max_records = 9999999,
|
||||
timeout_minutes = 60,
|
||||
skip_clickhouse_check = false,
|
||||
force_insert = false,
|
||||
database_override = "shorturl_analytics" // 添加数据库名称参数,默认为shorturl_analytics
|
||||
database_override = "shorturl_analytics", // 添加数据库名称参数,默认为shorturl_analytics
|
||||
reset_sync_state = false // 添加参数用于重置同步状态
|
||||
) {
|
||||
const logWithTimestamp = (message: string) => {
|
||||
const now = new Date();
|
||||
@@ -60,6 +64,9 @@ export async function main(
|
||||
if (force_insert) {
|
||||
logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
|
||||
}
|
||||
if (reset_sync_state) {
|
||||
logWithTimestamp("⚠️ 警告: 已启用重置同步状态,将从头开始同步数据");
|
||||
}
|
||||
|
||||
// 设置超时
|
||||
const startTime = Date.now();
|
||||
@@ -127,6 +134,36 @@ export async function main(
|
||||
throw error;
|
||||
}
|
||||
|
||||
// 获取上次同步状态
|
||||
let lastSyncState: SyncState | null = null;
|
||||
if (!reset_sync_state) {
|
||||
try {
|
||||
const rawSyncState = await getVariable(SYNC_STATE_KEY);
|
||||
if (rawSyncState) {
|
||||
if (typeof rawSyncState === "string") {
|
||||
try {
|
||||
lastSyncState = JSON.parse(rawSyncState);
|
||||
} catch (e) {
|
||||
logWithTimestamp(`解析上次同步状态失败: ${e}, 将从头开始同步`);
|
||||
}
|
||||
} else {
|
||||
lastSyncState = rawSyncState as SyncState;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logWithTimestamp(`获取上次同步状态失败: ${error}, 将从头开始同步`);
|
||||
}
|
||||
}
|
||||
|
||||
if (lastSyncState) {
|
||||
logWithTimestamp(`找到上次同步状态: 最后同步时间 ${new Date(lastSyncState.last_sync_time).toISOString()}, 已同步记录数 ${lastSyncState.records_synced}`);
|
||||
if (lastSyncState.last_sync_id) {
|
||||
logWithTimestamp(`最后同步ID: ${lastSyncState.last_sync_id}`);
|
||||
}
|
||||
} else {
|
||||
logWithTimestamp("没有找到上次同步状态,将从头开始同步");
|
||||
}
|
||||
|
||||
// 构建MongoDB连接URL
|
||||
let mongoUrl = "mongodb://";
|
||||
if (mongoConfig.username && mongoConfig.password) {
|
||||
@@ -145,25 +182,32 @@ export async function main(
|
||||
const db = client.database(mongoConfig.db);
|
||||
const traceCollection = db.collection<TraceRecord>("trace");
|
||||
|
||||
// 构建查询条件,获取所有记录
|
||||
// 构建查询条件,根据上次同步状态获取新记录
|
||||
const query: Record<string, unknown> = {
|
||||
type: 1 // 只同步type为1的记录
|
||||
};
|
||||
|
||||
// 如果有上次同步状态,则只获取更新的记录
|
||||
if (lastSyncState && lastSyncState.last_sync_time) {
|
||||
// 使用上次同步时间作为过滤条件
|
||||
query.createTime = { $gt: lastSyncState.last_sync_time };
|
||||
logWithTimestamp(`将只同步createTime > ${lastSyncState.last_sync_time} (${new Date(lastSyncState.last_sync_time).toISOString()}) 的记录`);
|
||||
}
|
||||
|
||||
// 计算总记录数
|
||||
const totalRecords = await traceCollection.countDocuments(query);
|
||||
console.log(`找到 ${totalRecords} 条记录需要同步`);
|
||||
console.log(`找到 ${totalRecords} 条新记录需要同步`);
|
||||
|
||||
// 限制此次处理的记录数量
|
||||
const recordsToProcess = Math.min(totalRecords, max_records);
|
||||
console.log(`本次将处理 ${recordsToProcess} 条记录`);
|
||||
|
||||
if (totalRecords === 0) {
|
||||
console.log("没有记录需要同步,任务完成");
|
||||
console.log("没有新记录需要同步,任务完成");
|
||||
return {
|
||||
success: true,
|
||||
records_synced: 0,
|
||||
message: "没有记录需要同步"
|
||||
message: "没有新记录需要同步"
|
||||
};
|
||||
}
|
||||
|
||||
@@ -464,6 +508,8 @@ export async function main(
|
||||
// 批量处理记录
|
||||
let processedRecords = 0;
|
||||
let totalBatchRecords = 0;
|
||||
let lastSyncTime = 0;
|
||||
let lastSyncId = "";
|
||||
|
||||
for (let page = 0; processedRecords < recordsToProcess; page++) {
|
||||
// 检查超时
|
||||
@@ -505,13 +551,40 @@ export async function main(
|
||||
processedRecords += records.length;
|
||||
totalBatchRecords += batchSize;
|
||||
|
||||
// 更新最后处理的记录时间和ID
|
||||
if (records.length > 0) {
|
||||
const lastRecord = records[records.length - 1];
|
||||
lastSyncTime = Math.max(lastSyncTime, lastRecord.createTime);
|
||||
lastSyncId = lastRecord._id.toString();
|
||||
}
|
||||
|
||||
logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
|
||||
}
|
||||
|
||||
// 更新同步状态
|
||||
if (processedRecords > 0 && lastSyncTime > 0) {
|
||||
// 创建新的同步状态
|
||||
const newSyncState: SyncState = {
|
||||
last_sync_time: lastSyncTime,
|
||||
records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + totalBatchRecords,
|
||||
last_sync_id: lastSyncId
|
||||
};
|
||||
|
||||
try {
|
||||
// 保存同步状态
|
||||
await setVariable(SYNC_STATE_KEY, newSyncState);
|
||||
logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`);
|
||||
} catch (err) {
|
||||
const error = err as Error;
|
||||
logWithTimestamp(`更新同步状态失败: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
records_processed: processedRecords,
|
||||
records_synced: totalBatchRecords,
|
||||
last_sync_time: lastSyncTime > 0 ? new Date(lastSyncTime).toISOString() : null,
|
||||
message: "数据同步完成"
|
||||
};
|
||||
} catch (err) {
|
||||
|
||||
@@ -3,7 +3,17 @@
|
||||
// 创建日期: 2023-11-21
|
||||
|
||||
import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
|
||||
import { getResource, getVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";
|
||||
import { getResource, getVariable, setVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";
|
||||
|
||||
// 同步状态接口
|
||||
interface SyncState {
|
||||
last_sync_time: string; // 上次同步的结束时间
|
||||
records_synced: number; // 累计同步的记录数
|
||||
last_run: string; // 上次运行的时间
|
||||
}
|
||||
|
||||
// 同步状态键名
|
||||
const SYNC_STATE_KEY = "f/shorturl_analytics/shorturl_to_clickhouse_state";
|
||||
|
||||
// PostgreSQL配置接口
|
||||
interface PgConfig {
|
||||
@@ -42,24 +52,15 @@ interface ShortUrlData {
|
||||
* 同步PostgreSQL short_url.shorturl表数据到ClickHouse
|
||||
*/
|
||||
export async function main(
|
||||
params: {
|
||||
/** 同步数据的开始时间(ISO 8601格式)。默认为1小时前 */
|
||||
start_time?: string;
|
||||
/** 同步数据的结束时间(ISO 8601格式)。默认为当前时间 */
|
||||
end_time?: string;
|
||||
/** 是否为测试模式(不执行实际更新) */
|
||||
dry_run?: boolean;
|
||||
dry_run = false,
|
||||
/** 是否显示详细日志 */
|
||||
verbose?: boolean;
|
||||
}
|
||||
verbose = false,
|
||||
/** 是否重置同步状态(从头开始同步) */
|
||||
reset_sync_state = false,
|
||||
/** 如果没有同步状态,往前查询多少小时的数据(默认1小时) */
|
||||
default_hours_back = 1
|
||||
) {
|
||||
// 设置默认参数
|
||||
const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000).toISOString();
|
||||
const start_time = params.start_time || oneHourAgo;
|
||||
const end_time = params.end_time || new Date().toISOString();
|
||||
const dry_run = params.dry_run || false;
|
||||
const verbose = params.verbose || false;
|
||||
|
||||
// 初始化日志函数
|
||||
const log = (message: string, isVerbose = false) => {
|
||||
if (!isVerbose || verbose) {
|
||||
@@ -67,6 +68,33 @@ export async function main(
|
||||
}
|
||||
};
|
||||
|
||||
// 获取同步状态
|
||||
let syncState: SyncState | null = null;
|
||||
if (!reset_sync_state) {
|
||||
try {
|
||||
log("获取同步状态...", true);
|
||||
const rawState = await getVariable(SYNC_STATE_KEY);
|
||||
if (rawState) {
|
||||
if (typeof rawState === "string") {
|
||||
syncState = JSON.parse(rawState);
|
||||
} else {
|
||||
syncState = rawState as SyncState;
|
||||
}
|
||||
log(`找到上次同步状态: 最后同步时间 ${syncState.last_sync_time}, 已同步记录数 ${syncState.records_synced}`, true);
|
||||
}
|
||||
} catch (error) {
|
||||
log(`获取同步状态失败: ${error}, 将使用默认设置`, true);
|
||||
}
|
||||
} else {
|
||||
log("重置同步状态,从头开始同步", true);
|
||||
}
|
||||
|
||||
// 设置时间范围
|
||||
const oneHourAgo = new Date(Date.now() - default_hours_back * 60 * 60 * 1000).toISOString();
|
||||
// 如果有同步状态,使用上次同步时间作为开始时间;否则使用默认时间
|
||||
const start_time = syncState ? syncState.last_sync_time : oneHourAgo;
|
||||
const end_time = new Date().toISOString();
|
||||
|
||||
log(`开始同步shorturl表数据: ${start_time} 至 ${end_time}`);
|
||||
|
||||
let pgPool: Pool | null = null;
|
||||
@@ -90,6 +118,10 @@ export async function main(
|
||||
log(`成功获取 ${shorturlData.length} 条shorturl数据`);
|
||||
|
||||
if (shorturlData.length === 0) {
|
||||
// 更新同步状态,即使没有新数据
|
||||
if (!dry_run) {
|
||||
await updateSyncState(end_time, syncState ? syncState.records_synced : 0, log);
|
||||
}
|
||||
return { success: true, message: "没有找到需要更新的数据", updated: 0 };
|
||||
}
|
||||
|
||||
@@ -100,10 +132,19 @@ export async function main(
|
||||
if (!dry_run) {
|
||||
const shorturlUpdated = await updateClickHouseShortUrl(shorturlData, chConfig, log);
|
||||
|
||||
// 更新同步状态
|
||||
const totalSynced = (syncState ? syncState.records_synced : 0) + shorturlUpdated;
|
||||
await updateSyncState(end_time, totalSynced, log);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: "shorturl表数据同步完成",
|
||||
shorturl_updated: shorturlUpdated
|
||||
shorturl_updated: shorturlUpdated,
|
||||
total_synced: totalSynced,
|
||||
sync_state: {
|
||||
last_sync_time: end_time,
|
||||
records_synced: totalSynced
|
||||
}
|
||||
};
|
||||
} else {
|
||||
log("测试模式: 不执行实际更新");
|
||||
@@ -129,6 +170,25 @@ export async function main(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新同步状态
|
||||
*/
|
||||
async function updateSyncState(lastSyncTime: string, recordsSynced: number, log: (message: string, isVerbose?: boolean) => void): Promise<void> {
|
||||
try {
|
||||
const newState: SyncState = {
|
||||
last_sync_time: lastSyncTime,
|
||||
records_synced: recordsSynced,
|
||||
last_run: new Date().toISOString()
|
||||
};
|
||||
|
||||
await setVariable(SYNC_STATE_KEY, newState);
|
||||
log(`同步状态已更新: 最后同步时间 ${lastSyncTime}, 累计同步记录数 ${recordsSynced}`, true);
|
||||
} catch (error) {
|
||||
log(`更新同步状态失败: ${error}`, true);
|
||||
// 继续执行,不中断同步过程
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从PostgreSQL获取shorturl数据
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user