Compare commits
2 Commits
b9c2828e54
...
53e1611670
| Author | SHA1 | Date | |
|---|---|---|---|
| 53e1611670 | |||
| 6025641ab1 |
@@ -306,6 +306,8 @@ function AnalyticsContent() {
|
|||||||
const [geoData, setGeoData] = useState<GeoData[]>([]);
|
const [geoData, setGeoData] = useState<GeoData[]>([]);
|
||||||
const [deviceData, setDeviceData] = useState<DeviceAnalyticsType | null>(null);
|
const [deviceData, setDeviceData] = useState<DeviceAnalyticsType | null>(null);
|
||||||
const [events, setEvents] = useState<Event[]>([]);
|
const [events, setEvents] = useState<Event[]>([]);
|
||||||
|
const [isRefreshing, setIsRefreshing] = useState(false); // New state to track auto-refresh
|
||||||
|
const [lastRefreshed, setLastRefreshed] = useState<Date | null>(null); // Track when data was last refreshed
|
||||||
|
|
||||||
// 添加 Snackbar 状态
|
// 添加 Snackbar 状态
|
||||||
const [isSnackbarOpen, setIsSnackbarOpen] = useState(false);
|
const [isSnackbarOpen, setIsSnackbarOpen] = useState(false);
|
||||||
@@ -449,12 +451,133 @@ function AnalyticsContent() {
|
|||||||
setError(err instanceof Error ? err.message : 'An error occurred while fetching data');
|
setError(err instanceof Error ? err.message : 'An error occurred while fetching data');
|
||||||
} finally {
|
} finally {
|
||||||
setLoading(false);
|
setLoading(false);
|
||||||
|
setIsRefreshing(false); // Reset refreshing state
|
||||||
|
setLastRefreshed(new Date()); // Update last refreshed timestamp
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
fetchData();
|
fetchData();
|
||||||
}, [dateRange, selectedTeamIds, selectedProjectIds, selectedTagNames, selectedSubpath, currentPage, pageSize, selectedShortUrl, shouldFetchData]);
|
}, [dateRange, selectedTeamIds, selectedProjectIds, selectedTagNames, selectedSubpath, currentPage, pageSize, selectedShortUrl, shouldFetchData]);
|
||||||
|
|
||||||
|
// Add auto-refresh functionality
|
||||||
|
useEffect(() => {
|
||||||
|
if (!shouldFetchData) return; // Don't set up refresh until initial data load is triggered
|
||||||
|
|
||||||
|
// Function to trigger a refresh of data
|
||||||
|
const refreshData = () => {
|
||||||
|
console.log('Auto-refreshing analytics data...');
|
||||||
|
// Only refresh if not already loading or refreshing
|
||||||
|
if (!loading && !isRefreshing) {
|
||||||
|
setIsRefreshing(true);
|
||||||
|
|
||||||
|
// Create a new fetch function instead of reusing the effect's fetchData
|
||||||
|
const fetchRefreshedData = async () => {
|
||||||
|
try {
|
||||||
|
const startTime = format(dateRange.from, "yyyy-MM-dd'T'HH:mm:ss'Z'");
|
||||||
|
const endTime = format(dateRange.to, "yyyy-MM-dd'T'HH:mm:ss'Z'");
|
||||||
|
|
||||||
|
// 构建基础URL和查询参数
|
||||||
|
const baseUrl = '/api/events';
|
||||||
|
const params = new URLSearchParams({
|
||||||
|
startTime,
|
||||||
|
endTime,
|
||||||
|
page: currentPage.toString(),
|
||||||
|
pageSize: pageSize.toString()
|
||||||
|
});
|
||||||
|
|
||||||
|
// Duplicate the parameters logic from the main fetch effect
|
||||||
|
if (selectedShortUrl && selectedShortUrl.externalId) {
|
||||||
|
params.append('linkId', selectedShortUrl.externalId);
|
||||||
|
} else {
|
||||||
|
const savedExternalId = sessionStorage.getItem('current_shorturl_external_id');
|
||||||
|
if (savedExternalId) {
|
||||||
|
params.append('linkId', savedExternalId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (selectedTeamIds.length > 0) {
|
||||||
|
selectedTeamIds.forEach(teamId => {
|
||||||
|
params.append('teamId', teamId);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (selectedProjectIds.length > 0) {
|
||||||
|
selectedProjectIds.forEach(projectId => {
|
||||||
|
params.append('projectId', projectId);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (selectedTagNames.length > 0) {
|
||||||
|
selectedTagNames.forEach(tagName => {
|
||||||
|
params.append('tagName', tagName);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (selectedSubpath) {
|
||||||
|
params.append('subpath', selectedSubpath);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build all URLs with the same parameters
|
||||||
|
const summaryUrl = `${baseUrl}/summary?${params.toString()}`;
|
||||||
|
const timeSeriesUrl = `${baseUrl}/time-series?${params.toString()}`;
|
||||||
|
const geoUrl = `${baseUrl}/geo?${params.toString()}`;
|
||||||
|
const devicesUrl = `${baseUrl}/devices?${params.toString()}`;
|
||||||
|
const eventsUrl = `${baseUrl}?${params.toString()}`;
|
||||||
|
|
||||||
|
// Parallel requests for all data
|
||||||
|
const [summaryRes, timeSeriesRes, geoRes, deviceRes, eventsRes] = await Promise.all([
|
||||||
|
fetch(summaryUrl),
|
||||||
|
fetch(timeSeriesUrl),
|
||||||
|
fetch(geoUrl),
|
||||||
|
fetch(devicesUrl),
|
||||||
|
fetch(eventsUrl)
|
||||||
|
]);
|
||||||
|
|
||||||
|
const [summaryData, timeSeriesData, geoData, deviceData, eventsData] = await Promise.all([
|
||||||
|
summaryRes.json(),
|
||||||
|
timeSeriesRes.json(),
|
||||||
|
geoRes.json(),
|
||||||
|
deviceRes.json(),
|
||||||
|
eventsRes.json()
|
||||||
|
]);
|
||||||
|
|
||||||
|
// Update state with fresh data
|
||||||
|
if (summaryRes.ok) setSummary(summaryData.data);
|
||||||
|
if (timeSeriesRes.ok) setTimeSeriesData(timeSeriesData.data);
|
||||||
|
if (geoRes.ok) setGeoData(geoData.data);
|
||||||
|
if (deviceRes.ok) setDeviceData(deviceData.data);
|
||||||
|
if (eventsRes.ok) {
|
||||||
|
setEvents(eventsData.data || []);
|
||||||
|
// Update pagination info
|
||||||
|
if (eventsData.meta) {
|
||||||
|
const totalCount = parseInt(String(eventsData.meta.total), 10);
|
||||||
|
if (!isNaN(totalCount)) {
|
||||||
|
setTotalEvents(totalCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error('Auto-refresh error:', err);
|
||||||
|
// Don't show errors during auto-refresh to avoid disrupting the UI
|
||||||
|
} finally {
|
||||||
|
setIsRefreshing(false);
|
||||||
|
setLastRefreshed(new Date()); // Update last refreshed timestamp
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
fetchRefreshedData();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Set up the interval for auto-refresh every 30 seconds
|
||||||
|
const intervalId = setInterval(refreshData, 30000);
|
||||||
|
|
||||||
|
// Clean up the interval when the component unmounts
|
||||||
|
return () => {
|
||||||
|
clearInterval(intervalId);
|
||||||
|
};
|
||||||
|
}, [shouldFetchData, loading, isRefreshing, dateRange, selectedTeamIds, selectedProjectIds, selectedTagNames, selectedSubpath, currentPage, pageSize, selectedShortUrl]);
|
||||||
|
|
||||||
// Function to clear the shorturl filter
|
// Function to clear the shorturl filter
|
||||||
const handleClearShortUrlFilter = () => {
|
const handleClearShortUrlFilter = () => {
|
||||||
// 先清除 store 中的数据
|
// 先清除 store 中的数据
|
||||||
@@ -499,7 +622,7 @@ function AnalyticsContent() {
|
|||||||
setCurrentPage(1);
|
setCurrentPage(1);
|
||||||
};
|
};
|
||||||
|
|
||||||
if (loading) {
|
if (loading && !isRefreshing) {
|
||||||
return (
|
return (
|
||||||
<div className="flex items-center justify-center min-h-screen">
|
<div className="flex items-center justify-center min-h-screen">
|
||||||
<div className="animate-spin rounded-full h-12 w-12 border-t-2 border-b-2 border-blue-500" />
|
<div className="animate-spin rounded-full h-12 w-12 border-t-2 border-b-2 border-blue-500" />
|
||||||
@@ -532,8 +655,24 @@ function AnalyticsContent() {
|
|||||||
)}
|
)}
|
||||||
|
|
||||||
<div className="flex justify-between items-center mb-8">
|
<div className="flex justify-between items-center mb-8">
|
||||||
|
<div>
|
||||||
<h1 className="text-2xl font-bold text-gray-900">Analytics Dashboard</h1>
|
<h1 className="text-2xl font-bold text-gray-900">Analytics Dashboard</h1>
|
||||||
|
{lastRefreshed && (
|
||||||
|
<div className="text-xs text-gray-500 mt-1">
|
||||||
|
Last updated: {format(lastRefreshed, 'MMM d, yyyy HH:mm:ss')}
|
||||||
|
{isRefreshing ? ' · Refreshing...' : ' · Auto-refreshes every 30 seconds'}
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
<div className="flex flex-col gap-4 md:flex-row md:items-center">
|
<div className="flex flex-col gap-4 md:flex-row md:items-center">
|
||||||
|
{/* Show refresh indicator */}
|
||||||
|
{isRefreshing && (
|
||||||
|
<div className="flex items-center text-blue-600 text-sm">
|
||||||
|
<div className="animate-spin rounded-full h-4 w-4 border-t-2 border-b-2 border-blue-500 mr-2" />
|
||||||
|
<span>Refreshing data...</span>
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
|
||||||
{/* 如果有选定的 shorturl,可以显示一个提示,显示更多详细信息 */}
|
{/* 如果有选定的 shorturl,可以显示一个提示,显示更多详细信息 */}
|
||||||
{selectedShortUrl && (
|
{selectedShortUrl && (
|
||||||
<div className="bg-blue-100 text-blue-800 px-3 py-2 rounded-md text-sm flex flex-col">
|
<div className="bg-blue-100 text-blue-800 px-3 py-2 rounded-md text-sm flex flex-col">
|
||||||
|
|||||||
1
scripts/db/sql/clickhouse/truncate_events.sh
Normal file
1
scripts/db/sql/clickhouse/truncate_events.sh
Normal file
@@ -0,0 +1 @@
|
|||||||
|
./ch-query.sh -q "TRUNCATE TABLE shorturl_analytics.events"
|
||||||
522
windmill/sync_mongo_short_to_postgres_short_url_shorturl.ts
Normal file
522
windmill/sync_mongo_short_to_postgres_short_url_shorturl.ts
Normal file
@@ -0,0 +1,522 @@
|
|||||||
|
// 从MongoDB的main.short表同步数据到PostgreSQL的short_url.shorturl表
|
||||||
|
import { getVariable, setVariable, getResource } from "npm:windmill-client@1";
|
||||||
|
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
|
||||||
|
import { Client } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
|
||||||
|
|
||||||
|
interface MongoConfig {
|
||||||
|
host: string;
|
||||||
|
port: string;
|
||||||
|
db: string;
|
||||||
|
username: string;
|
||||||
|
password: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface PostgresConfig {
|
||||||
|
host: string;
|
||||||
|
port: number;
|
||||||
|
database: string;
|
||||||
|
user: string;
|
||||||
|
password: string;
|
||||||
|
schema: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 扩展ShortRecord接口以包含更多可能的字段
|
||||||
|
interface ShortRecord {
|
||||||
|
_id: ObjectId;
|
||||||
|
origin: string;
|
||||||
|
slug: string;
|
||||||
|
domain: string | null;
|
||||||
|
createTime: number | { $numberLong: string } | string;
|
||||||
|
// 可选字段
|
||||||
|
expiredAt?: number | { $numberLong: string } | string | null;
|
||||||
|
expiredUrl?: string | null;
|
||||||
|
password?: string | null;
|
||||||
|
image?: string | null;
|
||||||
|
title?: string | null;
|
||||||
|
description?: string | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface SyncState {
|
||||||
|
last_sync_time: number;
|
||||||
|
records_synced: number;
|
||||||
|
last_sync_id?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 同步状态键名
|
||||||
|
const SYNC_STATE_KEY = "f/limq/mongo_short_to_postgres_shorturl_shorturl_state";
|
||||||
|
|
||||||
|
export async function main(
|
||||||
|
batch_size = 1000,
|
||||||
|
max_records = 9999999,
|
||||||
|
timeout_minutes = 60,
|
||||||
|
skip_duplicate_check = false,
|
||||||
|
force_insert = false,
|
||||||
|
reset_sync_state = false,
|
||||||
|
postgres_schema = "short_url", // 添加schema参数,允许运行时指定
|
||||||
|
postgres_database = "postgres", // 添加数据库名称参数,默认为postgres
|
||||||
|
domain = "upj.to" // 添加domain参数,允许用户指定域名
|
||||||
|
) {
|
||||||
|
const logWithTimestamp = (message: string) => {
|
||||||
|
const now = new Date();
|
||||||
|
console.log(`[${now.toISOString()}] ${message}`);
|
||||||
|
};
|
||||||
|
|
||||||
|
logWithTimestamp("开始执行MongoDB到PostgreSQL的同步任务");
|
||||||
|
logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`);
|
||||||
|
logWithTimestamp(`使用域名: ${domain}`);
|
||||||
|
if (skip_duplicate_check) {
|
||||||
|
logWithTimestamp("⚠️ 警告: 已启用跳过重复检查模式,不会检查记录是否已存在");
|
||||||
|
}
|
||||||
|
if (force_insert) {
|
||||||
|
logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
|
||||||
|
}
|
||||||
|
if (reset_sync_state) {
|
||||||
|
logWithTimestamp("⚠️ 警告: 已启用重置同步状态,将从头开始同步数据");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 设置超时
|
||||||
|
const startTime = Date.now();
|
||||||
|
const timeoutMs = timeout_minutes * 60 * 1000;
|
||||||
|
|
||||||
|
// 检查是否超时
|
||||||
|
const checkTimeout = () => {
|
||||||
|
if (Date.now() - startTime > timeoutMs) {
|
||||||
|
logWithTimestamp(`运行时间超过${timeout_minutes}分钟,暂停执行`);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
// 日期解析函数,处理不同格式的日期
|
||||||
|
const parseDate = (dateValue: any): Date | null => {
|
||||||
|
if (!dateValue) return null;
|
||||||
|
|
||||||
|
// 处理 MongoDB $numberLong 格式
|
||||||
|
if (dateValue.$numberLong) {
|
||||||
|
return new Date(Number(dateValue.$numberLong));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理普通时间戳
|
||||||
|
if (typeof dateValue === 'number') {
|
||||||
|
return new Date(dateValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理 ISO 字符串格式
|
||||||
|
if (typeof dateValue === 'string') {
|
||||||
|
const date = new Date(dateValue);
|
||||||
|
return isNaN(date.getTime()) ? null : date;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
};
|
||||||
|
|
||||||
|
// 获取MongoDB和PostgreSQL的连接信息
|
||||||
|
let mongoConfig: MongoConfig;
|
||||||
|
let postgresConfig: PostgresConfig;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
|
||||||
|
if (typeof rawMongoConfig === "string") {
|
||||||
|
try {
|
||||||
|
mongoConfig = JSON.parse(rawMongoConfig);
|
||||||
|
} catch (e) {
|
||||||
|
console.error("MongoDB配置解析失败:", e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
mongoConfig = rawMongoConfig as MongoConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 使用getResource获取PostgreSQL资源
|
||||||
|
try {
|
||||||
|
logWithTimestamp("正在获取PostgreSQL资源...");
|
||||||
|
const resourceConfig = await getResource("f/limq/production_supabase");
|
||||||
|
|
||||||
|
// 将resource转换为PostgresConfig
|
||||||
|
postgresConfig = {
|
||||||
|
host: resourceConfig.host || "",
|
||||||
|
port: Number(resourceConfig.port) || 5432,
|
||||||
|
user: resourceConfig.user || "",
|
||||||
|
password: resourceConfig.password || "",
|
||||||
|
database: resourceConfig.database || postgres_database, // 使用提供的数据库名称作为备选
|
||||||
|
schema: resourceConfig.schema || postgres_schema // 使用提供的schema作为备选
|
||||||
|
};
|
||||||
|
|
||||||
|
// 检查并记录配置信息
|
||||||
|
if (!postgresConfig.database || postgresConfig.database === "undefined") {
|
||||||
|
postgresConfig.database = postgres_database;
|
||||||
|
logWithTimestamp(`数据库名称未指定或为"undefined",使用提供的值: ${postgresConfig.database}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!postgresConfig.schema || postgresConfig.schema === "undefined") {
|
||||||
|
postgresConfig.schema = postgres_schema;
|
||||||
|
logWithTimestamp(`Schema未指定或为"undefined",使用提供的值: ${postgresConfig.schema}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
logWithTimestamp(`PostgreSQL配置: 数据库=${postgresConfig.database}, Schema=${postgresConfig.schema}`);
|
||||||
|
} catch (e) {
|
||||||
|
console.error("获取PostgreSQL资源失败:", e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log("MongoDB配置:", JSON.stringify({
|
||||||
|
...mongoConfig,
|
||||||
|
password: "****" // 隐藏密码
|
||||||
|
}));
|
||||||
|
console.log("PostgreSQL配置:", JSON.stringify({
|
||||||
|
...postgresConfig,
|
||||||
|
password: "****" // 隐藏密码
|
||||||
|
}));
|
||||||
|
} catch (error) {
|
||||||
|
console.error("获取配置失败:", error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取上次同步状态
|
||||||
|
let lastSyncState: SyncState | null = null;
|
||||||
|
if (!reset_sync_state) {
|
||||||
|
try {
|
||||||
|
const rawSyncState = await getVariable(SYNC_STATE_KEY);
|
||||||
|
if (rawSyncState) {
|
||||||
|
if (typeof rawSyncState === "string") {
|
||||||
|
try {
|
||||||
|
lastSyncState = JSON.parse(rawSyncState);
|
||||||
|
} catch (e) {
|
||||||
|
logWithTimestamp(`解析上次同步状态失败: ${e}, 将从头开始同步`);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
lastSyncState = rawSyncState as SyncState;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logWithTimestamp(`获取上次同步状态失败: ${error}, 将从头开始同步`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (lastSyncState) {
|
||||||
|
logWithTimestamp(`找到上次同步状态: 最后同步时间 ${new Date(lastSyncState.last_sync_time).toISOString()}, 已同步记录数 ${lastSyncState.records_synced}`);
|
||||||
|
if (lastSyncState.last_sync_id) {
|
||||||
|
logWithTimestamp(`最后同步ID: ${lastSyncState.last_sync_id}`);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logWithTimestamp("没有找到上次同步状态,将从头开始同步");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 构建MongoDB连接URL
|
||||||
|
let mongoUrl = "mongodb://";
|
||||||
|
if (mongoConfig.username && mongoConfig.password) {
|
||||||
|
mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
|
||||||
|
}
|
||||||
|
mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
|
||||||
|
|
||||||
|
console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);
|
||||||
|
|
||||||
|
// 构建PostgreSQL连接URL
|
||||||
|
const pgConnectionString = `postgres://${postgresConfig.user}:${postgresConfig.password}@${postgresConfig.host}:${postgresConfig.port}/${postgresConfig.database}`;
|
||||||
|
console.log(`PostgreSQL连接URL: ${pgConnectionString.replace(/:[^:]*@/, ":****@")}`);
|
||||||
|
|
||||||
|
// 连接MongoDB
|
||||||
|
const mongoClient = new MongoClient();
|
||||||
|
let pgClient: Client | null = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
await mongoClient.connect(mongoUrl);
|
||||||
|
logWithTimestamp("MongoDB连接成功");
|
||||||
|
|
||||||
|
// 连接PostgreSQL
|
||||||
|
pgClient = new Client(pgConnectionString);
|
||||||
|
await pgClient.connect();
|
||||||
|
logWithTimestamp("PostgreSQL连接成功");
|
||||||
|
|
||||||
|
// 确认PostgreSQL schema存在
|
||||||
|
try {
|
||||||
|
await pgClient.queryArray(`SELECT 1 FROM information_schema.schemata WHERE schema_name = '${postgresConfig.schema}'`);
|
||||||
|
logWithTimestamp(`PostgreSQL schema '${postgresConfig.schema}' 已确认存在`);
|
||||||
|
} catch (error) {
|
||||||
|
logWithTimestamp(`检查PostgreSQL schema失败: ${error}`);
|
||||||
|
throw new Error(`Schema '${postgresConfig.schema}' 可能不存在`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const db = mongoClient.database(mongoConfig.db);
|
||||||
|
const shortCollection = db.collection<ShortRecord>("short");
|
||||||
|
|
||||||
|
// 构建查询条件,根据上次同步状态获取新记录
|
||||||
|
const query: Record<string, unknown> = {};
|
||||||
|
|
||||||
|
// 如果有上次同步状态,则只获取更新的记录
|
||||||
|
if (lastSyncState && lastSyncState.last_sync_time) {
|
||||||
|
// 使用上次同步时间作为过滤条件
|
||||||
|
query.createTime = { $gt: lastSyncState.last_sync_time };
|
||||||
|
logWithTimestamp(`将只同步createTime > ${lastSyncState.last_sync_time} (${new Date(lastSyncState.last_sync_time).toISOString()}) 的记录`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 计算总记录数
|
||||||
|
const totalRecords = await shortCollection.countDocuments(query);
|
||||||
|
logWithTimestamp(`找到 ${totalRecords} 条新记录需要同步`);
|
||||||
|
|
||||||
|
// 限制此次处理的记录数量
|
||||||
|
const recordsToProcess = Math.min(totalRecords, max_records);
|
||||||
|
logWithTimestamp(`本次将处理 ${recordsToProcess} 条记录`);
|
||||||
|
|
||||||
|
if (totalRecords === 0) {
|
||||||
|
logWithTimestamp("没有新记录需要同步,任务完成");
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
records_synced: 0,
|
||||||
|
message: "没有新记录需要同步"
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查记录是否已经存在于PostgreSQL中
|
||||||
|
const checkExistingRecords = async (records: ShortRecord[]): Promise<ShortRecord[]> => {
|
||||||
|
if (records.length === 0) return [];
|
||||||
|
|
||||||
|
// 如果跳过重复检查或强制插入,则直接返回所有记录
|
||||||
|
if (skip_duplicate_check || force_insert) {
|
||||||
|
logWithTimestamp(`已跳过重复检查,准备处理所有 ${records.length} 条记录`);
|
||||||
|
return records;
|
||||||
|
}
|
||||||
|
|
||||||
|
logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于PostgreSQL中...`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 提取所有记录的slugs
|
||||||
|
const slugs = records.map(record => record.slug);
|
||||||
|
|
||||||
|
// 查询PostgreSQL中是否已存在这些slugs
|
||||||
|
const result = await pgClient!.queryArray(`
|
||||||
|
SELECT slug FROM ${postgresConfig.schema}.shorturl
|
||||||
|
WHERE slug = ANY($1::text[])
|
||||||
|
`, [slugs]);
|
||||||
|
|
||||||
|
// 将已存在的slugs加入到集合中
|
||||||
|
const existingSlugs = new Set<string>();
|
||||||
|
for (const row of result.rows) {
|
||||||
|
existingSlugs.add(row[0] as string);
|
||||||
|
}
|
||||||
|
|
||||||
|
logWithTimestamp(`检测到 ${existingSlugs.size} 条记录已存在于PostgreSQL中`);
|
||||||
|
|
||||||
|
// 过滤出不存在的记录
|
||||||
|
const newRecords = records.filter(record => !existingSlugs.has(record.slug));
|
||||||
|
logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
|
||||||
|
|
||||||
|
return newRecords;
|
||||||
|
} catch (err) {
|
||||||
|
const error = err as Error;
|
||||||
|
logWithTimestamp(`PostgreSQL查询出错: ${error.message}`);
|
||||||
|
if (skip_duplicate_check) {
|
||||||
|
logWithTimestamp("已启用跳过重复检查,将继续处理所有记录");
|
||||||
|
return records;
|
||||||
|
} else {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// 处理记录的函数
|
||||||
|
const processRecords = async (records: ShortRecord[]) => {
|
||||||
|
if (records.length === 0) return 0;
|
||||||
|
|
||||||
|
logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`);
|
||||||
|
|
||||||
|
// 检查记录是否已存在
|
||||||
|
let newRecords;
|
||||||
|
try {
|
||||||
|
newRecords = await checkExistingRecords(records);
|
||||||
|
} catch (err) {
|
||||||
|
const error = err as Error;
|
||||||
|
logWithTimestamp(`检查记录是否存在时出错: ${error.message}`);
|
||||||
|
if (!skip_duplicate_check && !force_insert) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
// 如果跳过检查或强制插入,则使用所有记录
|
||||||
|
logWithTimestamp("将使用所有记录进行处理");
|
||||||
|
newRecords = records;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newRecords.length === 0) {
|
||||||
|
logWithTimestamp("所有记录都已存在,跳过处理");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`);
|
||||||
|
|
||||||
|
// 批量插入PostgreSQL
|
||||||
|
try {
|
||||||
|
// 开始事务
|
||||||
|
await pgClient!.queryArray('BEGIN');
|
||||||
|
|
||||||
|
let insertedCount = 0;
|
||||||
|
|
||||||
|
// 由于参数可能很多,按小批次处理
|
||||||
|
const smallBatchSize = 100;
|
||||||
|
for (let i = 0; i < newRecords.length; i += smallBatchSize) {
|
||||||
|
const batchRecords = newRecords.slice(i, i + smallBatchSize);
|
||||||
|
|
||||||
|
// 构造批量插入语句
|
||||||
|
const placeholders = [];
|
||||||
|
const values = [];
|
||||||
|
let valueIndex = 1;
|
||||||
|
|
||||||
|
for (const record of batchRecords) {
|
||||||
|
// 参考提供的字段处理方式处理数据
|
||||||
|
const createdAt = parseDate(record.createTime);
|
||||||
|
const updatedAt = createdAt; // 设置更新时间等于创建时间
|
||||||
|
const fullShortUrl = `${domain}/${record.slug}`;
|
||||||
|
|
||||||
|
placeholders.push(`($${valueIndex}, $${valueIndex+1}, $${valueIndex+2}, $${valueIndex+3}, $${valueIndex+4}, $${valueIndex+5}, $${valueIndex+6}, $${valueIndex+7}, $${valueIndex+8}, $${valueIndex+9}, $${valueIndex+10}, $${valueIndex+11}, $${valueIndex+12})`);
|
||||||
|
|
||||||
|
values.push(
|
||||||
|
record._id.toString(), // id
|
||||||
|
record.slug, // slug
|
||||||
|
domain, // domain (使用提供的域名)
|
||||||
|
record.slug, // name (使用slug作为name)
|
||||||
|
record.slug, // title (使用slug作为title)
|
||||||
|
record.origin || '', // origin
|
||||||
|
createdAt, // created_at
|
||||||
|
updatedAt, // updated_at
|
||||||
|
fullShortUrl, // full_short_url
|
||||||
|
record.image || null, // image
|
||||||
|
record.description || null, // description
|
||||||
|
record.expiredUrl || null, // expired_url
|
||||||
|
parseDate(record.expiredAt) // expired_at
|
||||||
|
);
|
||||||
|
|
||||||
|
valueIndex += 13;
|
||||||
|
}
|
||||||
|
|
||||||
|
const query = `
|
||||||
|
INSERT INTO ${postgresConfig.schema}.shorturl
|
||||||
|
(id, slug, domain, name, title, origin, created_at, updated_at, full_short_url, image, description, expired_url, expired_at)
|
||||||
|
VALUES ${placeholders.join(', ')}
|
||||||
|
`;
|
||||||
|
|
||||||
|
await pgClient!.queryArray(query, values);
|
||||||
|
insertedCount += batchRecords.length;
|
||||||
|
logWithTimestamp(`已插入 ${insertedCount}/${newRecords.length} 条记录`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 提交事务
|
||||||
|
await pgClient!.queryArray('COMMIT');
|
||||||
|
|
||||||
|
logWithTimestamp(`成功插入 ${insertedCount} 条记录到PostgreSQL`);
|
||||||
|
return insertedCount;
|
||||||
|
} catch (err) {
|
||||||
|
const error = err as Error;
|
||||||
|
// 发生错误,回滚事务
|
||||||
|
await pgClient!.queryArray('ROLLBACK');
|
||||||
|
logWithTimestamp(`向PostgreSQL插入数据失败: ${error.message}`);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// 批量处理记录
|
||||||
|
let processedRecords = 0;
|
||||||
|
let totalBatchRecords = 0;
|
||||||
|
let lastSyncTime = 0;
|
||||||
|
let lastSyncId = "";
|
||||||
|
|
||||||
|
for (let page = 0; processedRecords < recordsToProcess; page++) {
|
||||||
|
// 检查超时
|
||||||
|
if (checkTimeout()) {
|
||||||
|
logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 每批次都输出进度
|
||||||
|
logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
|
||||||
|
|
||||||
|
logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`);
|
||||||
|
const records = await shortCollection.find(
|
||||||
|
query,
|
||||||
|
{
|
||||||
|
sort: { createTime: 1 },
|
||||||
|
skip: page * batch_size,
|
||||||
|
limit: batch_size
|
||||||
|
}
|
||||||
|
).toArray();
|
||||||
|
|
||||||
|
if (records.length === 0) {
|
||||||
|
logWithTimestamp("没有找到更多数据,同步结束");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 找到数据,开始处理
|
||||||
|
logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);
|
||||||
|
|
||||||
|
// 输出当前批次的部分数据信息
|
||||||
|
if (records.length > 0) {
|
||||||
|
logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, slug=${records[0].slug}, 时间=${new Date(typeof records[0].createTime === 'number' ? records[0].createTime : 0).toISOString()}`);
|
||||||
|
if (records.length > 1) {
|
||||||
|
const lastRec = records[records.length-1];
|
||||||
|
logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${lastRec._id}, slug=${lastRec.slug}, 时间=${new Date(typeof lastRec.createTime === 'number' ? lastRec.createTime : 0).toISOString()}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const batchSize = await processRecords(records);
|
||||||
|
processedRecords += records.length;
|
||||||
|
totalBatchRecords += batchSize;
|
||||||
|
|
||||||
|
// 更新最后处理的记录时间和ID
|
||||||
|
if (records.length > 0) {
|
||||||
|
const lastRecord = records[records.length - 1];
|
||||||
|
// 提取数字时间戳
|
||||||
|
let lastCreateTime = 0;
|
||||||
|
if (typeof lastRecord.createTime === 'number') {
|
||||||
|
lastCreateTime = lastRecord.createTime;
|
||||||
|
} else if (lastRecord.createTime && lastRecord.createTime.$numberLong) {
|
||||||
|
lastCreateTime = Number(lastRecord.createTime.$numberLong);
|
||||||
|
}
|
||||||
|
|
||||||
|
lastSyncTime = Math.max(lastSyncTime, lastCreateTime);
|
||||||
|
lastSyncId = lastRecord._id.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 更新同步状态
|
||||||
|
if (processedRecords > 0 && lastSyncTime > 0) {
|
||||||
|
// 创建新的同步状态
|
||||||
|
const newSyncState: SyncState = {
|
||||||
|
last_sync_time: lastSyncTime,
|
||||||
|
records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + totalBatchRecords,
|
||||||
|
last_sync_id: lastSyncId
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 保存同步状态
|
||||||
|
await setVariable(SYNC_STATE_KEY, newSyncState);
|
||||||
|
logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`);
|
||||||
|
} catch (err) {
|
||||||
|
const error = err as Error;
|
||||||
|
logWithTimestamp(`更新同步状态失败: ${error.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
records_processed: processedRecords,
|
||||||
|
records_synced: totalBatchRecords,
|
||||||
|
last_sync_time: lastSyncTime > 0 ? new Date(lastSyncTime).toISOString() : null,
|
||||||
|
message: "数据同步完成"
|
||||||
|
};
|
||||||
|
} catch (err) {
|
||||||
|
const error = err as Error;
|
||||||
|
console.error("同步过程中发生错误:", error);
|
||||||
|
return {
|
||||||
|
success: false,
|
||||||
|
error: error.message,
|
||||||
|
stack: error.stack
|
||||||
|
};
|
||||||
|
} finally {
|
||||||
|
// 关闭连接
|
||||||
|
if (pgClient) {
|
||||||
|
await pgClient.end();
|
||||||
|
logWithTimestamp("PostgreSQL连接已关闭");
|
||||||
|
}
|
||||||
|
await mongoClient.close();
|
||||||
|
logWithTimestamp("MongoDB连接已关闭");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -39,13 +39,17 @@ interface SyncState {
|
|||||||
last_sync_id?: string;
|
last_sync_id?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 同步状态键名
|
||||||
|
const SYNC_STATE_KEY = "f/shorturl_analytics/mongo_sync_state";
|
||||||
|
|
||||||
export async function main(
|
export async function main(
|
||||||
batch_size = 1000,
|
batch_size = 1000,
|
||||||
max_records = 9999999,
|
max_records = 9999999,
|
||||||
timeout_minutes = 60,
|
timeout_minutes = 60,
|
||||||
skip_clickhouse_check = false,
|
skip_clickhouse_check = false,
|
||||||
force_insert = false,
|
force_insert = false,
|
||||||
database_override = "shorturl_analytics" // 添加数据库名称参数,默认为shorturl_analytics
|
database_override = "shorturl_analytics", // 添加数据库名称参数,默认为shorturl_analytics
|
||||||
|
reset_sync_state = false // 添加参数用于重置同步状态
|
||||||
) {
|
) {
|
||||||
const logWithTimestamp = (message: string) => {
|
const logWithTimestamp = (message: string) => {
|
||||||
const now = new Date();
|
const now = new Date();
|
||||||
@@ -60,6 +64,9 @@ export async function main(
|
|||||||
if (force_insert) {
|
if (force_insert) {
|
||||||
logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
|
logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
|
||||||
}
|
}
|
||||||
|
if (reset_sync_state) {
|
||||||
|
logWithTimestamp("⚠️ 警告: 已启用重置同步状态,将从头开始同步数据");
|
||||||
|
}
|
||||||
|
|
||||||
// 设置超时
|
// 设置超时
|
||||||
const startTime = Date.now();
|
const startTime = Date.now();
|
||||||
@@ -127,6 +134,36 @@ export async function main(
|
|||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 获取上次同步状态
|
||||||
|
let lastSyncState: SyncState | null = null;
|
||||||
|
if (!reset_sync_state) {
|
||||||
|
try {
|
||||||
|
const rawSyncState = await getVariable(SYNC_STATE_KEY);
|
||||||
|
if (rawSyncState) {
|
||||||
|
if (typeof rawSyncState === "string") {
|
||||||
|
try {
|
||||||
|
lastSyncState = JSON.parse(rawSyncState);
|
||||||
|
} catch (e) {
|
||||||
|
logWithTimestamp(`解析上次同步状态失败: ${e}, 将从头开始同步`);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
lastSyncState = rawSyncState as SyncState;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logWithTimestamp(`获取上次同步状态失败: ${error}, 将从头开始同步`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (lastSyncState) {
|
||||||
|
logWithTimestamp(`找到上次同步状态: 最后同步时间 ${new Date(lastSyncState.last_sync_time).toISOString()}, 已同步记录数 ${lastSyncState.records_synced}`);
|
||||||
|
if (lastSyncState.last_sync_id) {
|
||||||
|
logWithTimestamp(`最后同步ID: ${lastSyncState.last_sync_id}`);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logWithTimestamp("没有找到上次同步状态,将从头开始同步");
|
||||||
|
}
|
||||||
|
|
||||||
// 构建MongoDB连接URL
|
// 构建MongoDB连接URL
|
||||||
let mongoUrl = "mongodb://";
|
let mongoUrl = "mongodb://";
|
||||||
if (mongoConfig.username && mongoConfig.password) {
|
if (mongoConfig.username && mongoConfig.password) {
|
||||||
@@ -145,25 +182,32 @@ export async function main(
|
|||||||
const db = client.database(mongoConfig.db);
|
const db = client.database(mongoConfig.db);
|
||||||
const traceCollection = db.collection<TraceRecord>("trace");
|
const traceCollection = db.collection<TraceRecord>("trace");
|
||||||
|
|
||||||
// 构建查询条件,获取所有记录
|
// 构建查询条件,根据上次同步状态获取新记录
|
||||||
const query: Record<string, unknown> = {
|
const query: Record<string, unknown> = {
|
||||||
type: 1 // 只同步type为1的记录
|
type: 1 // 只同步type为1的记录
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// 如果有上次同步状态,则只获取更新的记录
|
||||||
|
if (lastSyncState && lastSyncState.last_sync_time) {
|
||||||
|
// 使用上次同步时间作为过滤条件
|
||||||
|
query.createTime = { $gt: lastSyncState.last_sync_time };
|
||||||
|
logWithTimestamp(`将只同步createTime > ${lastSyncState.last_sync_time} (${new Date(lastSyncState.last_sync_time).toISOString()}) 的记录`);
|
||||||
|
}
|
||||||
|
|
||||||
// 计算总记录数
|
// 计算总记录数
|
||||||
const totalRecords = await traceCollection.countDocuments(query);
|
const totalRecords = await traceCollection.countDocuments(query);
|
||||||
console.log(`找到 ${totalRecords} 条记录需要同步`);
|
console.log(`找到 ${totalRecords} 条新记录需要同步`);
|
||||||
|
|
||||||
// 限制此次处理的记录数量
|
// 限制此次处理的记录数量
|
||||||
const recordsToProcess = Math.min(totalRecords, max_records);
|
const recordsToProcess = Math.min(totalRecords, max_records);
|
||||||
console.log(`本次将处理 ${recordsToProcess} 条记录`);
|
console.log(`本次将处理 ${recordsToProcess} 条记录`);
|
||||||
|
|
||||||
if (totalRecords === 0) {
|
if (totalRecords === 0) {
|
||||||
console.log("没有记录需要同步,任务完成");
|
console.log("没有新记录需要同步,任务完成");
|
||||||
return {
|
return {
|
||||||
success: true,
|
success: true,
|
||||||
records_synced: 0,
|
records_synced: 0,
|
||||||
message: "没有记录需要同步"
|
message: "没有新记录需要同步"
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -464,6 +508,8 @@ export async function main(
|
|||||||
// 批量处理记录
|
// 批量处理记录
|
||||||
let processedRecords = 0;
|
let processedRecords = 0;
|
||||||
let totalBatchRecords = 0;
|
let totalBatchRecords = 0;
|
||||||
|
let lastSyncTime = 0;
|
||||||
|
let lastSyncId = "";
|
||||||
|
|
||||||
for (let page = 0; processedRecords < recordsToProcess; page++) {
|
for (let page = 0; processedRecords < recordsToProcess; page++) {
|
||||||
// 检查超时
|
// 检查超时
|
||||||
@@ -505,13 +551,40 @@ export async function main(
|
|||||||
processedRecords += records.length;
|
processedRecords += records.length;
|
||||||
totalBatchRecords += batchSize;
|
totalBatchRecords += batchSize;
|
||||||
|
|
||||||
|
// 更新最后处理的记录时间和ID
|
||||||
|
if (records.length > 0) {
|
||||||
|
const lastRecord = records[records.length - 1];
|
||||||
|
lastSyncTime = Math.max(lastSyncTime, lastRecord.createTime);
|
||||||
|
lastSyncId = lastRecord._id.toString();
|
||||||
|
}
|
||||||
|
|
||||||
logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
|
logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 更新同步状态
|
||||||
|
if (processedRecords > 0 && lastSyncTime > 0) {
|
||||||
|
// 创建新的同步状态
|
||||||
|
const newSyncState: SyncState = {
|
||||||
|
last_sync_time: lastSyncTime,
|
||||||
|
records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + totalBatchRecords,
|
||||||
|
last_sync_id: lastSyncId
|
||||||
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 保存同步状态
|
||||||
|
await setVariable(SYNC_STATE_KEY, newSyncState);
|
||||||
|
logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`);
|
||||||
|
} catch (err) {
|
||||||
|
const error = err as Error;
|
||||||
|
logWithTimestamp(`更新同步状态失败: ${error.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
success: true,
|
success: true,
|
||||||
records_processed: processedRecords,
|
records_processed: processedRecords,
|
||||||
records_synced: totalBatchRecords,
|
records_synced: totalBatchRecords,
|
||||||
|
last_sync_time: lastSyncTime > 0 ? new Date(lastSyncTime).toISOString() : null,
|
||||||
message: "数据同步完成"
|
message: "数据同步完成"
|
||||||
};
|
};
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
|||||||
@@ -3,7 +3,17 @@
|
|||||||
// 创建日期: 2023-11-21
|
// 创建日期: 2023-11-21
|
||||||
|
|
||||||
import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
|
import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
|
||||||
import { getResource, getVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";
|
import { getResource, getVariable, setVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";
|
||||||
|
|
||||||
|
// 同步状态接口
|
||||||
|
interface SyncState {
|
||||||
|
last_sync_time: string; // 上次同步的结束时间
|
||||||
|
records_synced: number; // 累计同步的记录数
|
||||||
|
last_run: string; // 上次运行的时间
|
||||||
|
}
|
||||||
|
|
||||||
|
// 同步状态键名
|
||||||
|
const SYNC_STATE_KEY = "f/shorturl_analytics/shorturl_to_clickhouse_state";
|
||||||
|
|
||||||
// PostgreSQL配置接口
|
// PostgreSQL配置接口
|
||||||
interface PgConfig {
|
interface PgConfig {
|
||||||
@@ -42,24 +52,15 @@ interface ShortUrlData {
|
|||||||
* 同步PostgreSQL short_url.shorturl表数据到ClickHouse
|
* 同步PostgreSQL short_url.shorturl表数据到ClickHouse
|
||||||
*/
|
*/
|
||||||
export async function main(
|
export async function main(
|
||||||
params: {
|
|
||||||
/** 同步数据的开始时间(ISO 8601格式)。默认为1小时前 */
|
|
||||||
start_time?: string;
|
|
||||||
/** 同步数据的结束时间(ISO 8601格式)。默认为当前时间 */
|
|
||||||
end_time?: string;
|
|
||||||
/** 是否为测试模式(不执行实际更新) */
|
/** 是否为测试模式(不执行实际更新) */
|
||||||
dry_run?: boolean;
|
dry_run = false,
|
||||||
/** 是否显示详细日志 */
|
/** 是否显示详细日志 */
|
||||||
verbose?: boolean;
|
verbose = false,
|
||||||
}
|
/** 是否重置同步状态(从头开始同步) */
|
||||||
|
reset_sync_state = false,
|
||||||
|
/** 如果没有同步状态,往前查询多少小时的数据(默认1小时) */
|
||||||
|
default_hours_back = 1
|
||||||
) {
|
) {
|
||||||
// 设置默认参数
|
|
||||||
const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000).toISOString();
|
|
||||||
const start_time = params.start_time || oneHourAgo;
|
|
||||||
const end_time = params.end_time || new Date().toISOString();
|
|
||||||
const dry_run = params.dry_run || false;
|
|
||||||
const verbose = params.verbose || false;
|
|
||||||
|
|
||||||
// 初始化日志函数
|
// 初始化日志函数
|
||||||
const log = (message: string, isVerbose = false) => {
|
const log = (message: string, isVerbose = false) => {
|
||||||
if (!isVerbose || verbose) {
|
if (!isVerbose || verbose) {
|
||||||
@@ -67,6 +68,33 @@ export async function main(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// 获取同步状态
|
||||||
|
let syncState: SyncState | null = null;
|
||||||
|
if (!reset_sync_state) {
|
||||||
|
try {
|
||||||
|
log("获取同步状态...", true);
|
||||||
|
const rawState = await getVariable(SYNC_STATE_KEY);
|
||||||
|
if (rawState) {
|
||||||
|
if (typeof rawState === "string") {
|
||||||
|
syncState = JSON.parse(rawState);
|
||||||
|
} else {
|
||||||
|
syncState = rawState as SyncState;
|
||||||
|
}
|
||||||
|
log(`找到上次同步状态: 最后同步时间 ${syncState.last_sync_time}, 已同步记录数 ${syncState.records_synced}`, true);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
log(`获取同步状态失败: ${error}, 将使用默认设置`, true);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log("重置同步状态,从头开始同步", true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 设置时间范围
|
||||||
|
const oneHourAgo = new Date(Date.now() - default_hours_back * 60 * 60 * 1000).toISOString();
|
||||||
|
// 如果有同步状态,使用上次同步时间作为开始时间;否则使用默认时间
|
||||||
|
const start_time = syncState ? syncState.last_sync_time : oneHourAgo;
|
||||||
|
const end_time = new Date().toISOString();
|
||||||
|
|
||||||
log(`开始同步shorturl表数据: ${start_time} 至 ${end_time}`);
|
log(`开始同步shorturl表数据: ${start_time} 至 ${end_time}`);
|
||||||
|
|
||||||
let pgPool: Pool | null = null;
|
let pgPool: Pool | null = null;
|
||||||
@@ -90,6 +118,10 @@ export async function main(
|
|||||||
log(`成功获取 ${shorturlData.length} 条shorturl数据`);
|
log(`成功获取 ${shorturlData.length} 条shorturl数据`);
|
||||||
|
|
||||||
if (shorturlData.length === 0) {
|
if (shorturlData.length === 0) {
|
||||||
|
// 更新同步状态,即使没有新数据
|
||||||
|
if (!dry_run) {
|
||||||
|
await updateSyncState(end_time, syncState ? syncState.records_synced : 0, log);
|
||||||
|
}
|
||||||
return { success: true, message: "没有找到需要更新的数据", updated: 0 };
|
return { success: true, message: "没有找到需要更新的数据", updated: 0 };
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -100,10 +132,19 @@ export async function main(
|
|||||||
if (!dry_run) {
|
if (!dry_run) {
|
||||||
const shorturlUpdated = await updateClickHouseShortUrl(shorturlData, chConfig, log);
|
const shorturlUpdated = await updateClickHouseShortUrl(shorturlData, chConfig, log);
|
||||||
|
|
||||||
|
// 更新同步状态
|
||||||
|
const totalSynced = (syncState ? syncState.records_synced : 0) + shorturlUpdated;
|
||||||
|
await updateSyncState(end_time, totalSynced, log);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
success: true,
|
success: true,
|
||||||
message: "shorturl表数据同步完成",
|
message: "shorturl表数据同步完成",
|
||||||
shorturl_updated: shorturlUpdated
|
shorturl_updated: shorturlUpdated,
|
||||||
|
total_synced: totalSynced,
|
||||||
|
sync_state: {
|
||||||
|
last_sync_time: end_time,
|
||||||
|
records_synced: totalSynced
|
||||||
|
}
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
log("测试模式: 不执行实际更新");
|
log("测试模式: 不执行实际更新");
|
||||||
@@ -129,6 +170,25 @@ export async function main(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 更新同步状态
|
||||||
|
*/
|
||||||
|
async function updateSyncState(lastSyncTime: string, recordsSynced: number, log: (message: string, isVerbose?: boolean) => void): Promise<void> {
|
||||||
|
try {
|
||||||
|
const newState: SyncState = {
|
||||||
|
last_sync_time: lastSyncTime,
|
||||||
|
records_synced: recordsSynced,
|
||||||
|
last_run: new Date().toISOString()
|
||||||
|
};
|
||||||
|
|
||||||
|
await setVariable(SYNC_STATE_KEY, newState);
|
||||||
|
log(`同步状态已更新: 最后同步时间 ${lastSyncTime}, 累计同步记录数 ${recordsSynced}`, true);
|
||||||
|
} catch (error) {
|
||||||
|
log(`更新同步状态失败: ${error}`, true);
|
||||||
|
// 继续执行,不中断同步过程
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 从PostgreSQL获取shorturl数据
|
* 从PostgreSQL获取shorturl数据
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user