auto refresh

This commit is contained in:
2025-04-17 18:28:08 +08:00
parent 6025641ab1
commit 53e1611670
4 changed files with 820 additions and 26 deletions

View File

@@ -306,6 +306,8 @@ function AnalyticsContent() {
const [geoData, setGeoData] = useState<GeoData[]>([]);
const [deviceData, setDeviceData] = useState<DeviceAnalyticsType | null>(null);
const [events, setEvents] = useState<Event[]>([]);
const [isRefreshing, setIsRefreshing] = useState(false); // New state to track auto-refresh
const [lastRefreshed, setLastRefreshed] = useState<Date | null>(null); // Track when data was last refreshed
// 添加 Snackbar 状态
const [isSnackbarOpen, setIsSnackbarOpen] = useState(false);
@@ -449,12 +451,133 @@ function AnalyticsContent() {
setError(err instanceof Error ? err.message : 'An error occurred while fetching data');
} finally {
setLoading(false);
setIsRefreshing(false); // Reset refreshing state
setLastRefreshed(new Date()); // Update last refreshed timestamp
}
};
fetchData();
}, [dateRange, selectedTeamIds, selectedProjectIds, selectedTagNames, selectedSubpath, currentPage, pageSize, selectedShortUrl, shouldFetchData]);
// Auto-refresh: while the dashboard is active, silently re-fetch every analytics
// dataset on a fixed 30-second interval without showing the full-page spinner.
useEffect(() => {
  // Don't set up refresh until the initial data load has been triggered.
  if (!shouldFetchData) return;

  // Fetch one endpoint and resolve to its parsed JSON body, or to null for a
  // non-2xx response. Checking `ok` before parsing means an endpoint that
  // answers with a non-JSON error page no longer discards the other datasets.
  const fetchJsonIfOk = async (url: string) => {
    const res = await fetch(url);
    return res.ok ? res.json() : null;
  };

  // Runs one refresh cycle. This intentionally duplicates the request-building
  // logic of the main fetch effect instead of reusing its `fetchData`.
  const fetchRefreshedData = async () => {
    try {
      // NOTE(review): `format` renders the date in local time but appends a
      // literal 'Z'; kept as-is so refreshes request the same window as the
      // main fetch appears to — confirm the intended timezone handling.
      const startTime = format(dateRange.from, "yyyy-MM-dd'T'HH:mm:ss'Z'");
      const endTime = format(dateRange.to, "yyyy-MM-dd'T'HH:mm:ss'Z'");

      // Base URL and the query parameters shared by all five endpoints.
      const baseUrl = '/api/events';
      const params = new URLSearchParams({
        startTime,
        endTime,
        page: currentPage.toString(),
        pageSize: pageSize.toString()
      });

      // Prefer the selected short URL; otherwise fall back to the id saved in sessionStorage.
      if (selectedShortUrl && selectedShortUrl.externalId) {
        params.append('linkId', selectedShortUrl.externalId);
      } else {
        const savedExternalId = sessionStorage.getItem('current_shorturl_external_id');
        if (savedExternalId) {
          params.append('linkId', savedExternalId);
        }
      }

      for (const teamId of selectedTeamIds) {
        params.append('teamId', teamId);
      }
      for (const projectId of selectedProjectIds) {
        params.append('projectId', projectId);
      }
      for (const tagName of selectedTagNames) {
        params.append('tagName', tagName);
      }
      if (selectedSubpath) {
        params.append('subpath', selectedSubpath);
      }

      // Request all datasets in parallel with the same query string.
      const queryString = params.toString();
      const [summaryBody, timeSeriesBody, geoBody, deviceBody, eventsBody] = await Promise.all([
        fetchJsonIfOk(`${baseUrl}/summary?${queryString}`),
        fetchJsonIfOk(`${baseUrl}/time-series?${queryString}`),
        fetchJsonIfOk(`${baseUrl}/geo?${queryString}`),
        fetchJsonIfOk(`${baseUrl}/devices?${queryString}`),
        fetchJsonIfOk(`${baseUrl}?${queryString}`)
      ]);

      // Apply whichever datasets came back successfully.
      let anyUpdated = false;
      if (summaryBody) {
        setSummary(summaryBody.data);
        anyUpdated = true;
      }
      if (timeSeriesBody) {
        setTimeSeriesData(timeSeriesBody.data);
        anyUpdated = true;
      }
      if (geoBody) {
        setGeoData(geoBody.data);
        anyUpdated = true;
      }
      if (deviceBody) {
        setDeviceData(deviceBody.data);
        anyUpdated = true;
      }
      if (eventsBody) {
        setEvents(eventsBody.data || []);
        anyUpdated = true;
        // Keep the pagination total in sync with the refreshed event list.
        if (eventsBody.meta) {
          const totalCount = parseInt(String(eventsBody.meta.total), 10);
          if (!isNaN(totalCount)) {
            setTotalEvents(totalCount);
          }
        }
      }

      // Only advance the "Last updated" timestamp when fresh data was actually
      // applied; a failed refresh must not make stale data look current.
      if (anyUpdated) {
        setLastRefreshed(new Date());
      }
    } catch (err) {
      // Errors are logged but deliberately not surfaced, to avoid disrupting the UI.
      console.error('Auto-refresh error:', err);
    } finally {
      setIsRefreshing(false);
    }
  };

  // Interval tick: start a refresh unless a load or refresh is already running.
  const refreshData = () => {
    console.log('Auto-refreshing analytics data...');
    if (!loading && !isRefreshing) {
      setIsRefreshing(true);
      fetchRefreshedData();
    }
  };

  // `loading` and `isRefreshing` are dependencies, so the interval is recreated
  // (and its 30-second countdown restarted) whenever either of them changes.
  const intervalId = setInterval(refreshData, 30000);

  // Stop the timer when the component unmounts or a dependency changes.
  return () => {
    clearInterval(intervalId);
  };
}, [shouldFetchData, loading, isRefreshing, dateRange, selectedTeamIds, selectedProjectIds, selectedTagNames, selectedSubpath, currentPage, pageSize, selectedShortUrl]);
// Function to clear the shorturl filter
const handleClearShortUrlFilter = () => {
// 先清除 store 中的数据
@@ -499,7 +622,7 @@ function AnalyticsContent() {
setCurrentPage(1);
};
if (loading) {
if (loading && !isRefreshing) {
return (
<div className="flex items-center justify-center min-h-screen">
<div className="animate-spin rounded-full h-12 w-12 border-t-2 border-b-2 border-blue-500" />
@@ -532,8 +655,24 @@ function AnalyticsContent() {
)}
<div className="flex justify-between items-center mb-8">
<h1 className="text-2xl font-bold text-gray-900">Analytics Dashboard</h1>
<div>
<h1 className="text-2xl font-bold text-gray-900">Analytics Dashboard</h1>
{lastRefreshed && (
<div className="text-xs text-gray-500 mt-1">
Last updated: {format(lastRefreshed, 'MMM d, yyyy HH:mm:ss')}
{isRefreshing ? ' · Refreshing...' : ' · Auto-refreshes every 30 seconds'}
</div>
)}
</div>
<div className="flex flex-col gap-4 md:flex-row md:items-center">
{/* Show refresh indicator */}
{isRefreshing && (
<div className="flex items-center text-blue-600 text-sm">
<div className="animate-spin rounded-full h-4 w-4 border-t-2 border-b-2 border-blue-500 mr-2" />
<span>Refreshing data...</span>
</div>
)}
{/* 如果有选定的 shorturl可以显示一个提示显示更多详细信息 */}
{selectedShortUrl && (
<div className="bg-blue-100 text-blue-800 px-3 py-2 rounded-md text-sm flex flex-col">

View File

@@ -0,0 +1,522 @@
// 从MongoDB的main.short表同步数据到PostgreSQL的short_url.shorturl表
import { getVariable, setVariable, getResource } from "npm:windmill-client@1";
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
import { Client } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
/**
 * Connection settings for the source MongoDB instance.
 * Loaded in `main` from the Windmill variable `f/shorturl_analytics/mongodb`
 * (JSON-parsed when the variable is returned as a string).
 */
interface MongoConfig {
host: string;
// Kept as a string; it is interpolated directly into the connection URL.
port: string;
// Database name; used both in the connection URL and to select the database.
db: string;
// Credentials are only added to the URL when both username and password are non-empty.
username: string;
password: string;
}
/**
 * Connection settings for the target PostgreSQL instance.
 * Built in `main` from the Windmill resource `f/limq/production_supabase`,
 * with `database` and `schema` falling back to the `main` parameters.
 */
interface PostgresConfig {
host: string;
// Numeric port; `main` defaults it to 5432 when the resource has no usable value.
port: number;
database: string;
user: string;
password: string;
// Schema that contains the `shorturl` table written by this script.
schema: string;
}
// Shape of a document in the MongoDB `short` collection, extended to cover
// more of the fields that may be present on a record.
interface ShortRecord {
_id: ObjectId;
// Target URL; written to the `origin` column.
origin: string;
slug: string;
domain: string | null;
// Creation time; may arrive as a number, a `{ $numberLong }` wrapper, or a string.
createTime: number | { $numberLong: string } | string;
// Optional fields
expiredAt?: number | { $numberLong: string } | string | null;
expiredUrl?: string | null;
password?: string | null;
image?: string | null;
title?: string | null;
description?: string | null;
}
/** Checkpoint persisted between runs so each run only picks up newer records. */
interface SyncState {
// Largest `createTime` seen in the last run, as a number; used as the `$gt` filter on the next run.
last_sync_time: number;
// Running total of records inserted across runs.
records_synced: number;
// String form of the `_id` of the last processed record; only logged, not used for filtering.
last_sync_id?: string;
}
// Windmill variable path under which this script's sync state is read and saved.
const SYNC_STATE_KEY = "f/limq/mongo_short_to_postgres_shorturl_shorturl_state";
/** Schema names matching this pattern are safe to splice into SQL as an unquoted identifier. */
const SAFE_SQL_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_$]*$/;

/** Number of columns written per row by the batched INSERT. */
const SHORTURL_INSERT_COLUMNS = 13;

/** Extracts a printable message from an arbitrary thrown value. */
const describeError = (err: unknown): string =>
  err instanceof Error ? err.message : String(err);

/**
 * Incrementally copies documents from the MongoDB `short` collection into the
 * PostgreSQL `<schema>.shorturl` table.
 *
 * Records are read in `createTime` order, optionally filtered against the slugs
 * already present in PostgreSQL, inserted in transactions of small multi-row
 * INSERTs, and the largest processed `createTime` is saved as a checkpoint in
 * the Windmill variable `SYNC_STATE_KEY` so the next run only reads newer records.
 *
 * @param batch_size Number of documents fetched from MongoDB per batch.
 * @param max_records Upper bound on the number of documents processed in this run.
 * @param timeout_minutes Soft time budget; checked before each batch, after which the run stops.
 * @param skip_duplicate_check When true, do not check PostgreSQL for existing slugs.
 * @param force_insert When true, also bypasses the existing-slug check and inserts every record.
 * @param reset_sync_state When true, ignore the saved checkpoint and start from the beginning.
 * @param postgres_schema Schema used when the resource does not provide one.
 * @param postgres_database Database used when the resource does not provide one.
 * @param domain Domain written to the `domain` column and used to build `full_short_url`.
 * @returns A result object with `success: true` and counters on completion, or
 *   `success: false` with `error`/`stack` when the sync itself fails.
 * @throws When the MongoDB variable or the PostgreSQL resource cannot be loaded
 *   (configuration errors are rethrown rather than reported in the result).
 */
export async function main(
  batch_size = 1000,
  max_records = 9999999,
  timeout_minutes = 60,
  skip_duplicate_check = false,
  force_insert = false,
  reset_sync_state = false,
  postgres_schema = "short_url", // fallback schema, overridable at run time
  postgres_database = "postgres", // fallback database name
  domain = "upj.to" // domain assigned to every synced short URL
) {
  const logWithTimestamp = (message: string) => {
    const now = new Date();
    console.log(`[${now.toISOString()}] ${message}`);
  };

  logWithTimestamp("开始执行MongoDB到PostgreSQL的同步任务");
  logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`);
  logWithTimestamp(`使用域名: ${domain}`);
  if (skip_duplicate_check) {
    logWithTimestamp("⚠️ 警告: 已启用跳过重复检查模式,不会检查记录是否已存在");
  }
  if (force_insert) {
    logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
  }
  if (reset_sync_state) {
    logWithTimestamp("⚠️ 警告: 已启用重置同步状态,将从头开始同步数据");
  }

  // Soft timeout: measured from here and checked before each batch.
  const startTime = Date.now();
  const timeoutMs = timeout_minutes * 60 * 1000;
  const checkTimeout = () => {
    if (Date.now() - startTime > timeoutMs) {
      logWithTimestamp(`运行时间超过${timeout_minutes}分钟,暂停执行`);
      return true;
    }
    return false;
  };

  // Normalizes the date encodings seen on records (number, `{ $numberLong }`,
  // date string, or Date) to a Date; returns null for missing or unparseable values.
  const parseDate = (dateValue: unknown): Date | null => {
    if (!dateValue) return null;
    const validOrNull = (d: Date) => (isNaN(d.getTime()) ? null : d);
    if (dateValue instanceof Date) {
      return validOrNull(dateValue);
    }
    if (typeof dateValue === "object") {
      const wrapped = (dateValue as { $numberLong?: unknown }).$numberLong;
      return wrapped ? validOrNull(new Date(Number(wrapped))) : null;
    }
    if (typeof dateValue === "number" || typeof dateValue === "string") {
      return validOrNull(new Date(dateValue));
    }
    return null;
  };

  // Timestamp used only in progress logs: numeric createTime, otherwise epoch 0.
  const logTime = (record: ShortRecord) =>
    new Date(typeof record.createTime === "number" ? record.createTime : 0).toISOString();

  // ---- Load connection settings (failures here are rethrown to the caller) ----
  let mongoConfig: MongoConfig;
  let postgresConfig: PostgresConfig;
  try {
    const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
    if (typeof rawMongoConfig === "string") {
      try {
        mongoConfig = JSON.parse(rawMongoConfig);
      } catch (e) {
        console.error("MongoDB配置解析失败:", e);
        throw e;
      }
    } else {
      mongoConfig = rawMongoConfig as MongoConfig;
    }

    // PostgreSQL settings come from a Windmill resource.
    try {
      logWithTimestamp("正在获取PostgreSQL资源...");
      const resourceConfig = await getResource("f/limq/production_supabase");
      postgresConfig = {
        host: resourceConfig.host || "",
        port: Number(resourceConfig.port) || 5432,
        user: resourceConfig.user || "",
        password: resourceConfig.password || "",
        database: resourceConfig.database || postgres_database,
        schema: resourceConfig.schema || postgres_schema
      };
      // A resource field may hold the literal string "undefined"; treat it as missing.
      if (!postgresConfig.database || postgresConfig.database === "undefined") {
        postgresConfig.database = postgres_database;
        logWithTimestamp(`数据库名称未指定或为"undefined",使用提供的值: ${postgresConfig.database}`);
      }
      if (!postgresConfig.schema || postgresConfig.schema === "undefined") {
        postgresConfig.schema = postgres_schema;
        logWithTimestamp(`Schema未指定或为"undefined",使用提供的值: ${postgresConfig.schema}`);
      }
      logWithTimestamp(`PostgreSQL配置: 数据库=${postgresConfig.database}, Schema=${postgresConfig.schema}`);
    } catch (e) {
      console.error("获取PostgreSQL资源失败:", e);
      throw e;
    }

    // Log the effective configuration with passwords hidden.
    console.log("MongoDB配置:", JSON.stringify({
      ...mongoConfig,
      password: "****"
    }));
    console.log("PostgreSQL配置:", JSON.stringify({
      ...postgresConfig,
      password: "****"
    }));
  } catch (error) {
    console.error("获取配置失败:", error);
    throw error;
  }

  // ---- Load the checkpoint from the previous run (best effort) ----
  let lastSyncState: SyncState | null = null;
  if (!reset_sync_state) {
    try {
      const rawSyncState = await getVariable(SYNC_STATE_KEY);
      if (rawSyncState) {
        if (typeof rawSyncState === "string") {
          try {
            lastSyncState = JSON.parse(rawSyncState);
          } catch (e) {
            logWithTimestamp(`解析上次同步状态失败: ${e}, 将从头开始同步`);
          }
        } else {
          lastSyncState = rawSyncState as SyncState;
        }
      }
    } catch (error) {
      logWithTimestamp(`获取上次同步状态失败: ${error}, 将从头开始同步`);
    }
  }
  if (lastSyncState) {
    logWithTimestamp(`找到上次同步状态: 最后同步时间 ${new Date(lastSyncState.last_sync_time).toISOString()}, 已同步记录数 ${lastSyncState.records_synced}`);
    if (lastSyncState.last_sync_id) {
      logWithTimestamp(`最后同步ID: ${lastSyncState.last_sync_id}`);
    }
  } else {
    logWithTimestamp("没有找到上次同步状态,将从头开始同步");
  }

  // ---- Build connection targets ----
  // The logged URLs are assembled without the password instead of regex-masking
  // the real URL, so a password containing ':' or '@' can never leak into logs.
  // NOTE(review): MongoDB credentials are placed in the URL unescaped, as before;
  // confirm whether they need percent-encoding for the driver in use.
  const mongoTarget = `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
  const mongoHasAuth = Boolean(mongoConfig.username && mongoConfig.password);
  const mongoUrl = mongoHasAuth
    ? `mongodb://${mongoConfig.username}:${mongoConfig.password}@${mongoTarget}`
    : `mongodb://${mongoTarget}`;
  const mongoUrlForLog = mongoHasAuth
    ? `mongodb://${mongoConfig.username}:****@${mongoTarget}`
    : `mongodb://${mongoTarget}`;
  console.log(`MongoDB连接URL: ${mongoUrlForLog}`);
  console.log(`PostgreSQL连接URL: postgres://${postgresConfig.user}:****@${postgresConfig.host}:${postgresConfig.port}/${postgresConfig.database}`);

  const mongoClient = new MongoClient();
  let pgClient: Client | null = null;
  try {
    await mongoClient.connect(mongoUrl);
    logWithTimestamp("MongoDB连接成功");

    // Connect with an options object rather than a hand-built URL so that
    // special characters in the user name or password need no escaping.
    const pg = new Client({
      hostname: postgresConfig.host,
      port: postgresConfig.port,
      user: postgresConfig.user,
      password: postgresConfig.password,
      database: postgresConfig.database
    });
    pgClient = pg;
    await pg.connect();
    logWithTimestamp("PostgreSQL连接成功");

    // The schema name is spliced into SQL as an identifier further down (it
    // cannot be a bind parameter), so reject anything that is not a plain identifier.
    const schema = postgresConfig.schema;
    if (!SAFE_SQL_IDENTIFIER.test(schema)) {
      throw new Error(`Schema名称 '${schema}' 含有非法字符`);
    }

    // Verify the schema really exists: the lookup returns zero rows (it does
    // not throw) for a missing schema, so the row count must be inspected.
    // lower($1) is also accepted because unquoted identifiers fold to lower case.
    try {
      const schemaLookup = await pg.queryArray(
        "SELECT 1 FROM information_schema.schemata WHERE schema_name = $1 OR schema_name = lower($1)",
        [schema]
      );
      if (schemaLookup.rows.length === 0) {
        throw new Error("schema not found in information_schema.schemata");
      }
      logWithTimestamp(`PostgreSQL schema '${schema}' 已确认存在`);
    } catch (error) {
      logWithTimestamp(`检查PostgreSQL schema失败: ${error}`);
      throw new Error(`Schema '${schema}' 可能不存在`);
    }

    const db = mongoClient.database(mongoConfig.db);
    const shortCollection = db.collection<ShortRecord>("short");

    // With a checkpoint, only records created after it are selected.
    const query: Record<string, unknown> = {};
    if (lastSyncState && lastSyncState.last_sync_time) {
      query.createTime = { $gt: lastSyncState.last_sync_time };
      logWithTimestamp(`将只同步createTime > ${lastSyncState.last_sync_time} (${new Date(lastSyncState.last_sync_time).toISOString()}) 的记录`);
    }

    const totalRecords = await shortCollection.countDocuments(query);
    logWithTimestamp(`找到 ${totalRecords} 条新记录需要同步`);

    // Cap the amount of work for this run.
    const recordsToProcess = Math.min(totalRecords, max_records);
    logWithTimestamp(`本次将处理 ${recordsToProcess} 条记录`);
    if (totalRecords === 0) {
      logWithTimestamp("没有新记录需要同步,任务完成");
      return {
        success: true,
        records_synced: 0,
        message: "没有新记录需要同步"
      };
    }

    // Returns the subset of `records` whose slug is not yet in PostgreSQL.
    // With skip_duplicate_check or force_insert the check is bypassed entirely.
    const checkExistingRecords = async (records: ShortRecord[]): Promise<ShortRecord[]> => {
      if (records.length === 0) return [];
      if (skip_duplicate_check || force_insert) {
        logWithTimestamp(`已跳过重复检查,准备处理所有 ${records.length} 条记录`);
        return records;
      }
      logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于PostgreSQL中...`);
      try {
        const slugs = records.map((record) => record.slug);
        const result = await pg.queryArray(`
          SELECT slug FROM ${schema}.shorturl
          WHERE slug = ANY($1::text[])
        `, [slugs]);
        const existingSlugs = new Set<string>(result.rows.map((row) => row[0] as string));
        logWithTimestamp(`检测到 ${existingSlugs.size} 条记录已存在于PostgreSQL中`);
        const newRecords = records.filter((record) => !existingSlugs.has(record.slug));
        logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
        return newRecords;
      } catch (err) {
        // This path is only reached when the check is enabled, so a failed
        // lookup must abort the batch rather than risk inserting duplicates.
        logWithTimestamp(`PostgreSQL查询出错: ${describeError(err)}`);
        throw err;
      }
    };

    // Inserts one batch inside a single transaction; returns the number of rows inserted.
    const processRecords = async (records: ShortRecord[]) => {
      if (records.length === 0) return 0;
      logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`);

      let newRecords: ShortRecord[];
      try {
        newRecords = await checkExistingRecords(records);
      } catch (err) {
        logWithTimestamp(`检查记录是否存在时出错: ${describeError(err)}`);
        throw err;
      }
      if (newRecords.length === 0) {
        logWithTimestamp("所有记录都已存在,跳过处理");
        return 0;
      }
      logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`);

      try {
        await pg.queryArray("BEGIN");
        let insertedCount = 0;
        // Keep each statement's bind-parameter count small by inserting in sub-batches.
        const smallBatchSize = 100;
        for (let i = 0; i < newRecords.length; i += smallBatchSize) {
          const batchRecords = newRecords.slice(i, i + smallBatchSize);
          const placeholders: string[] = [];
          const values: unknown[] = [];
          for (const record of batchRecords) {
            // One "($n, $n+1, ...)" group per row, numbered after the values collected so far.
            const firstIndex = values.length + 1;
            const rowPlaceholders = Array.from(
              { length: SHORTURL_INSERT_COLUMNS },
              (_, offset) => `$${firstIndex + offset}`
            );
            placeholders.push(`(${rowPlaceholders.join(", ")})`);

            const createdAt = parseDate(record.createTime);
            values.push(
              record._id.toString(), // id
              record.slug, // slug
              domain, // domain (the run's domain parameter, not the record's)
              record.slug, // name (slug reused as name)
              record.slug, // title (slug reused as title)
              record.origin || "", // origin
              createdAt, // created_at
              createdAt, // updated_at (same as created_at)
              `${domain}/${record.slug}`, // full_short_url
              record.image || null, // image
              record.description || null, // description
              record.expiredUrl || null, // expired_url
              parseDate(record.expiredAt) // expired_at
            );
          }
          const insertSql = `
            INSERT INTO ${schema}.shorturl
            (id, slug, domain, name, title, origin, created_at, updated_at, full_short_url, image, description, expired_url, expired_at)
            VALUES ${placeholders.join(", ")}
          `;
          await pg.queryArray(insertSql, values);
          insertedCount += batchRecords.length;
          logWithTimestamp(`已插入 ${insertedCount}/${newRecords.length} 条记录`);
        }
        await pg.queryArray("COMMIT");
        logWithTimestamp(`成功插入 ${insertedCount} 条记录到PostgreSQL`);
        return insertedCount;
      } catch (err) {
        // Roll back, but never let a failing ROLLBACK hide the original error.
        try {
          await pg.queryArray("ROLLBACK");
        } catch (rollbackErr) {
          logWithTimestamp(`回滚事务失败: ${describeError(rollbackErr)}`);
        }
        logWithTimestamp(`向PostgreSQL插入数据失败: ${describeError(err)}`);
        throw err;
      }
    };

    // ---- Main batch loop ----
    let processedRecords = 0;
    let totalBatchRecords = 0;
    let lastSyncTime = 0;
    let lastSyncId = "";
    const progressPercent = () => Math.round(processedRecords / recordsToProcess * 100);

    for (let page = 0; processedRecords < recordsToProcess; page++) {
      if (checkTimeout()) {
        logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`);
        break;
      }
      logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${progressPercent()}%)`);
      logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`);

      const records = await shortCollection.find(
        query,
        {
          // `_id` breaks ties between equal createTime values so that
          // skip-based paging returns a stable, non-overlapping sequence.
          sort: { createTime: 1, _id: 1 },
          skip: page * batch_size,
          // Never read past max_records: the final batch may be partial.
          limit: Math.min(batch_size, recordsToProcess - processedRecords)
        }
      ).toArray();

      if (records.length === 0) {
        logWithTimestamp("没有找到更多数据,同步结束");
        break;
      }
      logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);

      // Log the first and last record of the batch for traceability.
      const firstRecord = records[0];
      const lastRecord = records[records.length - 1];
      logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${firstRecord._id}, slug=${firstRecord.slug}, 时间=${logTime(firstRecord)}`);
      if (records.length > 1) {
        logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${lastRecord._id}, slug=${lastRecord.slug}, 时间=${logTime(lastRecord)}`);
      }

      const insertedInBatch = await processRecords(records);
      processedRecords += records.length;
      totalBatchRecords += insertedInBatch;

      // Advance the checkpoint to the newest createTime processed so far,
      // whatever encoding the record used for it.
      const lastCreated = parseDate(lastRecord.createTime);
      if (lastCreated) {
        lastSyncTime = Math.max(lastSyncTime, lastCreated.getTime());
      }
      lastSyncId = lastRecord._id.toString();

      logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${progressPercent()}%)`);
    }

    // ---- Persist the checkpoint (best effort: a failure is logged, not thrown) ----
    if (processedRecords > 0 && lastSyncTime > 0) {
      const newSyncState: SyncState = {
        last_sync_time: lastSyncTime,
        records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + totalBatchRecords,
        last_sync_id: lastSyncId
      };
      try {
        await setVariable(SYNC_STATE_KEY, newSyncState);
        logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`);
      } catch (err) {
        logWithTimestamp(`更新同步状态失败: ${describeError(err)}`);
      }
    }

    return {
      success: true,
      records_processed: processedRecords,
      records_synced: totalBatchRecords,
      last_sync_time: lastSyncTime > 0 ? new Date(lastSyncTime).toISOString() : null,
      message: "数据同步完成"
    };
  } catch (err) {
    console.error("同步过程中发生错误:", err);
    return {
      success: false,
      error: describeError(err),
      stack: err instanceof Error ? err.stack : undefined
    };
  } finally {
    // Close both connections independently so that a failure while closing one
    // neither leaks the other nor replaces the result returned above.
    if (pgClient) {
      try {
        await pgClient.end();
        logWithTimestamp("PostgreSQL连接已关闭");
      } catch (closeErr) {
        logWithTimestamp(`关闭PostgreSQL连接失败: ${describeError(closeErr)}`);
      }
    }
    try {
      await mongoClient.close();
      logWithTimestamp("MongoDB连接已关闭");
    } catch (closeErr) {
      logWithTimestamp(`关闭MongoDB连接失败: ${describeError(closeErr)}`);
    }
  }
}

View File

@@ -39,13 +39,17 @@ interface SyncState {
last_sync_id?: string;
}
// Windmill variable path under which this script's sync state is read and saved.
const SYNC_STATE_KEY = "f/shorturl_analytics/mongo_sync_state";
export async function main(
batch_size = 1000,
max_records = 9999999,
timeout_minutes = 60,
skip_clickhouse_check = false,
force_insert = false,
database_override = "shorturl_analytics" // 添加数据库名称参数默认为shorturl_analytics
database_override = "shorturl_analytics", // 添加数据库名称参数默认为shorturl_analytics
reset_sync_state = false // 添加参数用于重置同步状态
) {
const logWithTimestamp = (message: string) => {
const now = new Date();
@@ -60,6 +64,9 @@ export async function main(
if (force_insert) {
logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
}
if (reset_sync_state) {
logWithTimestamp("⚠️ 警告: 已启用重置同步状态,将从头开始同步数据");
}
// 设置超时
const startTime = Date.now();
@@ -127,6 +134,36 @@ export async function main(
throw error;
}
// 获取上次同步状态
let lastSyncState: SyncState | null = null;
if (!reset_sync_state) {
try {
const rawSyncState = await getVariable(SYNC_STATE_KEY);
if (rawSyncState) {
if (typeof rawSyncState === "string") {
try {
lastSyncState = JSON.parse(rawSyncState);
} catch (e) {
logWithTimestamp(`解析上次同步状态失败: ${e}, 将从头开始同步`);
}
} else {
lastSyncState = rawSyncState as SyncState;
}
}
} catch (error) {
logWithTimestamp(`获取上次同步状态失败: ${error}, 将从头开始同步`);
}
}
if (lastSyncState) {
logWithTimestamp(`找到上次同步状态: 最后同步时间 ${new Date(lastSyncState.last_sync_time).toISOString()}, 已同步记录数 ${lastSyncState.records_synced}`);
if (lastSyncState.last_sync_id) {
logWithTimestamp(`最后同步ID: ${lastSyncState.last_sync_id}`);
}
} else {
logWithTimestamp("没有找到上次同步状态,将从头开始同步");
}
// 构建MongoDB连接URL
let mongoUrl = "mongodb://";
if (mongoConfig.username && mongoConfig.password) {
@@ -145,25 +182,32 @@ export async function main(
const db = client.database(mongoConfig.db);
const traceCollection = db.collection<TraceRecord>("trace");
// 构建查询条件,获取所有记录
// 构建查询条件,根据上次同步状态获取新记录
const query: Record<string, unknown> = {
type: 1 // 只同步type为1的记录
};
// 如果有上次同步状态,则只获取更新的记录
if (lastSyncState && lastSyncState.last_sync_time) {
// 使用上次同步时间作为过滤条件
query.createTime = { $gt: lastSyncState.last_sync_time };
logWithTimestamp(`将只同步createTime > ${lastSyncState.last_sync_time} (${new Date(lastSyncState.last_sync_time).toISOString()}) 的记录`);
}
// 计算总记录数
const totalRecords = await traceCollection.countDocuments(query);
console.log(`找到 ${totalRecords} 条记录需要同步`);
console.log(`找到 ${totalRecords}记录需要同步`);
// 限制此次处理的记录数量
const recordsToProcess = Math.min(totalRecords, max_records);
console.log(`本次将处理 ${recordsToProcess} 条记录`);
if (totalRecords === 0) {
console.log("没有记录需要同步,任务完成");
console.log("没有记录需要同步,任务完成");
return {
success: true,
records_synced: 0,
message: "没有记录需要同步"
message: "没有记录需要同步"
};
}
@@ -464,6 +508,8 @@ export async function main(
// 批量处理记录
let processedRecords = 0;
let totalBatchRecords = 0;
let lastSyncTime = 0;
let lastSyncId = "";
for (let page = 0; processedRecords < recordsToProcess; page++) {
// 检查超时
@@ -505,13 +551,40 @@ export async function main(
processedRecords += records.length;
totalBatchRecords += batchSize;
// 更新最后处理的记录时间和ID
if (records.length > 0) {
const lastRecord = records[records.length - 1];
lastSyncTime = Math.max(lastSyncTime, lastRecord.createTime);
lastSyncId = lastRecord._id.toString();
}
logWithTimestamp(`${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
}
// 更新同步状态
if (processedRecords > 0 && lastSyncTime > 0) {
// 创建新的同步状态
const newSyncState: SyncState = {
last_sync_time: lastSyncTime,
records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + totalBatchRecords,
last_sync_id: lastSyncId
};
try {
// 保存同步状态
await setVariable(SYNC_STATE_KEY, newSyncState);
logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`);
} catch (err) {
const error = err as Error;
logWithTimestamp(`更新同步状态失败: ${error.message}`);
}
}
return {
success: true,
records_processed: processedRecords,
records_synced: totalBatchRecords,
last_sync_time: lastSyncTime > 0 ? new Date(lastSyncTime).toISOString() : null,
message: "数据同步完成"
};
} catch (err) {

View File

@@ -3,7 +3,17 @@
// 创建日期: 2023-11-21
import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
import { getResource, getVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";
import { getResource, getVariable, setVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";
// Sync state persisted between runs of this script.
interface SyncState {
last_sync_time: string; // End time of the last sync window; used as the start time of the next run
records_synced: number; // Running total of records synced across runs
last_run: string; // Time at which the state was last written
}
// Windmill variable path under which this script's sync state is read and saved.
const SYNC_STATE_KEY = "f/shorturl_analytics/shorturl_to_clickhouse_state";
// PostgreSQL配置接口
interface PgConfig {
@@ -42,24 +52,15 @@ interface ShortUrlData {
* 同步PostgreSQL short_url.shorturl表数据到ClickHouse
*/
export async function main(
params: {
/** 同步数据的开始时间ISO 8601格式。默认为1小时前 */
start_time?: string;
/** 同步数据的结束时间ISO 8601格式。默认为当前时间 */
end_time?: string;
/** 是否为测试模式(不执行实际更新) */
dry_run?: boolean;
/** 是否显示详细日志 */
verbose?: boolean;
}
/** 是否为测试模式(不执行实际更新) */
dry_run = false,
/** 是否显示详细日志 */
verbose = false,
/** 是否重置同步状态(从头开始同步) */
reset_sync_state = false,
/** 如果没有同步状态往前查询多少小时的数据默认1小时 */
default_hours_back = 1
) {
// 设置默认参数
const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000).toISOString();
const start_time = params.start_time || oneHourAgo;
const end_time = params.end_time || new Date().toISOString();
const dry_run = params.dry_run || false;
const verbose = params.verbose || false;
// 初始化日志函数
const log = (message: string, isVerbose = false) => {
if (!isVerbose || verbose) {
@@ -67,6 +68,33 @@ export async function main(
}
};
// 获取同步状态
let syncState: SyncState | null = null;
if (!reset_sync_state) {
try {
log("获取同步状态...", true);
const rawState = await getVariable(SYNC_STATE_KEY);
if (rawState) {
if (typeof rawState === "string") {
syncState = JSON.parse(rawState);
} else {
syncState = rawState as SyncState;
}
log(`找到上次同步状态: 最后同步时间 ${syncState.last_sync_time}, 已同步记录数 ${syncState.records_synced}`, true);
}
} catch (error) {
log(`获取同步状态失败: ${error}, 将使用默认设置`, true);
}
} else {
log("重置同步状态,从头开始同步", true);
}
// 设置时间范围
const oneHourAgo = new Date(Date.now() - default_hours_back * 60 * 60 * 1000).toISOString();
// 如果有同步状态,使用上次同步时间作为开始时间;否则使用默认时间
const start_time = syncState ? syncState.last_sync_time : oneHourAgo;
const end_time = new Date().toISOString();
log(`开始同步shorturl表数据: ${start_time}${end_time}`);
let pgPool: Pool | null = null;
@@ -90,6 +118,10 @@ export async function main(
log(`成功获取 ${shorturlData.length} 条shorturl数据`);
if (shorturlData.length === 0) {
// 更新同步状态,即使没有新数据
if (!dry_run) {
await updateSyncState(end_time, syncState ? syncState.records_synced : 0, log);
}
return { success: true, message: "没有找到需要更新的数据", updated: 0 };
}
@@ -100,10 +132,19 @@ export async function main(
if (!dry_run) {
const shorturlUpdated = await updateClickHouseShortUrl(shorturlData, chConfig, log);
// 更新同步状态
const totalSynced = (syncState ? syncState.records_synced : 0) + shorturlUpdated;
await updateSyncState(end_time, totalSynced, log);
return {
success: true,
message: "shorturl表数据同步完成",
shorturl_updated: shorturlUpdated
shorturl_updated: shorturlUpdated,
total_synced: totalSynced,
sync_state: {
last_sync_time: end_time,
records_synced: totalSynced
}
};
} else {
log("测试模式: 不执行实际更新");
@@ -129,6 +170,25 @@ export async function main(
}
}
/**
 * Persists the sync checkpoint to the Windmill variable `SYNC_STATE_KEY`.
 *
 * Saving is best effort: a failure is logged and swallowed so that it never
 * interrupts the sync itself.
 *
 * @param lastSyncTime End of the window that was just synced; stored as `last_sync_time`.
 * @param recordsSynced Cumulative number of records synced; stored as `records_synced`.
 * @param log Logger; the second argument marks a message as verbose-only.
 */
async function updateSyncState(lastSyncTime: string, recordsSynced: number, log: (message: string, isVerbose?: boolean) => void): Promise<void> {
  const newState: SyncState = {
    last_sync_time: lastSyncTime,
    records_synced: recordsSynced,
    last_run: new Date().toISOString()
  };
  try {
    await setVariable(SYNC_STATE_KEY, newState);
    log(`同步状态已更新: 最后同步时间 ${lastSyncTime}, 累计同步记录数 ${recordsSynced}`, true);
  } catch (error) {
    // Logged at normal (non-verbose) level: if the checkpoint is not saved the
    // next run will start from the old checkpoint, and with the default
    // `verbose = false` a verbose-only message would hide that completely.
    log(`更新同步状态失败: ${error}`);
    // Continue without rethrowing so the sync is not interrupted.
  }
}
/**
* 从PostgreSQL获取shorturl数据
*/