diff --git a/app/analytics/page.tsx b/app/analytics/page.tsx index e791a51..8315169 100644 --- a/app/analytics/page.tsx +++ b/app/analytics/page.tsx @@ -306,6 +306,8 @@ function AnalyticsContent() { const [geoData, setGeoData] = useState([]); const [deviceData, setDeviceData] = useState(null); const [events, setEvents] = useState([]); + const [isRefreshing, setIsRefreshing] = useState(false); // New state to track auto-refresh + const [lastRefreshed, setLastRefreshed] = useState(null); // Track when data was last refreshed // 添加 Snackbar 状态 const [isSnackbarOpen, setIsSnackbarOpen] = useState(false); @@ -449,12 +451,133 @@ function AnalyticsContent() { setError(err instanceof Error ? err.message : 'An error occurred while fetching data'); } finally { setLoading(false); + setIsRefreshing(false); // Reset refreshing state + setLastRefreshed(new Date()); // Update last refreshed timestamp } }; fetchData(); }, [dateRange, selectedTeamIds, selectedProjectIds, selectedTagNames, selectedSubpath, currentPage, pageSize, selectedShortUrl, shouldFetchData]); + // Add auto-refresh functionality + useEffect(() => { + if (!shouldFetchData) return; // Don't set up refresh until initial data load is triggered + + // Function to trigger a refresh of data + const refreshData = () => { + console.log('Auto-refreshing analytics data...'); + // Only refresh if not already loading or refreshing + if (!loading && !isRefreshing) { + setIsRefreshing(true); + + // Create a new fetch function instead of reusing the effect's fetchData + const fetchRefreshedData = async () => { + try { + const startTime = format(dateRange.from, "yyyy-MM-dd'T'HH:mm:ss'Z'"); + const endTime = format(dateRange.to, "yyyy-MM-dd'T'HH:mm:ss'Z'"); + + // 构建基础URL和查询参数 + const baseUrl = '/api/events'; + const params = new URLSearchParams({ + startTime, + endTime, + page: currentPage.toString(), + pageSize: pageSize.toString() + }); + + // Duplicate the parameters logic from the main fetch effect + if (selectedShortUrl && selectedShortUrl.externalId) { + params.append('linkId', selectedShortUrl.externalId); + } else { + const savedExternalId = sessionStorage.getItem('current_shorturl_external_id'); + if (savedExternalId) { + params.append('linkId', savedExternalId); + } + } + + if (selectedTeamIds.length > 0) { + selectedTeamIds.forEach(teamId => { + params.append('teamId', teamId); + }); + } + + if (selectedProjectIds.length > 0) { + selectedProjectIds.forEach(projectId => { + params.append('projectId', projectId); + }); + } + + if (selectedTagNames.length > 0) { + selectedTagNames.forEach(tagName => { + params.append('tagName', tagName); + }); + } + + if (selectedSubpath) { + params.append('subpath', selectedSubpath); + } + + // Build all URLs with the same parameters + const summaryUrl = `${baseUrl}/summary?${params.toString()}`; + const timeSeriesUrl = `${baseUrl}/time-series?${params.toString()}`; + const geoUrl = `${baseUrl}/geo?${params.toString()}`; + const devicesUrl = `${baseUrl}/devices?${params.toString()}`; + const eventsUrl = `${baseUrl}?${params.toString()}`; + + // Parallel requests for all data + const [summaryRes, timeSeriesRes, geoRes, deviceRes, eventsRes] = await Promise.all([ + fetch(summaryUrl), + fetch(timeSeriesUrl), + fetch(geoUrl), + fetch(devicesUrl), + fetch(eventsUrl) + ]); + + const [summaryData, timeSeriesData, geoData, deviceData, eventsData] = await Promise.all([ + summaryRes.json(), + timeSeriesRes.json(), + geoRes.json(), + deviceRes.json(), + eventsRes.json() + ]); + + // Update state with fresh data + if (summaryRes.ok) setSummary(summaryData.data); + if (timeSeriesRes.ok) setTimeSeriesData(timeSeriesData.data); + if (geoRes.ok) setGeoData(geoData.data); + if (deviceRes.ok) setDeviceData(deviceData.data); + if (eventsRes.ok) { + setEvents(eventsData.data || []); + // Update pagination info + if (eventsData.meta) { + const totalCount = parseInt(String(eventsData.meta.total), 10); + if (!isNaN(totalCount)) { + setTotalEvents(totalCount); + } + } + } + } catch (err) { + console.error('Auto-refresh error:', err); + // Don't show errors during auto-refresh to avoid disrupting the UI + } finally { + setIsRefreshing(false); + setLastRefreshed(new Date()); // Update last refreshed timestamp + } + }; + + fetchRefreshedData(); + } + }; + + // Set up the interval for auto-refresh every 30 seconds + const intervalId = setInterval(refreshData, 30000); + + // Clean up the interval when the component unmounts + return () => { + clearInterval(intervalId); + }; + }, [shouldFetchData, loading, isRefreshing, dateRange, selectedTeamIds, selectedProjectIds, selectedTagNames, selectedSubpath, currentPage, pageSize, selectedShortUrl]); + // Function to clear the shorturl filter const handleClearShortUrlFilter = () => { // 先清除 store 中的数据 @@ -499,7 +622,7 @@ function AnalyticsContent() { setCurrentPage(1); }; - if (loading) { + if (loading && !isRefreshing) { return (
@@ -532,8 +655,24 @@ function AnalyticsContent() { )}
-

Analytics Dashboard

+
+

Analytics Dashboard

+ {lastRefreshed && ( +
+ Last updated: {format(lastRefreshed, 'MMM d, yyyy HH:mm:ss')} + {isRefreshing ? ' · Refreshing...' : ' · Auto-refreshes every 30 seconds'} +
+ )} +
+ {/* Show refresh indicator */} + {isRefreshing && ( +
+
+ Refreshing data... +
+ )} + {/* 如果有选定的 shorturl,可以显示一个提示,显示更多详细信息 */} {selectedShortUrl && (
diff --git a/windmill/sync_mongo_short_to_postgres_short_url_shorturl.ts b/windmill/sync_mongo_short_to_postgres_short_url_shorturl.ts new file mode 100644 index 0000000..67b797d --- /dev/null +++ b/windmill/sync_mongo_short_to_postgres_short_url_shorturl.ts @@ -0,0 +1,522 @@ +// 从MongoDB的main.short表同步数据到PostgreSQL的short_url.shorturl表 +import { getVariable, setVariable, getResource } from "npm:windmill-client@1"; +import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts"; +import { Client } from "https://deno.land/x/postgres@v0.17.0/mod.ts"; + +interface MongoConfig { + host: string; + port: string; + db: string; + username: string; + password: string; +} + +interface PostgresConfig { + host: string; + port: number; + database: string; + user: string; + password: string; + schema: string; +} + +// 扩展ShortRecord接口以包含更多可能的字段 +interface ShortRecord { + _id: ObjectId; + origin: string; + slug: string; + domain: string | null; + createTime: number | { $numberLong: string } | string; + // 可选字段 + expiredAt?: number | { $numberLong: string } | string | null; + expiredUrl?: string | null; + password?: string | null; + image?: string | null; + title?: string | null; + description?: string | null; +} + +interface SyncState { + last_sync_time: number; + records_synced: number; + last_sync_id?: string; +} + +// 同步状态键名 +const SYNC_STATE_KEY = "f/limq/mongo_short_to_postgres_shorturl_shorturl_state"; + +export async function main( + batch_size = 1000, + max_records = 9999999, + timeout_minutes = 60, + skip_duplicate_check = false, + force_insert = false, + reset_sync_state = false, + postgres_schema = "short_url", // 添加schema参数,允许运行时指定 + postgres_database = "postgres", // 添加数据库名称参数,默认为postgres + domain = "upj.to" // 添加domain参数,允许用户指定域名 +) { + const logWithTimestamp = (message: string) => { + const now = new Date(); + console.log(`[${now.toISOString()}] ${message}`); + }; + + logWithTimestamp("开始执行MongoDB到PostgreSQL的同步任务"); + logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`); + logWithTimestamp(`使用域名: ${domain}`); + if (skip_duplicate_check) { + logWithTimestamp("⚠️ 警告: 已启用跳过重复检查模式,不会检查记录是否已存在"); + } + if (force_insert) { + logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录"); + } + if (reset_sync_state) { + logWithTimestamp("⚠️ 警告: 已启用重置同步状态,将从头开始同步数据"); + } + + // 设置超时 + const startTime = Date.now(); + const timeoutMs = timeout_minutes * 60 * 1000; + + // 检查是否超时 + const checkTimeout = () => { + if (Date.now() - startTime > timeoutMs) { + logWithTimestamp(`运行时间超过${timeout_minutes}分钟,暂停执行`); + return true; + } + return false; + }; + + // 日期解析函数,处理不同格式的日期 + const parseDate = (dateValue: any): Date | null => { + if (!dateValue) return null; + + // 处理 MongoDB $numberLong 格式 + if (dateValue.$numberLong) { + return new Date(Number(dateValue.$numberLong)); + } + + // 处理普通时间戳 + if (typeof dateValue === 'number') { + return new Date(dateValue); + } + + // 处理 ISO 字符串格式 + if (typeof dateValue === 'string') { + const date = new Date(dateValue); + return isNaN(date.getTime()) ? null : date; + } + + return null; + }; + + // 获取MongoDB和PostgreSQL的连接信息 + let mongoConfig: MongoConfig; + let postgresConfig: PostgresConfig; + + try { + const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb"); + if (typeof rawMongoConfig === "string") { + try { + mongoConfig = JSON.parse(rawMongoConfig); + } catch (e) { + console.error("MongoDB配置解析失败:", e); + throw e; + } + } else { + mongoConfig = rawMongoConfig as MongoConfig; + } + + // 使用getResource获取PostgreSQL资源 + try { + logWithTimestamp("正在获取PostgreSQL资源..."); + const resourceConfig = await getResource("f/limq/production_supabase"); + + // 将resource转换为PostgresConfig + postgresConfig = { + host: resourceConfig.host || "", + port: Number(resourceConfig.port) || 5432, + user: resourceConfig.user || "", + password: resourceConfig.password || "", + database: resourceConfig.database || postgres_database, // 使用提供的数据库名称作为备选 + schema: resourceConfig.schema || postgres_schema // 使用提供的schema作为备选 + }; + + // 检查并记录配置信息 + if (!postgresConfig.database || postgresConfig.database === "undefined") { + postgresConfig.database = postgres_database; + logWithTimestamp(`数据库名称未指定或为"undefined",使用提供的值: ${postgresConfig.database}`); + } + + if (!postgresConfig.schema || postgresConfig.schema === "undefined") { + postgresConfig.schema = postgres_schema; + logWithTimestamp(`Schema未指定或为"undefined",使用提供的值: ${postgresConfig.schema}`); + } + + logWithTimestamp(`PostgreSQL配置: 数据库=${postgresConfig.database}, Schema=${postgresConfig.schema}`); + } catch (e) { + console.error("获取PostgreSQL资源失败:", e); + throw e; + } + + console.log("MongoDB配置:", JSON.stringify({ + ...mongoConfig, + password: "****" // 隐藏密码 + })); + console.log("PostgreSQL配置:", JSON.stringify({ + ...postgresConfig, + password: "****" // 隐藏密码 + })); + } catch (error) { + console.error("获取配置失败:", error); + throw error; + } + + // 获取上次同步状态 + let lastSyncState: SyncState | null = null; + if (!reset_sync_state) { + try { + const rawSyncState = await getVariable(SYNC_STATE_KEY); + if (rawSyncState) { + if (typeof rawSyncState === "string") { + try { + lastSyncState = JSON.parse(rawSyncState); + } catch (e) { + logWithTimestamp(`解析上次同步状态失败: ${e}, 将从头开始同步`); + } + } else { + lastSyncState = rawSyncState as SyncState; + } + } + } catch (error) { + logWithTimestamp(`获取上次同步状态失败: ${error}, 将从头开始同步`); + } + } + + if (lastSyncState) { + logWithTimestamp(`找到上次同步状态: 最后同步时间 ${new Date(lastSyncState.last_sync_time).toISOString()}, 已同步记录数 ${lastSyncState.records_synced}`); + if (lastSyncState.last_sync_id) { + logWithTimestamp(`最后同步ID: ${lastSyncState.last_sync_id}`); + } + } else { + logWithTimestamp("没有找到上次同步状态,将从头开始同步"); + } + + // 构建MongoDB连接URL + let mongoUrl = "mongodb://"; + if (mongoConfig.username && mongoConfig.password) { + mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`; + } + mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`; + + console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`); + + // 构建PostgreSQL连接URL + const pgConnectionString = `postgres://${postgresConfig.user}:${postgresConfig.password}@${postgresConfig.host}:${postgresConfig.port}/${postgresConfig.database}`; + console.log(`PostgreSQL连接URL: ${pgConnectionString.replace(/:[^:]*@/, ":****@")}`); + + // 连接MongoDB + const mongoClient = new MongoClient(); + let pgClient: Client | null = null; + + try { + await mongoClient.connect(mongoUrl); + logWithTimestamp("MongoDB连接成功"); + + // 连接PostgreSQL + pgClient = new Client(pgConnectionString); + await pgClient.connect(); + logWithTimestamp("PostgreSQL连接成功"); + + // 确认PostgreSQL schema存在 + try { + await pgClient.queryArray(`SELECT 1 FROM information_schema.schemata WHERE schema_name = '${postgresConfig.schema}'`); + logWithTimestamp(`PostgreSQL schema '${postgresConfig.schema}' 已确认存在`); + } catch (error) { + logWithTimestamp(`检查PostgreSQL schema失败: ${error}`); + throw new Error(`Schema '${postgresConfig.schema}' 可能不存在`); + } + + const db = mongoClient.database(mongoConfig.db); + const shortCollection = db.collection("short"); + + // 构建查询条件,根据上次同步状态获取新记录 + const query: Record = {}; + + // 如果有上次同步状态,则只获取更新的记录 + if (lastSyncState && lastSyncState.last_sync_time) { + // 使用上次同步时间作为过滤条件 + query.createTime = { $gt: lastSyncState.last_sync_time }; + logWithTimestamp(`将只同步createTime > ${lastSyncState.last_sync_time} (${new Date(lastSyncState.last_sync_time).toISOString()}) 的记录`); + } + + // 计算总记录数 + const totalRecords = await shortCollection.countDocuments(query); + logWithTimestamp(`找到 ${totalRecords} 条新记录需要同步`); + + // 限制此次处理的记录数量 + const recordsToProcess = Math.min(totalRecords, max_records); + logWithTimestamp(`本次将处理 ${recordsToProcess} 条记录`); + + if (totalRecords === 0) { + logWithTimestamp("没有新记录需要同步,任务完成"); + return { + success: true, + records_synced: 0, + message: "没有新记录需要同步" + }; + } + + // 检查记录是否已经存在于PostgreSQL中 + const checkExistingRecords = async (records: ShortRecord[]): Promise => { + if (records.length === 0) return []; + + // 如果跳过重复检查或强制插入,则直接返回所有记录 + if (skip_duplicate_check || force_insert) { + logWithTimestamp(`已跳过重复检查,准备处理所有 ${records.length} 条记录`); + return records; + } + + logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于PostgreSQL中...`); + + try { + // 提取所有记录的slugs + const slugs = records.map(record => record.slug); + + // 查询PostgreSQL中是否已存在这些slugs + const result = await pgClient!.queryArray(` + SELECT slug FROM ${postgresConfig.schema}.shorturl + WHERE slug = ANY($1::text[]) + `, [slugs]); + + // 将已存在的slugs加入到集合中 + const existingSlugs = new Set(); + for (const row of result.rows) { + existingSlugs.add(row[0] as string); + } + + logWithTimestamp(`检测到 ${existingSlugs.size} 条记录已存在于PostgreSQL中`); + + // 过滤出不存在的记录 + const newRecords = records.filter(record => !existingSlugs.has(record.slug)); + logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`); + + return newRecords; + } catch (err) { + const error = err as Error; + logWithTimestamp(`PostgreSQL查询出错: ${error.message}`); + if (skip_duplicate_check) { + logWithTimestamp("已启用跳过重复检查,将继续处理所有记录"); + return records; + } else { + throw error; + } + } + }; + + // 处理记录的函数 + const processRecords = async (records: ShortRecord[]) => { + if (records.length === 0) return 0; + + logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`); + + // 检查记录是否已存在 + let newRecords; + try { + newRecords = await checkExistingRecords(records); + } catch (err) { + const error = err as Error; + logWithTimestamp(`检查记录是否存在时出错: ${error.message}`); + if (!skip_duplicate_check && !force_insert) { + throw error; + } + // 如果跳过检查或强制插入,则使用所有记录 + logWithTimestamp("将使用所有记录进行处理"); + newRecords = records; + } + + if (newRecords.length === 0) { + logWithTimestamp("所有记录都已存在,跳过处理"); + return 0; + } + + logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`); + + // 批量插入PostgreSQL + try { + // 开始事务 + await pgClient!.queryArray('BEGIN'); + + let insertedCount = 0; + + // 由于参数可能很多,按小批次处理 + const smallBatchSize = 100; + for (let i = 0; i < newRecords.length; i += smallBatchSize) { + const batchRecords = newRecords.slice(i, i + smallBatchSize); + + // 构造批量插入语句 + const placeholders = []; + const values = []; + let valueIndex = 1; + + for (const record of batchRecords) { + // 参考提供的字段处理方式处理数据 + const createdAt = parseDate(record.createTime); + const updatedAt = createdAt; // 设置更新时间等于创建时间 + const fullShortUrl = `${domain}/${record.slug}`; + + placeholders.push(`($${valueIndex}, $${valueIndex+1}, $${valueIndex+2}, $${valueIndex+3}, $${valueIndex+4}, $${valueIndex+5}, $${valueIndex+6}, $${valueIndex+7}, $${valueIndex+8}, $${valueIndex+9}, $${valueIndex+10}, $${valueIndex+11}, $${valueIndex+12})`); + + values.push( + record._id.toString(), // id + record.slug, // slug + domain, // domain (使用提供的域名) + record.slug, // name (使用slug作为name) + record.slug, // title (使用slug作为title) + record.origin || '', // origin + createdAt, // created_at + updatedAt, // updated_at + fullShortUrl, // full_short_url + record.image || null, // image + record.description || null, // description + record.expiredUrl || null, // expired_url + parseDate(record.expiredAt) // expired_at + ); + + valueIndex += 13; + } + + const query = ` + INSERT INTO ${postgresConfig.schema}.shorturl + (id, slug, domain, name, title, origin, created_at, updated_at, full_short_url, image, description, expired_url, expired_at) + VALUES ${placeholders.join(', ')} + `; + + await pgClient!.queryArray(query, values); + insertedCount += batchRecords.length; + logWithTimestamp(`已插入 ${insertedCount}/${newRecords.length} 条记录`); + } + + // 提交事务 + await pgClient!.queryArray('COMMIT'); + + logWithTimestamp(`成功插入 ${insertedCount} 条记录到PostgreSQL`); + return insertedCount; + } catch (err) { + const error = err as Error; + // 发生错误,回滚事务 + await pgClient!.queryArray('ROLLBACK'); + logWithTimestamp(`向PostgreSQL插入数据失败: ${error.message}`); + throw error; + } + }; + + // 批量处理记录 + let processedRecords = 0; + let totalBatchRecords = 0; + let lastSyncTime = 0; + let lastSyncId = ""; + + for (let page = 0; processedRecords < recordsToProcess; page++) { + // 检查超时 + if (checkTimeout()) { + logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`); + break; + } + + // 每批次都输出进度 + logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`); + + logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`); + const records = await shortCollection.find( + query, + { + sort: { createTime: 1 }, + skip: page * batch_size, + limit: batch_size + } + ).toArray(); + + if (records.length === 0) { + logWithTimestamp("没有找到更多数据,同步结束"); + break; + } + + // 找到数据,开始处理 + logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`); + + // 输出当前批次的部分数据信息 + if (records.length > 0) { + logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, slug=${records[0].slug}, 时间=${new Date(typeof records[0].createTime === 'number' ? records[0].createTime : 0).toISOString()}`); + if (records.length > 1) { + const lastRec = records[records.length-1]; + logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${lastRec._id}, slug=${lastRec.slug}, 时间=${new Date(typeof lastRec.createTime === 'number' ? lastRec.createTime : 0).toISOString()}`); + } + } + + const batchSize = await processRecords(records); + processedRecords += records.length; + totalBatchRecords += batchSize; + + // 更新最后处理的记录时间和ID + if (records.length > 0) { + const lastRecord = records[records.length - 1]; + // 提取数字时间戳 + let lastCreateTime = 0; + if (typeof lastRecord.createTime === 'number') { + lastCreateTime = lastRecord.createTime; + } else if (lastRecord.createTime && lastRecord.createTime.$numberLong) { + lastCreateTime = Number(lastRecord.createTime.$numberLong); + } + + lastSyncTime = Math.max(lastSyncTime, lastCreateTime); + lastSyncId = lastRecord._id.toString(); + } + + logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); + } + + // 更新同步状态 + if (processedRecords > 0 && lastSyncTime > 0) { + // 创建新的同步状态 + const newSyncState: SyncState = { + last_sync_time: lastSyncTime, + records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + totalBatchRecords, + last_sync_id: lastSyncId + }; + + try { + // 保存同步状态 + await setVariable(SYNC_STATE_KEY, newSyncState); + logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`); + } catch (err) { + const error = err as Error; + logWithTimestamp(`更新同步状态失败: ${error.message}`); + } + } + + return { + success: true, + records_processed: processedRecords, + records_synced: totalBatchRecords, + last_sync_time: lastSyncTime > 0 ? new Date(lastSyncTime).toISOString() : null, + message: "数据同步完成" + }; + } catch (err) { + const error = err as Error; + console.error("同步过程中发生错误:", error); + return { + success: false, + error: error.message, + stack: error.stack + }; + } finally { + // 关闭连接 + if (pgClient) { + await pgClient.end(); + logWithTimestamp("PostgreSQL连接已关闭"); + } + await mongoClient.close(); + logWithTimestamp("MongoDB连接已关闭"); + } +} \ No newline at end of file diff --git a/windmill/sync_mongo_to_events.ts b/windmill/sync_mongo_to_events.ts index 6cb2811..1a5e989 100644 --- a/windmill/sync_mongo_to_events.ts +++ b/windmill/sync_mongo_to_events.ts @@ -39,13 +39,17 @@ interface SyncState { last_sync_id?: string; } +// 同步状态键名 +const SYNC_STATE_KEY = "f/shorturl_analytics/mongo_sync_state"; + export async function main( batch_size = 1000, max_records = 9999999, timeout_minutes = 60, skip_clickhouse_check = false, force_insert = false, - database_override = "shorturl_analytics" // 添加数据库名称参数,默认为shorturl_analytics + database_override = "shorturl_analytics", // 添加数据库名称参数,默认为shorturl_analytics + reset_sync_state = false // 添加参数用于重置同步状态 ) { const logWithTimestamp = (message: string) => { const now = new Date(); @@ -60,6 +64,9 @@ export async function main( if (force_insert) { logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录"); } + if (reset_sync_state) { + logWithTimestamp("⚠️ 警告: 已启用重置同步状态,将从头开始同步数据"); + } // 设置超时 const startTime = Date.now(); @@ -127,6 +134,36 @@ export async function main( throw error; } + // 获取上次同步状态 + let lastSyncState: SyncState | null = null; + if (!reset_sync_state) { + try { + const rawSyncState = await getVariable(SYNC_STATE_KEY); + if (rawSyncState) { + if (typeof rawSyncState === "string") { + try { + lastSyncState = JSON.parse(rawSyncState); + } catch (e) { + logWithTimestamp(`解析上次同步状态失败: ${e}, 将从头开始同步`); + } + } else { + lastSyncState = rawSyncState as SyncState; + } + } + } catch (error) { + logWithTimestamp(`获取上次同步状态失败: ${error}, 将从头开始同步`); + } + } + + if (lastSyncState) { + logWithTimestamp(`找到上次同步状态: 最后同步时间 ${new Date(lastSyncState.last_sync_time).toISOString()}, 已同步记录数 ${lastSyncState.records_synced}`); + if (lastSyncState.last_sync_id) { + logWithTimestamp(`最后同步ID: ${lastSyncState.last_sync_id}`); + } + } else { + logWithTimestamp("没有找到上次同步状态,将从头开始同步"); + } + // 构建MongoDB连接URL let mongoUrl = "mongodb://"; if (mongoConfig.username && mongoConfig.password) { @@ -145,25 +182,32 @@ export async function main( const db = client.database(mongoConfig.db); const traceCollection = db.collection("trace"); - // 构建查询条件,获取所有记录 + // 构建查询条件,根据上次同步状态获取新记录 const query: Record = { type: 1 // 只同步type为1的记录 }; + // 如果有上次同步状态,则只获取更新的记录 + if (lastSyncState && lastSyncState.last_sync_time) { + // 使用上次同步时间作为过滤条件 + query.createTime = { $gt: lastSyncState.last_sync_time }; + logWithTimestamp(`将只同步createTime > ${lastSyncState.last_sync_time} (${new Date(lastSyncState.last_sync_time).toISOString()}) 的记录`); + } + // 计算总记录数 const totalRecords = await traceCollection.countDocuments(query); - console.log(`找到 ${totalRecords} 条记录需要同步`); + console.log(`找到 ${totalRecords} 条新记录需要同步`); // 限制此次处理的记录数量 const recordsToProcess = Math.min(totalRecords, max_records); console.log(`本次将处理 ${recordsToProcess} 条记录`); if (totalRecords === 0) { - console.log("没有记录需要同步,任务完成"); + console.log("没有新记录需要同步,任务完成"); return { success: true, records_synced: 0, - message: "没有记录需要同步" + message: "没有新记录需要同步" }; } @@ -464,6 +508,8 @@ export async function main( // 批量处理记录 let processedRecords = 0; let totalBatchRecords = 0; + let lastSyncTime = 0; + let lastSyncId = ""; for (let page = 0; processedRecords < recordsToProcess; page++) { // 检查超时 @@ -505,13 +551,40 @@ export async function main( processedRecords += records.length; totalBatchRecords += batchSize; + // 更新最后处理的记录时间和ID + if (records.length > 0) { + const lastRecord = records[records.length - 1]; + lastSyncTime = Math.max(lastSyncTime, lastRecord.createTime); + lastSyncId = lastRecord._id.toString(); + } + logWithTimestamp(`第 ${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); } + // 更新同步状态 + if (processedRecords > 0 && lastSyncTime > 0) { + // 创建新的同步状态 + const newSyncState: SyncState = { + last_sync_time: lastSyncTime, + records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + totalBatchRecords, + last_sync_id: lastSyncId + }; + + try { + // 保存同步状态 + await setVariable(SYNC_STATE_KEY, newSyncState); + logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`); + } catch (err) { + const error = err as Error; + logWithTimestamp(`更新同步状态失败: ${error.message}`); + } + } + return { success: true, records_processed: processedRecords, records_synced: totalBatchRecords, + last_sync_time: lastSyncTime > 0 ? new Date(lastSyncTime).toISOString() : null, message: "数据同步完成" }; } catch (err) { diff --git a/windmill/sync_shorturl_schema_to_clickhouse.ts b/windmill/sync_shorturl_schema_to_clickhouse.ts index 5d488f5..48da563 100644 --- a/windmill/sync_shorturl_schema_to_clickhouse.ts +++ b/windmill/sync_shorturl_schema_to_clickhouse.ts @@ -3,7 +3,17 @@ // 创建日期: 2023-11-21 import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts"; -import { getResource, getVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts"; +import { getResource, getVariable, setVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts"; + +// 同步状态接口 +interface SyncState { + last_sync_time: string; // 上次同步的结束时间 + records_synced: number; // 累计同步的记录数 + last_run: string; // 上次运行的时间 +} + +// 同步状态键名 +const SYNC_STATE_KEY = "f/shorturl_analytics/shorturl_to_clickhouse_state"; // PostgreSQL配置接口 interface PgConfig { @@ -42,24 +52,15 @@ interface ShortUrlData { * 同步PostgreSQL short_url.shorturl表数据到ClickHouse */ export async function main( - params: { - /** 同步数据的开始时间(ISO 8601格式)。默认为1小时前 */ - start_time?: string; - /** 同步数据的结束时间(ISO 8601格式)。默认为当前时间 */ - end_time?: string; - /** 是否为测试模式(不执行实际更新) */ - dry_run?: boolean; - /** 是否显示详细日志 */ - verbose?: boolean; - } + /** 是否为测试模式(不执行实际更新) */ + dry_run = false, + /** 是否显示详细日志 */ + verbose = false, + /** 是否重置同步状态(从头开始同步) */ + reset_sync_state = false, + /** 如果没有同步状态,往前查询多少小时的数据(默认1小时) */ + default_hours_back = 1 ) { - // 设置默认参数 - const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000).toISOString(); - const start_time = params.start_time || oneHourAgo; - const end_time = params.end_time || new Date().toISOString(); - const dry_run = params.dry_run || false; - const verbose = params.verbose || false; - // 初始化日志函数 const log = (message: string, isVerbose = false) => { if (!isVerbose || verbose) { @@ -67,6 +68,33 @@ export async function main( } }; + // 获取同步状态 + let syncState: SyncState | null = null; + if (!reset_sync_state) { + try { + log("获取同步状态...", true); + const rawState = await getVariable(SYNC_STATE_KEY); + if (rawState) { + if (typeof rawState === "string") { + syncState = JSON.parse(rawState); + } else { + syncState = rawState as SyncState; + } + log(`找到上次同步状态: 最后同步时间 ${syncState.last_sync_time}, 已同步记录数 ${syncState.records_synced}`, true); + } + } catch (error) { + log(`获取同步状态失败: ${error}, 将使用默认设置`, true); + } + } else { + log("重置同步状态,从头开始同步", true); + } + + // 设置时间范围 + const oneHourAgo = new Date(Date.now() - default_hours_back * 60 * 60 * 1000).toISOString(); + // 如果有同步状态,使用上次同步时间作为开始时间;否则使用默认时间 + const start_time = syncState ? syncState.last_sync_time : oneHourAgo; + const end_time = new Date().toISOString(); + log(`开始同步shorturl表数据: ${start_time} 至 ${end_time}`); let pgPool: Pool | null = null; @@ -90,6 +118,10 @@ export async function main( log(`成功获取 ${shorturlData.length} 条shorturl数据`); if (shorturlData.length === 0) { + // 更新同步状态,即使没有新数据 + if (!dry_run) { + await updateSyncState(end_time, syncState ? syncState.records_synced : 0, log); + } return { success: true, message: "没有找到需要更新的数据", updated: 0 }; } @@ -100,10 +132,19 @@ export async function main( if (!dry_run) { const shorturlUpdated = await updateClickHouseShortUrl(shorturlData, chConfig, log); + // 更新同步状态 + const totalSynced = (syncState ? syncState.records_synced : 0) + shorturlUpdated; + await updateSyncState(end_time, totalSynced, log); + return { success: true, message: "shorturl表数据同步完成", - shorturl_updated: shorturlUpdated + shorturl_updated: shorturlUpdated, + total_synced: totalSynced, + sync_state: { + last_sync_time: end_time, + records_synced: totalSynced + } }; } else { log("测试模式: 不执行实际更新"); @@ -129,6 +170,25 @@ export async function main( } } +/** + * 更新同步状态 + */ +async function updateSyncState(lastSyncTime: string, recordsSynced: number, log: (message: string, isVerbose?: boolean) => void): Promise { + try { + const newState: SyncState = { + last_sync_time: lastSyncTime, + records_synced: recordsSynced, + last_run: new Date().toISOString() + }; + + await setVariable(SYNC_STATE_KEY, newState); + log(`同步状态已更新: 最后同步时间 ${lastSyncTime}, 累计同步记录数 ${recordsSynced}`, true); + } catch (error) { + log(`更新同步状态失败: ${error}`, true); + // 继续执行,不中断同步过程 + } +} + /** * 从PostgreSQL获取shorturl数据 */