13 Commits

Author SHA1 Message Date
b8d5b0545a sync event data 2025-03-24 22:47:02 +08:00
37aafbe636 fix style 2025-03-24 18:47:07 +08:00
f41a6b6e5b chart 2025-03-24 18:42:08 +08:00
98c5f0f154 fix sync columen 2025-03-24 17:36:31 +08:00
8012fa78c0 only sync mongo event type 1 2025-03-24 17:10:01 +08:00
90e2000842 add windmill instructions 2025-03-24 15:22:23 +08:00
6d48b53cba links infinite scroll 2025-03-21 23:51:09 +08:00
bf3bdc63f5 use link as main page 2025-03-21 23:46:57 +08:00
45ffaccb7a sync data fix 2025-03-21 23:43:16 +08:00
7e6356cf17 sync script & env 2025-03-21 22:40:03 +08:00
d5b9e8eca9 build & pm2 start 2025-03-21 21:05:58 +08:00
6e1ada956d chang npm command 2025-03-21 15:49:16 +08:00
4a04fc322c chang sql 2025-03-21 15:42:24 +08:00
21 changed files with 2947 additions and 1394 deletions

2
.gitignore vendored
View File

@@ -39,3 +39,5 @@ yarn-error.log*
# typescript
*.tsbuildinfo
next-env.d.ts
logs/*

View File

@@ -1,11 +1,13 @@
import { NextRequest, NextResponse } from 'next/server';
import { getLinkDetailsById } from '@/app/api/links/service';
// 正确的Next.js 15 API路由处理函数参数类型定义
export async function GET(
request: NextRequest,
context: { params: { linkId: string } }
context: { params: Promise<any> }
) {
try {
// 获取参数,支持异步格式
const params = await context.params;
const linkId = params.linkId;
const link = await getLinkDetailsById(linkId);

View File

@@ -3,10 +3,12 @@ import { getLinkById } from '../service';
export async function GET(
request: NextRequest,
{ params }: { params: { linkId: string } }
context: { params: Promise<any> }
) {
try {
const { linkId } = params;
// 获取参数,支持异步格式
const params = await context.params;
const linkId = params.linkId;
const link = await getLinkById(linkId);
if (!link) {
@@ -18,9 +20,9 @@ export async function GET(
return NextResponse.json(link);
} catch (error) {
console.error('Failed to fetch link details:', error);
console.error('Failed to fetch link:', error);
return NextResponse.json(
{ error: 'Failed to fetch link details', message: (error as Error).message },
{ error: 'Failed to fetch link', message: (error as Error).message },
{ status: 500 }
);
}

View File

@@ -227,7 +227,9 @@ export default function LinkDetailsCard({ linkId, onClose }: LinkDetailsCardProp
</span>
</div>
<div className="mt-2">
<p className="text-2xl font-bold text-foreground">{linkDetails.visits.toLocaleString()}</p>
<p className="text-2xl font-bold text-foreground">
{linkDetails?.visits !== undefined ? linkDetails.visits.toLocaleString() : '0'}
</p>
</div>
</div>
@@ -254,7 +256,9 @@ export default function LinkDetailsCard({ linkId, onClose }: LinkDetailsCardProp
</span>
</div>
<div className="mt-2">
<p className="text-2xl font-bold text-foreground">{linkDetails.uniqueVisitors.toLocaleString()}</p>
<p className="text-2xl font-bold text-foreground">
{linkDetails?.uniqueVisitors !== undefined ? linkDetails.uniqueVisitors.toLocaleString() : '0'}
</p>
</div>
</div>
@@ -281,7 +285,9 @@ export default function LinkDetailsCard({ linkId, onClose }: LinkDetailsCardProp
</span>
</div>
<div className="mt-2">
<p className="text-2xl font-bold text-foreground">{linkDetails.avgTime}</p>
<p className="text-2xl font-bold text-foreground">
{linkDetails?.avgTime || '0s'}
</p>
</div>
</div>
@@ -308,7 +314,9 @@ export default function LinkDetailsCard({ linkId, onClose }: LinkDetailsCardProp
</span>
</div>
<div className="mt-2">
<p className="text-2xl font-bold text-foreground">{linkDetails.conversionRate}%</p>
<p className="text-2xl font-bold text-foreground">
{linkDetails?.conversionRate !== undefined ? `${linkDetails.conversionRate}%` : '0%'}
</p>
</div>
</div>
</div>

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
"use client";
import { useState, useEffect } from 'react';
import { useState, useEffect, useCallback, useRef } from 'react';
import CreateLinkModal from '../components/ui/CreateLinkModal';
import { Link, StatsOverview, Tag } from '../api/types';
@@ -48,6 +48,13 @@ export default function LinksPage() {
const [isLoading, setIsLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
// 无限加载相关状态
const [page, setPage] = useState(1);
const [hasMore, setHasMore] = useState(true);
const [isLoadingMore, setIsLoadingMore] = useState(false);
const observer = useRef<IntersectionObserver | null>(null);
const lastLinkElementRef = useRef<HTMLTableRowElement | null>(null);
// 映射API数据到UI所需格式的函数
const mapApiLinkToUiLink = (apiLink: Link): UILink => {
// 生成短URL显示 - 因为数据库中没有short_url字段
@@ -91,49 +98,116 @@ export default function LinksPage() {
};
// 获取链接数据
useEffect(() => {
const fetchLinks = async () => {
const fetchLinks = useCallback(async (pageNum: number, isInitialLoad: boolean = false) => {
try {
if (isInitialLoad) {
setIsLoading(true);
} else {
setIsLoadingMore(true);
}
setError(null);
// 获取链接列表
const linksResponse = await fetch('/api/links');
const linksResponse = await fetch(`/api/links?page=${pageNum}&limit=20${searchQuery ? `&search=${encodeURIComponent(searchQuery)}` : ''}`);
if (!linksResponse.ok) {
throw new Error(`Failed to fetch links: ${linksResponse.statusText}`);
}
const linksData = await linksResponse.json();
// 获取标签列表
const uiLinks = linksData.data.map(mapApiLinkToUiLink);
if (isInitialLoad) {
setLinks(uiLinks);
} else {
setLinks(prevLinks => [...prevLinks, ...uiLinks]);
}
// 检查是否还有更多数据可加载
const { pagination } = linksData;
setHasMore(pagination.page < pagination.totalPages);
if (isInitialLoad) {
// 只在初始加载时获取标签和统计数据
const tagsResponse = await fetch('/api/tags');
if (!tagsResponse.ok) {
throw new Error(`Failed to fetch tags: ${tagsResponse.statusText}`);
}
const tagsData = await tagsResponse.json();
// 获取统计数据
const statsResponse = await fetch('/api/stats');
if (!statsResponse.ok) {
throw new Error(`Failed to fetch stats: ${statsResponse.statusText}`);
}
const statsData = await statsResponse.json();
// 处理并设置数据
const uiLinks = linksData.data.map(mapApiLinkToUiLink);
setLinks(uiLinks);
setAllTags(tagsData);
setStats(statsData);
}
} catch (err) {
console.error('Data loading failed:', err);
setError(err instanceof Error ? err.message : 'Unknown error');
} finally {
if (isInitialLoad) {
setIsLoading(false);
} else {
setIsLoadingMore(false);
}
}
}, [searchQuery]);
// 初始加载
useEffect(() => {
setPage(1);
fetchLinks(1, true);
}, [fetchLinks]);
// 搜索过滤变化时重新加载数据
useEffect(() => {
// 当搜索关键词变化时,重置页码和链接列表,然后重新获取数据
setLinks([]);
setPage(1);
fetchLinks(1, true);
}, [searchQuery, fetchLinks]);
// 设置Intersection Observer来检测滚动并加载更多数据
useEffect(() => {
// 如果正在加载或没有更多数据则不设置observer
if (isLoading || isLoadingMore || !hasMore) return;
// 断开之前的observer连接
if (observer.current) {
observer.current.disconnect();
}
observer.current = new IntersectionObserver(entries => {
if (entries[0].isIntersecting && hasMore) {
// 当最后一个元素可见且有更多数据时,加载下一页
setPage(prevPage => prevPage + 1);
}
}, {
root: null,
rootMargin: '0px',
threshold: 0.5
});
if (lastLinkElementRef.current) {
observer.current.observe(lastLinkElementRef.current);
}
return () => {
if (observer.current) {
observer.current.disconnect();
}
};
}, [isLoading, isLoadingMore, hasMore, links]);
fetchLinks();
}, []);
// 当页码变化时加载更多数据
useEffect(() => {
if (page > 1) {
fetchLinks(page, false);
}
}, [page, fetchLinks]);
const filteredLinks = links.filter(link =>
link.name.toLowerCase().includes(searchQuery.toLowerCase()) ||
@@ -152,14 +226,8 @@ export default function LinksPage() {
console.log('创建链接:', linkData);
// 刷新链接列表
const response = await fetch('/api/links');
if (!response.ok) {
throw new Error(`刷新链接列表失败: ${response.statusText}`);
}
const newData = await response.json();
const uiLinks = newData.data.map(mapApiLinkToUiLink);
setLinks(uiLinks);
setPage(1);
fetchLinks(1, true);
setShowCreateModal(false);
} catch (err) {
@@ -309,8 +377,8 @@ export default function LinksPage() {
{/* Links Table */}
<div className="overflow-hidden border rounded-lg shadow bg-card-bg border-card-border">
<div className="overflow-x-auto">
<table className="w-full text-sm text-left text-text-secondary">
<thead className="text-xs uppercase border-b bg-card-bg/60 text-text-secondary border-card-border">
<table className="min-w-full divide-y divide-card-border">
<thead className="bg-card-bg-secondary">
<tr>
<th scope="col" className="px-6 py-3">Link Info</th>
<th scope="col" className="px-6 py-3">Visits</th>
@@ -323,28 +391,20 @@ export default function LinksPage() {
</th>
</tr>
</thead>
<tbody>
{isLoading && links.length === 0 ? (
<tr className="border-b bg-card-bg border-card-border">
<td colSpan={7} className="px-6 py-4 text-center text-text-secondary">
<div className="flex items-center justify-center">
<div className="w-6 h-6 border-2 rounded-full border-accent-blue border-t-transparent animate-spin"></div>
<span className="ml-2">Loading...</span>
</div>
</td>
</tr>
) : filteredLinks.length === 0 ? (
<tr className="border-b bg-card-bg border-card-border">
<td colSpan={7} className="px-6 py-4 text-center text-text-secondary">
No links found matching your search criteria
<tbody className="divide-y divide-card-border">
{filteredLinks.length === 0 ? (
<tr>
<td colSpan={7} className="px-6 py-12 text-center text-text-secondary">
No links found. Create one to get started.
</td>
</tr>
) : (
filteredLinks.map((link) => (
filteredLinks.map((link, index) => (
<tr
key={link.id}
className="border-b cursor-pointer bg-card-bg border-card-border hover:bg-card-bg/80"
onClick={() => handleOpenLinkDetails(link.id)}
className="transition-colors cursor-pointer hover:bg-card-bg-secondary"
ref={index === filteredLinks.length - 1 ? lastLinkElementRef : null}
>
<td className="px-6 py-4">
<div className="font-medium text-foreground">{link.name}</div>
@@ -436,6 +496,21 @@ export default function LinksPage() {
</tbody>
</table>
</div>
{/* Loading more indicator */}
{isLoadingMore && (
<div className="p-4 text-center">
<div className="inline-block w-6 h-6 border-2 rounded-full border-accent-blue border-t-transparent animate-spin"></div>
<p className="mt-2 text-sm text-text-secondary">Loading more links...</p>
</div>
)}
{/* End of results message */}
{!hasMore && links.length > 0 && (
<div className="p-4 text-center text-sm text-text-secondary">
No more links to load.
</div>
)}
</div>
{/* Tags Section */}

View File

@@ -1,50 +1,5 @@
import Link from 'next/link';
import { redirect } from 'next/navigation';
export default function Home() {
return (
<main className="flex min-h-screen flex-col items-center justify-center p-24 relative overflow-hidden">
{/* Colorful background elements */}
<div className="absolute top-0 left-0 w-full h-full overflow-hidden z-0">
<div className="absolute top-10 left-1/4 w-64 h-64 rounded-full bg-accent-blue opacity-10 blur-3xl"></div>
<div className="absolute bottom-10 right-1/4 w-96 h-96 rounded-full bg-accent-purple opacity-10 blur-3xl"></div>
<div className="absolute top-1/3 right-1/3 w-48 h-48 rounded-full bg-accent-green opacity-10 blur-3xl"></div>
</div>
<div className="text-center max-w-xl z-10 relative">
<div className="flex items-center justify-center mb-6">
<div className="h-10 w-10 rounded-lg bg-gradient-blue flex items-center justify-center shadow-lg">
<svg className="h-6 w-6 text-white" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M13 7h8m0 0v8m0-8l-8 8-4-4-6 6" />
</svg>
</div>
<h1 className="text-4xl font-bold ml-3 text-foreground">ShortURL <span className="text-accent-blue">Analytics</span></h1>
</div>
<p className="text-text-secondary text-xl mb-10">Your complete analytics suite for tracking and optimizing short URL performance</p>
<div className="flex flex-col md:flex-row items-center justify-center space-y-4 md:space-y-0 md:space-x-4">
<Link
href="/dashboard"
className="bg-gradient-blue hover:opacity-90 text-white font-medium py-2.5 px-6 rounded-md text-lg transition-colors inline-flex items-center shadow-lg"
>
Go to Dashboard
<svg className="ml-2 h-5 w-5" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M14 5l7 7m0 0l-7 7m7-7H3" />
</svg>
</Link>
<Link
href="/links"
className="bg-card-bg border border-card-border hover:border-accent-purple text-foreground font-medium py-2.5 px-6 rounded-md text-lg transition-all inline-flex items-center"
>
View Links
<svg className="ml-2 h-5 w-5 text-accent-purple" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M13.828 10.172a4 4 0 00-5.656 0l-4 4a4 4 0 105.656 5.656l1.102-1.101" />
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M14.828 14.828a4 4 0 005.656 0l4-4a4 4 0 00-5.656-5.656l-1.1 1.1" />
</svg>
</Link>
</div>
</div>
</main>
)
redirect('/links');
}

23
ecosystem.config.js Normal file
View File

@@ -0,0 +1,23 @@
module.exports = {
apps: [
{
name: 'shorturl-analytics',
script: 'node_modules/next/dist/bin/next',
args: 'start',
instances: 'max', // 使用所有可用CPU核心
exec_mode: 'cluster', // 集群模式允许负载均衡
watch: false, // 生产环境不要启用watch
env: {
PORT: 3007,
NODE_ENV: 'production',
},
max_memory_restart: '1G', // 如果内存使用超过1GB则重启
exp_backoff_restart_delay: 100, // 故障自动重启延迟
error_file: 'logs/err.log',
out_file: 'logs/out.log',
log_date_format: 'YYYY-MM-DD HH:mm:ss',
merge_logs: true,
autorestart: true
}
]
};

View File

@@ -8,17 +8,9 @@ const config = {
database: process.env.CLICKHOUSE_DATABASE || 'limq'
};
// Log configuration (removing password for security)
console.log('ClickHouse config:', {
...config,
password: config.password ? '****' : ''
});
// Create ClickHouse client with proper URL format
export const clickhouse = createClient(config);
// Log connection status
console.log('ClickHouse client created with URL:', config.url);
/**
* Execute ClickHouse query and return results

View File

@@ -16,6 +16,16 @@ const nextConfig: NextConfig = {
// 设置输出为独立应用
output: 'standalone',
};
// 忽略ESLint错误不会在构建时中断
eslint: {
ignoreDuringBuilds: true,
},
// 忽略TypeScript错误不会在构建时中断
typescript: {
ignoreBuildErrors: true,
},
}
export default nextConfig;

View File

@@ -6,7 +6,21 @@
"dev": "next dev --turbo",
"build": "next build",
"start": "next start",
"lint": "next lint"
"lint": "next lint",
"ch": "bash scripts/db/sql/clickhouse/ch-query.sh",
"ch:tables": "bash scripts/db/sql/clickhouse/ch-query.sh -t",
"ch:dbs": "bash scripts/db/sql/clickhouse/ch-query.sh -d",
"ch:describe": "bash scripts/db/sql/clickhouse/ch-query.sh -s",
"ch:sample": "bash scripts/db/sql/clickhouse/ch-query.sh -p",
"ch:count": "bash scripts/db/sql/clickhouse/ch-query.sh -c",
"ch:query": "bash scripts/db/sql/clickhouse/ch-query.sh -q",
"ch:file": "bash scripts/db/sql/clickhouse/ch-query.sh -f",
"pm2:start": "pm2 start ecosystem.config.js",
"pm2:stop": "pm2 stop ecosystem.config.js",
"pm2:restart": "pm2 restart ecosystem.config.js",
"pm2:reload": "pm2 reload ecosystem.config.js",
"pm2:delete": "pm2 delete ecosystem.config.js",
"pm2:logs": "pm2 logs"
},
"dependencies": {
"@clickhouse/client": "^1.11.0",

View File

@@ -3,7 +3,7 @@
# 用途: 执行ClickHouse SQL查询的便捷脚本
# 连接参数
CH_HOST="localhost"
CH_HOST="10.0.1.60"
CH_PORT="9000"
CH_USER="admin"
CH_PASSWORD="your_secure_password"

View File

@@ -1,170 +0,0 @@
-- 创建数据库
CREATE DATABASE IF NOT EXISTS limq;
-- 切换到limq数据库
USE limq;
-- 创建短链接访问事件表
CREATE TABLE IF NOT EXISTS limq.link_events (
event_id UUID DEFAULT generateUUIDv4(),
event_time DateTime64(3) DEFAULT now64(),
date Date DEFAULT toDate(event_time),
link_id String,
channel_id String,
visitor_id String,
session_id String,
event_type Enum8(
'click' = 1,
'redirect' = 2,
'conversion' = 3,
'error' = 4
),
-- 访问者信息
ip_address String,
country String,
city String,
-- 来源信息
referrer String,
utm_source String,
utm_medium String,
utm_campaign String,
-- 设备信息
user_agent String,
device_type Enum8(
'mobile' = 1,
'tablet' = 2,
'desktop' = 3,
'other' = 4
),
browser String,
os String,
-- 交互信息
time_spent_sec UInt32 DEFAULT 0,
is_bounce Boolean DEFAULT true,
-- QR码相关
is_qr_scan Boolean DEFAULT false,
qr_code_id String DEFAULT '',
-- 转化数据
conversion_type String DEFAULT '',
conversion_value Float64 DEFAULT 0,
-- 其他属性
custom_data String DEFAULT '{}'
) ENGINE = MergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, link_id, event_time) SETTINGS index_granularity = 8192;
-- 短链接维度表
CREATE TABLE IF NOT EXISTS limq.links (
link_id String,
original_url String,
created_at DateTime,
created_by String,
title String,
description String,
tags Array(String),
is_active Boolean DEFAULT true,
expires_at Nullable(DateTime),
team_id String DEFAULT '',
project_id String DEFAULT '',
PRIMARY KEY (link_id)
) ENGINE = ReplacingMergeTree()
ORDER BY
link_id SETTINGS index_granularity = 8192;
-- 会话跟踪表
CREATE TABLE IF NOT EXISTS limq.sessions (
session_id String,
visitor_id String,
link_id String,
started_at DateTime64(3),
last_activity DateTime64(3),
ended_at Nullable(DateTime64(3)),
duration_sec UInt32 DEFAULT 0,
session_pages UInt8 DEFAULT 1,
is_completed Boolean DEFAULT false,
PRIMARY KEY (session_id)
) ENGINE = ReplacingMergeTree(last_activity)
ORDER BY
(session_id, link_id, visitor_id) SETTINGS index_granularity = 8192;
-- QR码统计表
CREATE TABLE IF NOT EXISTS limq.qr_scans (
scan_id UUID DEFAULT generateUUIDv4(),
qr_code_id String,
link_id String,
scan_time DateTime64(3),
visitor_id String,
location String,
device_type Enum8(
'mobile' = 1,
'tablet' = 2,
'desktop' = 3,
'other' = 4
),
led_to_conversion Boolean DEFAULT false,
PRIMARY KEY (scan_id)
) ENGINE = MergeTree() PARTITION BY toYYYYMM(scan_time)
ORDER BY
(qr_code_id, scan_time) SETTINGS index_granularity = 8192;
-- 每日链接汇总视图
CREATE MATERIALIZED VIEW limq.link_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, link_id) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
link_id,
count() AS total_clicks,
uniqExact(visitor_id) AS unique_visitors,
uniqExact(session_id) AS unique_sessions,
sum(time_spent_sec) AS total_time_spent,
avg(time_spent_sec) AS avg_time_spent,
countIf(is_bounce) AS bounce_count,
countIf(event_type = 'conversion') AS conversion_count,
uniqExact(referrer) AS unique_referrers,
countIf(device_type = 'mobile') AS mobile_count,
countIf(device_type = 'tablet') AS tablet_count,
countIf(device_type = 'desktop') AS desktop_count,
countIf(is_qr_scan) AS qr_scan_count,
sum(conversion_value) AS total_conversion_value
FROM
limq.link_events
GROUP BY
date,
link_id;
-- 每小时访问模式视图
CREATE MATERIALIZED VIEW limq.link_hourly_patterns ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, hour, link_id) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
toHour(event_time) AS hour,
link_id,
count() AS visits,
uniqExact(visitor_id) AS unique_visitors
FROM
limq.link_events
GROUP BY
date,
hour,
link_id;
-- 平台分布视图
CREATE MATERIALIZED VIEW limq.platform_distribution ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, utm_source, device_type) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
utm_source,
device_type,
count() AS visits,
uniqExact(visitor_id) AS unique_visitors
FROM
limq.link_events
WHERE
utm_source != ''
GROUP BY
date,
utm_source,
device_type;

View File

@@ -0,0 +1,122 @@
-- 修改设备类型字段从枚举类型更改为字符串类型
-- 先删除依赖于link_events表的物化视图
DROP TABLE IF EXISTS limq.platform_distribution;
DROP TABLE IF EXISTS limq.link_hourly_patterns;
DROP TABLE IF EXISTS limq.link_daily_stats;
DROP TABLE IF EXISTS limq.team_daily_stats;
DROP TABLE IF EXISTS limq.project_daily_stats;
-- 修改link_events表的device_type字段
ALTER TABLE
limq.link_events
MODIFY
COLUMN device_type String;
-- 重新创建物化视图
-- 每日链接汇总视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.link_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, link_id) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
link_id,
count() AS total_clicks,
uniqExact(visitor_id) AS unique_visitors,
uniqExact(session_id) AS unique_sessions,
sum(time_spent_sec) AS total_time_spent,
avg(time_spent_sec) AS avg_time_spent,
countIf(is_bounce) AS bounce_count,
countIf(event_type = 'conversion') AS conversion_count,
uniqExact(referrer) AS unique_referrers,
countIf(device_type = 'mobile') AS mobile_count,
countIf(device_type = 'tablet') AS tablet_count,
countIf(device_type = 'desktop') AS desktop_count,
countIf(is_qr_scan) AS qr_scan_count,
sum(conversion_value) AS total_conversion_value
FROM
limq.link_events
GROUP BY
date,
link_id;
-- 每小时访问模式视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.link_hourly_patterns ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, hour, link_id) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
toHour(event_time) AS hour,
link_id,
count() AS visits,
uniqExact(visitor_id) AS unique_visitors
FROM
limq.link_events
GROUP BY
date,
hour,
link_id;
-- 平台分布视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.platform_distribution ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, utm_source, device_type) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
utm_source,
device_type,
count() AS visits,
uniqExact(visitor_id) AS unique_visitors
FROM
limq.link_events
WHERE
utm_source != ''
GROUP BY
date,
utm_source,
device_type;
-- 团队每日统计视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.team_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, team_id) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
l.team_id AS team_id,
count() AS total_clicks,
uniqExact(e.visitor_id) AS unique_visitors,
countIf(e.event_type = 'conversion') AS conversion_count,
uniqExact(e.link_id) AS links_used,
countIf(e.is_qr_scan) AS qr_scan_count
FROM
limq.link_events e
JOIN limq.links l ON e.link_id = l.link_id
WHERE
l.team_id != ''
GROUP BY
date,
l.team_id;
-- 项目每日统计视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.project_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, project_id) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
l.project_id AS project_id,
count() AS total_clicks,
uniqExact(e.visitor_id) AS unique_visitors,
countIf(e.event_type = 'conversion') AS conversion_count,
uniqExact(e.link_id) AS links_used,
countIf(e.is_qr_scan) AS qr_scan_count
FROM
limq.link_events e
JOIN limq.links l ON e.link_id = l.link_id
WHERE
l.project_id != ''
GROUP BY
date,
l.project_id;

View File

@@ -1,11 +1,17 @@
-- 删除现有的物化视图(需要先删除视图,因为它们依赖于表)
-- 删除所有物化视图(需要先删除视图,因为它们依赖于表)
DROP TABLE IF EXISTS limq.platform_distribution;
DROP TABLE IF EXISTS limq.link_hourly_patterns;
DROP TABLE IF EXISTS limq.link_daily_stats;
-- 删除现有的表
DROP TABLE IF EXISTS limq.team_daily_stats;
DROP TABLE IF EXISTS limq.project_daily_stats;
DROP TABLE IF EXISTS limq.qrcode_daily_stats;
-- 删除所有表
DROP TABLE IF EXISTS limq.qr_scans;
DROP TABLE IF EXISTS limq.sessions;
@@ -14,6 +20,16 @@ DROP TABLE IF EXISTS limq.link_events;
DROP TABLE IF EXISTS limq.links;
DROP TABLE IF EXISTS limq.teams;
DROP TABLE IF EXISTS limq.projects;
DROP TABLE IF EXISTS limq.qrcodes;
DROP TABLE IF EXISTS limq.team_members;
DROP TABLE IF EXISTS limq.users;
-- 创建数据库(如果不存在)
CREATE DATABASE IF NOT EXISTS limq;
@@ -130,6 +146,114 @@ CREATE TABLE IF NOT EXISTS limq.qr_scans (
ORDER BY
scan_id SETTINGS index_granularity = 8192;
-- 团队表
CREATE TABLE IF NOT EXISTS limq.teams (
team_id String,
name String,
created_at DateTime,
created_by String,
description String DEFAULT '',
avatar_url String DEFAULT '',
is_active Boolean DEFAULT true,
plan_type Enum8(
'free' = 1,
'pro' = 2,
'enterprise' = 3
),
members_count UInt32 DEFAULT 1,
PRIMARY KEY (team_id)
) ENGINE = ReplacingMergeTree()
ORDER BY
team_id SETTINGS index_granularity = 8192;
-- 项目表
CREATE TABLE IF NOT EXISTS limq.projects (
project_id String,
team_id String,
name String,
created_at DateTime,
created_by String,
description String DEFAULT '',
is_archived Boolean DEFAULT false,
links_count UInt32 DEFAULT 0,
total_clicks UInt64 DEFAULT 0,
last_updated DateTime DEFAULT now(),
PRIMARY KEY (project_id)
) ENGINE = ReplacingMergeTree()
ORDER BY
(project_id, team_id) SETTINGS index_granularity = 8192;
-- QR码表
CREATE TABLE IF NOT EXISTS limq.qrcodes (
qr_code_id String,
link_id String,
team_id String,
project_id String DEFAULT '',
name String,
description String DEFAULT '',
created_at DateTime,
created_by String,
updated_at DateTime DEFAULT now(),
qr_type Enum8(
'standard' = 1,
'custom' = 2,
'dynamic' = 3
) DEFAULT 'standard',
image_url String DEFAULT '',
design_config String DEFAULT '{}',
is_active Boolean DEFAULT true,
total_scans UInt64 DEFAULT 0,
unique_scanners UInt32 DEFAULT 0,
PRIMARY KEY (qr_code_id)
) ENGINE = ReplacingMergeTree()
ORDER BY
(qr_code_id, link_id) SETTINGS index_granularity = 8192;
-- 团队成员表
CREATE TABLE IF NOT EXISTS limq.team_members (
team_id String,
user_id String,
role Enum8(
'owner' = 1,
'admin' = 2,
'editor' = 3,
'viewer' = 4
),
joined_at DateTime DEFAULT now(),
invited_by String,
is_active Boolean DEFAULT true,
last_active DateTime DEFAULT now(),
PRIMARY KEY (team_id, user_id)
) ENGINE = ReplacingMergeTree()
ORDER BY
(team_id, user_id) SETTINGS index_granularity = 8192;
-- 用户表
CREATE TABLE IF NOT EXISTS limq.users (
user_id String,
username String,
email String,
full_name String,
avatar_url String DEFAULT '',
created_at DateTime,
last_login DateTime DEFAULT now(),
is_active Boolean DEFAULT true,
is_verified Boolean DEFAULT false,
auth_provider Enum8(
'email' = 1,
'google' = 2,
'github' = 3,
'microsoft' = 4
) DEFAULT 'email',
roles Array(String) DEFAULT [ 'user' ],
preferences String DEFAULT '{}',
teams_count UInt32 DEFAULT 0,
links_created UInt32 DEFAULT 0,
PRIMARY KEY (user_id)
) ENGINE = ReplacingMergeTree()
ORDER BY
user_id SETTINGS index_granularity = 8192;
-- 每日链接汇总视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.link_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
@@ -191,3 +315,65 @@ GROUP BY
date,
utm_source,
device_type;
-- 团队每日统计视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.team_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, team_id) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
l.team_id AS team_id,
count() AS total_clicks,
uniqExact(e.visitor_id) AS unique_visitors,
countIf(e.event_type = 'conversion') AS conversion_count,
uniqExact(e.link_id) AS links_used,
countIf(e.is_qr_scan) AS qr_scan_count
FROM
limq.link_events e
JOIN limq.links l ON e.link_id = l.link_id
WHERE
l.team_id != ''
GROUP BY
date,
l.team_id;
-- 项目每日统计视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.project_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, project_id) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
l.project_id AS project_id,
count() AS total_clicks,
uniqExact(e.visitor_id) AS unique_visitors,
countIf(e.event_type = 'conversion') AS conversion_count,
uniqExact(e.link_id) AS links_used,
countIf(e.is_qr_scan) AS qr_scan_count
FROM
limq.link_events e
JOIN limq.links l ON e.link_id = l.link_id
WHERE
l.project_id != ''
GROUP BY
date,
l.project_id;
-- QR码每日统计视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.qrcode_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, qr_code_id) SETTINGS index_granularity = 8192 AS
SELECT
toDate(scan_time) AS date,
qr_code_id,
count() AS total_scans,
uniqExact(visitor_id) AS unique_scanners,
countIf(led_to_conversion) AS conversions,
countIf(device_type = 'mobile') AS mobile_scans,
countIf(device_type = 'tablet') AS tablet_scans,
countIf(device_type = 'desktop') AS desktop_scans,
uniqExact(location) AS unique_locations
FROM
limq.qr_scans
GROUP BY
date,
qr_code_id;

View File

@@ -1,81 +0,0 @@
#!/bin/bash
set -e
set -x
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
echo -e "${YELLOW}开始部署流程...${NC}"
# 首先加载环境变量
if [ "$NODE_ENV" = "production" ]; then
echo -e "${GREEN}加载生产环境配置...${NC}"
set -a
source .env.production
set +a
else
echo -e "${GREEN}加载开发环境配置...${NC}"
set -a
source .env.development
set +a
fi
# 安装依赖
echo -e "${GREEN}安装依赖...${NC}"
NODE_ENV= pnpm install --ignore-workspace
# 生成 Prisma 客户端
echo -e "${GREEN}生成 Prisma 客户端...${NC}"
npx prisma generate
# 类型检查
echo -e "${GREEN}运行类型检查...${NC}"
pnpm tsc --noEmit
# 询问是否同步数据库架构
echo -e "${YELLOW}是否需要同步数据库架构? (y/n)${NC}"
read -r sync_db
if [ "$sync_db" = "y" ] || [ "$sync_db" = "Y" ]; then
echo -e "${GREEN}开始同步数据库架构...${NC}"
if [ "$NODE_ENV" = "production" ]; then
npx prisma db push
else
npx prisma db push
fi
else
echo -e "${YELLOW}跳过数据库同步${NC}"
fi
# 构建项目
echo -e "${GREEN}构建项目...${NC}"
pnpm build
# 检查并安装 PM2
echo -e "${GREEN}检查 PM2...${NC}"
if ! command -v pm2 &> /dev/null; then
echo -e "${YELLOW}PM2 未安装,正在安装 5.4.3 版本...${NC}"
pnpm add pm2@5.4.3 -g
else
PM2_VERSION=$(pm2 -v)
if [ "$PM2_VERSION" != "5.4.3" ]; then
echo -e "${YELLOW}错误: PM2 版本必须是 5.4.3,当前版本是 ${PM2_VERSION}${NC}"
echo -e "${YELLOW}请运行以下命令更新 PM2:${NC}"
echo -e "${YELLOW}pm2 kill && pnpm remove pm2 -g && rm -rf ~/.pm2 && pnpm add pm2@5.4.3 -g${NC}"
exit 1
else
echo -e "${GREEN}PM2 5.4.3 已安装${NC}"
fi
fi
# 启动服务
if [ "$NODE_ENV" = "production" ]; then
echo -e "${GREEN}以生产模式启动服务...${NC}"
pm2 start dist/src/main.js --name limq
else
echo -e "${GREEN}以开发模式启动服务...${NC}"
pm2 start dist/src/main.js --name limq-dev --watch
fi
echo -e "${GREEN}部署完成!${NC}"

73
windmill/README.md Normal file
View File

@@ -0,0 +1,73 @@
Read file: /Users/liam/code/shorturl-analytics/windmill/sync_shorturl_from_mongo_to_clickhouse.ts
Read file: /Users/liam/code/shorturl-analytics/windmill/sync_shorturl_from_mongo_to_clickhouse.ts
Read file: /Users/liam/code/shorturl-analytics/windmill/sync_shorturl_event_from_mongo.ts
Read file: /Users/liam/code/shorturl-analytics/windmill/sync_shorturl_event_from_mongo.ts
Read file: /Users/liam/code/shorturl-analytics/windmill/sync_shorturl_from_mongo_to_clickhouse.ts
Read file: /Users/liam/code/shorturl-analytics/windmill/sync_shorturl_event_from_mongo.ts
这两个脚本是使用 Windmill 平台开发的数据同步工具,用于将短链接相关数据从 MongoDB 数据库同步到 ClickHouse 数据库。
## 1. sync_shorturl_from_mongo_to_clickhouse.ts
**功能**: 将 MongoDB 中的短链接数据short 表)同步到 ClickHouse 的 links 表
**主要特点**:
- 增量同步: 记录上次同步位置,只处理新增数据
- 批量处理: 默认每批次处理 100 条记录,可配置
- 超时控制: 设置最大运行时间(默认 30 分钟)
- 数据重复检查: 检查 ClickHouse 中是否已存在相同记录
- 错误处理: 完善的错误处理和日志记录
**数据转换**:
- 将 MongoDB 中的短链接记录(包含 slug、origin、创建时间等转换为 ClickHouse 表结构
- 处理特殊字段如日期时间、标签数组等
- 转换字段包括: link_id、original_url、created_at、created_by、title、description、tags、is_active、expires_at、team_id、project_id
**执行流程**:
1. 从 Windmill 变量获取 MongoDB 和 ClickHouse 连接配置
2. 获取上次同步状态时间戳和记录ID
3. 连接 MongoDB批量查询符合条件的新记录
4. 检查这些记录是否已存在于 ClickHouse
5. 转换数据格式并生成 SQL 插入语句
6. 执行插入操作并记录结果
7. 更新同步状态,为下次同步做准备
## 2. sync_shorturl_event_from_mongo.ts
**功能**: 将 MongoDB 中的短链接点击事件数据trace 表)同步到 ClickHouse 的 link_events 表
**主要特点**:
- 与第一个脚本类似,但处理的是访问事件数据
- 默认批量处理规模更大1000 条/批次)
- 超时时间更长60 分钟)
- 支持完整的事件元数据保存
**数据转换**:
- 将 MongoDB 中的访问事件记录转换为 ClickHouse 事件表结构
- 记录的字段更丰富,包括:
- link_id: 短链接ID
- visitor_id: 访客ID
- session_id: 会话ID
- event_type: 事件类型(点击、转化等)
- 设备信息: ip_address、user_agent、device_type、browser、os
- 来源信息: referrer、utm 参数
- 行为数据: time_spent_sec、is_bounce、conversion_type 等
**执行流程**:
与第一个脚本基本相同,但处理的是 trace 表的数据,并且将其转换为 link_events 表所需的格式。
## 两者共同点:
1. **增量同步机制**: 记录同步状态,每次只处理新数据
2. **容错设计**: 超时控制、错误处理、异常恢复机制
3. **配置灵活**: 可通过参数控制批量大小、超时时间等
4. **数据验证**: 确保已同步数据不会重复
5. **详细日志**: 记录同步过程中的关键事件和状态
这两个脚本共同构成了短链接分析系统的数据管道,实现了从 MongoDB可能是原始数据存储到 ClickHouse分析型数据库的数据迁移为短链接分析平台提供数据基础。

View File

@@ -0,0 +1,474 @@
// 从MongoDB的trace表同步数据到ClickHouse的link_events表
import { getVariable, setVariable } from "npm:windmill-client@1";
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
interface MongoConfig {
host: string;
port: string;
db: string;
username: string;
password: string;
}
interface ClickHouseConfig {
clickhouse_host: string;
clickhouse_port: number;
clickhouse_user: string;
clickhouse_password: string;
clickhouse_database: string;
clickhouse_url: string;
}
interface TraceRecord {
_id: ObjectId;
slugId: ObjectId;
label: string | null;
ip: string;
type: number;
platform: string;
platformOS: string;
browser: string;
browserVersion: string;
url: string;
createTime: number;
}
interface SyncState {
last_sync_time: number;
records_synced: number;
last_sync_id?: string;
}
export async function main(
batch_size = 1000,
max_records = 9999999,
timeout_minutes = 60,
skip_clickhouse_check = false,
force_insert = false
) {
const logWithTimestamp = (message: string) => {
const now = new Date();
console.log(`[${now.toISOString()}] ${message}`);
};
logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务");
logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`);
if (skip_clickhouse_check) {
logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式不会检查记录是否已存在");
}
if (force_insert) {
logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
}
// 设置超时
const startTime = Date.now();
const timeoutMs = timeout_minutes * 60 * 1000;
// 检查是否超时
const checkTimeout = () => {
if (Date.now() - startTime > timeoutMs) {
console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`);
return true;
}
return false;
};
// 获取MongoDB和ClickHouse的连接信息
let mongoConfig: MongoConfig;
let clickhouseConfig: ClickHouseConfig;
try {
const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
console.log("原始MongoDB配置:", JSON.stringify(rawMongoConfig));
// 尝试解析配置,如果是字符串形式
if (typeof rawMongoConfig === "string") {
try {
mongoConfig = JSON.parse(rawMongoConfig);
} catch (e) {
console.error("MongoDB配置解析失败:", e);
throw e;
}
} else {
mongoConfig = rawMongoConfig as MongoConfig;
}
const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse");
console.log("原始ClickHouse配置:", JSON.stringify(rawClickhouseConfig));
// 尝试解析配置,如果是字符串形式
if (typeof rawClickhouseConfig === "string") {
try {
clickhouseConfig = JSON.parse(rawClickhouseConfig);
} catch (e) {
console.error("ClickHouse配置解析失败:", e);
throw e;
}
} else {
clickhouseConfig = rawClickhouseConfig as ClickHouseConfig;
}
console.log("MongoDB配置解析为:", JSON.stringify(mongoConfig));
console.log("ClickHouse配置解析为:", JSON.stringify(clickhouseConfig));
} catch (error) {
console.error("获取配置失败:", error);
throw error;
}
// 构建MongoDB连接URL
let mongoUrl = "mongodb://";
if (mongoConfig.username && mongoConfig.password) {
mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
}
mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);
// 连接MongoDB
const client = new MongoClient();
try {
await client.connect(mongoUrl);
console.log("MongoDB连接成功");
const db = client.database(mongoConfig.db);
const traceCollection = db.collection<TraceRecord>("trace");
// 构建查询条件,获取所有记录
const query: Record<string, unknown> = {
type: 1 // 只同步type为1的记录
};
// 计算总记录数
const totalRecords = await traceCollection.countDocuments(query);
console.log(`找到 ${totalRecords} 条记录需要同步`);
// 限制此次处理的记录数量
const recordsToProcess = Math.min(totalRecords, max_records);
console.log(`本次将处理 ${recordsToProcess} 条记录`);
if (totalRecords === 0) {
console.log("没有记录需要同步,任务完成");
return {
success: true,
records_synced: 0,
message: "没有记录需要同步"
};
}
// 检查ClickHouse连接状态
const checkClickHouseConnection = async (): Promise<boolean> => {
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查不测试连接");
return true;
}
try {
logWithTimestamp("测试ClickHouse连接...");
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`,
},
body: "SELECT 1",
// 设置5秒超时
signal: AbortSignal.timeout(5000)
});
if (response.ok) {
logWithTimestamp("ClickHouse连接测试成功");
return true;
} else {
const errorText = await response.text();
logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`);
return false;
}
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse连接测试失败: ${error.message}`);
return false;
}
};
// 检查记录是否已经存在于ClickHouse中
const checkExistingRecords = async (records: TraceRecord[]): Promise<TraceRecord[]> => {
if (records.length === 0) return [];
// 如果跳过ClickHouse检查或强制插入则直接返回所有记录
if (skip_clickhouse_check || force_insert) {
logWithTimestamp(`已跳过ClickHouse重复检查准备处理所有 ${records.length} 条记录`);
return records;
}
logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于ClickHouse中...`);
try {
// 提取所有记录的ID
const recordIds = records.map(record => record.slugId.toString()); // 使用slugId作为link_id查询
logWithTimestamp(`待检查的记录ID: ${recordIds.join(', ')}`);
// 构建查询SQL检查记录是否已存在确保添加FORMAT JSON来获取正确的JSON格式响应
const query = `
SELECT link_id
FROM ${clickhouseConfig.clickhouse_database}.link_events
WHERE link_id IN ('${recordIds.join("','")}')
FORMAT JSON
`;
logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`);
// 发送请求到ClickHouse添加10秒超时
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: query,
signal: AbortSignal.timeout(10000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`);
}
// 获取响应文本以便记录
const responseText = await response.text();
logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`);
if (!responseText.trim()) {
logWithTimestamp("ClickHouse返回空响应假定没有记录存在");
return records; // 如果响应为空,假设没有记录
}
// 解析结果
let result;
try {
result = JSON.parse(responseText);
} catch (err) {
logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`);
throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`);
}
// 确保result有正确的结构
if (!result.data) {
logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`);
return records; // 如果没有data字段假设没有记录
}
// 提取已存在的记录ID
const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id));
logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`);
if (existingIds.size > 0) {
logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`);
}
// 过滤出不存在的记录
const newRecords = records.filter(record => !existingIds.has(record.slugId.toString())); // 使用slugId匹配link_id
logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
return newRecords;
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse查询出错: ${error.message}`);
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查将继续处理所有记录");
return records;
} else {
throw error; // 如果没有启用跳过检查,则抛出错误
}
}
};
// 在处理记录前先检查ClickHouse连接
const clickhouseConnected = await checkClickHouseConnection();
if (!clickhouseConnected && !skip_clickhouse_check) {
logWithTimestamp("⚠️ ClickHouse连接测试失败请启用skip_clickhouse_check=true参数来跳过连接检查");
throw new Error("ClickHouse连接失败无法继续同步");
}
// 处理记录的函数
const processRecords = async (records: TraceRecord[]) => {
if (records.length === 0) return 0;
logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`);
// 检查记录是否已存在
let newRecords;
try {
newRecords = await checkExistingRecords(records);
} catch (err) {
const error = err as Error;
logWithTimestamp(`检查记录是否存在时出错: ${error.message}`);
if (!skip_clickhouse_check && !force_insert) {
throw error;
}
// 如果跳过检查或强制插入,则使用所有记录
logWithTimestamp("将使用所有记录进行处理");
newRecords = records;
}
if (newRecords.length === 0) {
logWithTimestamp("所有记录都已存在,跳过处理");
return 0;
}
logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`);
// 准备ClickHouse插入数据
const clickhouseData = newRecords.map(record => {
// 转换MongoDB记录为ClickHouse格式匹配ClickHouse表结构
return {
// UUID将由ClickHouse自动生成 (event_id)
link_id: record.slugId.toString(),
channel_id: record.label || "",
visitor_id: record._id.toString(), // 使用MongoDB ID作为访客ID
session_id: record._id.toString() + "-" + record.createTime, // 创建一个唯一会话ID
event_type: record.type <= 4 ? record.type : 1, // 确保event_type在枚举范围内
ip_address: record.ip,
country: "", // 这些字段在MongoDB中不存在使用默认值
city: "",
referrer: record.url || "",
utm_source: "",
utm_medium: "",
utm_campaign: "",
user_agent: record.browser + " " + record.browserVersion,
device_type: record.platform || "unknown",
browser: record.browser || "",
os: record.platformOS || "",
time_spent_sec: 0,
is_bounce: true,
is_qr_scan: false,
qr_code_id: "",
conversion_type: 1, // 默认为'visit'
conversion_value: 0,
custom_data: `{"mongo_id":"${record._id.toString()}"}`
};
});
// 生成ClickHouse插入SQL
const insertSQL = `
INSERT INTO ${clickhouseConfig.clickhouse_database}.link_events
(link_id, channel_id, visitor_id, session_id, event_type, ip_address, country, city,
referrer, utm_source, utm_medium, utm_campaign, user_agent, device_type, browser, os,
time_spent_sec, is_bounce, is_qr_scan, qr_code_id, conversion_type, conversion_value, custom_data)
VALUES ${clickhouseData.map(record => {
// 确保所有字符串值都是字符串类型,并安全处理替换
const safeReplace = (val: any): string => {
// 确保值是字符串如果是null或undefined则使用空字符串
const str = val === null || val === undefined ? "" : String(val);
// 安全替换单引号
return str.replace(/'/g, "''");
};
return `('${record.link_id}', '${safeReplace(record.channel_id)}', '${record.visitor_id}', '${record.session_id}',
${record.event_type}, '${safeReplace(record.ip_address)}', '', '',
'${safeReplace(record.referrer)}', '', '', '', '${safeReplace(record.user_agent)}', '${safeReplace(record.device_type)}',
'${safeReplace(record.browser)}', '${safeReplace(record.os)}',
0, true, false, '', 1, 0, '${safeReplace(record.custom_data)}')`;
}).join(", ")}
`;
if (insertSQL.length === 0) {
console.log("没有新记录需要插入");
return 0;
}
// 发送请求到ClickHouse添加20秒超时
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
try {
logWithTimestamp("发送插入请求到ClickHouse...");
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: insertSQL,
signal: AbortSignal.timeout(20000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`);
}
logWithTimestamp(`成功插入 ${newRecords.length} 条记录到ClickHouse`);
return newRecords.length;
} catch (err) {
const error = err as Error;
logWithTimestamp(`向ClickHouse插入数据失败: ${error.message}`);
throw error;
}
};
// 批量处理记录
let processedRecords = 0;
let totalBatchRecords = 0;
for (let page = 0; processedRecords < recordsToProcess; page++) {
// 检查超时
if (checkTimeout()) {
logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`);
break;
}
// 每批次都输出进度
logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`);
const records = await traceCollection.find(
query,
{
allowDiskUse: true,
sort: { createTime: 1 },
skip: page * batch_size,
limit: batch_size
}
).toArray();
if (records.length === 0) {
logWithTimestamp("没有找到更多数据,同步结束");
break;
}
// 找到数据,开始处理
logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);
// 输出当前批次的部分数据信息
if (records.length > 0) {
logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, 时间=${new Date(records[0].createTime).toISOString()}`);
if (records.length > 1) {
logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`);
}
}
const batchSize = await processRecords(records);
processedRecords += records.length;
totalBatchRecords += batchSize;
logWithTimestamp(`${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
}
return {
success: true,
records_processed: processedRecords,
records_synced: totalBatchRecords,
message: "数据同步完成"
};
} catch (err) {
console.error("同步过程中发生错误:", err);
return {
success: false,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined
};
} finally {
// 关闭MongoDB连接
await client.close();
console.log("MongoDB连接已关闭");
}
}

View File

@@ -0,0 +1,532 @@
// 从MongoDB的short表同步数据到ClickHouse的links表
import { getVariable, setVariable } from "npm:windmill-client@1";
import { MongoClient, ObjectId } from "https://deno.land/x/mongo@v0.32.0/mod.ts";
interface MongoConfig {
host: string;
port: string;
db: string;
username: string;
password: string;
}
interface ClickHouseConfig {
clickhouse_host: string;
clickhouse_port: number;
clickhouse_user: string;
clickhouse_password: string;
clickhouse_database: string;
clickhouse_url: string;
}
interface ShortRecord {
_id: ObjectId;
slug: string; // 短链接的slug部分
origin: string; // 原始URL
domain?: string; // 域名
createTime: number; // 创建时间戳
user?: string; // 创建用户
title?: string; // 标题
description?: string; // 描述
tags?: string[]; // 标签
active?: boolean; // 是否活跃
expiresAt?: number; // 过期时间戳
teamId?: string; // 团队ID
projectId?: string; // 项目ID
}
interface SyncState {
last_sync_time: number;
records_synced: number;
last_sync_id?: string;
}
export async function main(
batch_size = 100,
initial_sync = false,
max_records = 999999,
timeout_minutes = 30,
skip_clickhouse_check = false,
force_insert = false
) {
const logWithTimestamp = (message: string) => {
const now = new Date();
console.log(`[${now.toISOString()}] ${message}`);
};
logWithTimestamp("开始执行MongoDB到ClickHouse的短链接同步任务...");
logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`);
if (skip_clickhouse_check) {
logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式不会检查记录是否已存在");
}
if (force_insert) {
logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
}
// 设置超时
const startTime = Date.now();
const timeoutMs = timeout_minutes * 60 * 1000;
// 检查是否超时
const checkTimeout = () => {
if (Date.now() - startTime > timeoutMs) {
console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`);
return true;
}
return false;
};
// 获取MongoDB和ClickHouse的连接信息
let mongoConfig: MongoConfig;
let clickhouseConfig: ClickHouseConfig;
try {
const rawMongoConfig = await getVariable("f/shorturl_analytics/mongodb");
console.log("原始MongoDB配置:", typeof rawMongoConfig === "string" ? rawMongoConfig : JSON.stringify(rawMongoConfig));
// 尝试解析配置,如果是字符串形式
if (typeof rawMongoConfig === "string") {
try {
mongoConfig = JSON.parse(rawMongoConfig);
} catch (e) {
console.error("MongoDB配置解析失败:", e);
throw e;
}
} else {
mongoConfig = rawMongoConfig as MongoConfig;
}
const rawClickhouseConfig = await getVariable("f/shorturl_analytics/clickhouse");
console.log("原始ClickHouse配置:", typeof rawClickhouseConfig === "string" ? rawClickhouseConfig : JSON.stringify(rawClickhouseConfig));
// 尝试解析配置,如果是字符串形式
if (typeof rawClickhouseConfig === "string") {
try {
clickhouseConfig = JSON.parse(rawClickhouseConfig);
} catch (e) {
console.error("ClickHouse配置解析失败:", e);
throw e;
}
} else {
clickhouseConfig = rawClickhouseConfig as ClickHouseConfig;
}
console.log("MongoDB配置解析为:", JSON.stringify(mongoConfig));
console.log("ClickHouse配置解析为:", JSON.stringify(clickhouseConfig));
} catch (error) {
console.error("获取配置失败:", error);
throw error;
}
// 构建MongoDB连接URL
let mongoUrl = "mongodb://";
if (mongoConfig.username && mongoConfig.password) {
mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
}
mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);
// 获取上次同步的状态
let syncState: SyncState;
try {
const rawSyncState = await getVariable<string>("f/shorturl_analytics/clickhouse/shorturl_links_sync_state");
try {
syncState = JSON.parse(rawSyncState);
console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`);
} catch (parseError) {
console.error("解析同步状态失败:", parseError);
throw parseError;
}
} catch (_unused_error) {
console.log("未找到同步状态,创建初始同步状态");
syncState = {
last_sync_time: 0,
records_synced: 0,
};
}
// 如果强制从头开始同步
if (initial_sync) {
console.log("强制从头开始同步");
syncState = {
last_sync_time: 0,
records_synced: 0,
};
}
// 连接MongoDB
const client = new MongoClient();
try {
await client.connect(mongoUrl);
console.log("MongoDB连接成功");
const db = client.database(mongoConfig.db);
const shortCollection = db.collection<ShortRecord>("short");
// 构建查询条件,只查询新的记录
const query: Record<string, unknown> = {};
if (syncState.last_sync_time > 0) {
query.createTime = { $gt: syncState.last_sync_time };
}
if (syncState.last_sync_id) {
// 如果有上次同步的ID则从该ID之后开始查询
query._id = { $gt: new ObjectId(syncState.last_sync_id) };
}
// 计算总记录数
const totalRecords = await shortCollection.countDocuments(query);
console.log(`找到 ${totalRecords} 条新短链接记录需要同步`);
// 限制此次处理的记录数量
const recordsToProcess = Math.min(totalRecords, max_records);
console.log(`本次将处理 ${recordsToProcess} 条记录`);
if (totalRecords === 0) {
console.log("没有新记录需要同步,任务完成");
return {
success: true,
records_synced: 0,
total_synced: syncState.records_synced,
message: "没有新记录需要同步"
};
}
// 分批处理记录
let processedRecords = 0;
let lastId: string | undefined;
let lastCreateTime = syncState.last_sync_time;
let totalBatchRecords = 0;
// 检查ClickHouse连接状态
const checkClickHouseConnection = async (): Promise<boolean> => {
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查不测试连接");
return true;
}
try {
logWithTimestamp("测试ClickHouse连接...");
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`,
},
body: "SELECT 1 FORMAT JSON",
signal: AbortSignal.timeout(5000)
});
if (response.ok) {
logWithTimestamp("ClickHouse连接测试成功");
return true;
} else {
const errorText = await response.text();
logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`);
return false;
}
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse连接测试失败: ${error.message}`);
return false;
}
};
// 检查记录是否已经存在于ClickHouse中
const checkExistingRecords = async (records: ShortRecord[]): Promise<ShortRecord[]> => {
if (records.length === 0) return [];
// 如果跳过ClickHouse检查或强制插入则直接返回所有记录
if (skip_clickhouse_check || force_insert) {
logWithTimestamp(`已跳过ClickHouse重复检查准备处理所有 ${records.length} 条记录`);
return records;
}
logWithTimestamp(`正在检查 ${records.length} 条短链接记录是否已存在于ClickHouse中...`);
try {
// 提取所有记录的ID
const recordIds = records.map(record => record._id.toString());
logWithTimestamp(`待检查的短链接ID: ${recordIds.join(', ')}`);
// 构建查询SQL检查记录是否已存在
const query = `
SELECT link_id
FROM ${clickhouseConfig.clickhouse_database}.links
WHERE link_id IN ('${recordIds.join("','")}')
FORMAT JSON
`;
logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`);
// 发送请求到ClickHouse
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: query,
signal: AbortSignal.timeout(10000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`);
}
// 获取响应文本
const responseText = await response.text();
logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`);
if (!responseText.trim()) {
logWithTimestamp("ClickHouse返回空响应假定没有记录存在");
return records;
}
// 解析结果
let result;
try {
result = JSON.parse(responseText);
} catch (err) {
logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`);
throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`);
}
// 确保result有正确的结构
if (!result.data) {
logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`);
return records;
}
// 提取已存在的记录ID
const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id));
logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`);
if (existingIds.size > 0) {
logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`);
}
// 过滤出不存在的记录
const newRecords = records.filter(record => !existingIds.has(record._id.toString()));
logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
return newRecords;
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse查询出错: ${error.message}`);
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查将继续处理所有记录");
return records;
} else {
throw error;
}
}
};
// 在处理记录前先检查ClickHouse连接
const clickhouseConnected = await checkClickHouseConnection();
if (!clickhouseConnected && !skip_clickhouse_check) {
logWithTimestamp("⚠️ ClickHouse连接测试失败请启用skip_clickhouse_check=true参数来跳过连接检查");
throw new Error("ClickHouse连接失败无法继续同步");
}
// 处理记录的函数
const processRecords = async (records: ShortRecord[]) => {
if (records.length === 0) return 0;
logWithTimestamp(`开始处理批次数据,共 ${records.length} 条短链接记录...`);
// 检查记录是否已存在
let newRecords;
try {
newRecords = await checkExistingRecords(records);
} catch (err) {
const error = err as Error;
logWithTimestamp(`检查记录是否存在时出错: ${error.message}`);
if (!skip_clickhouse_check && !force_insert) {
throw error;
}
// 如果跳过检查或强制插入,则使用所有记录
logWithTimestamp("将使用所有记录进行处理");
newRecords = records;
}
if (newRecords.length === 0) {
logWithTimestamp("所有记录都已存在,跳过处理");
// 更新同步状态,即使没有新增记录
const lastRecord = records[records.length - 1];
lastId = lastRecord._id.toString();
lastCreateTime = lastRecord.createTime;
return 0;
}
logWithTimestamp(`准备处理 ${newRecords.length} 条新短链接记录...`);
// 准备ClickHouse插入数据
const clickhouseData = newRecords.map(record => {
// 转换MongoDB记录为ClickHouse格式匹配ClickHouse表结构
// 处理日期时间移除ISO格式中的Z以使ClickHouse正确解析
const createdAtStr = new Date(record.createTime).toISOString().replace('Z', '');
const expiresAtStr = record.expiresAt ? new Date(record.expiresAt).toISOString().replace('Z', '') : null;
return {
link_id: record._id.toString(), // 使用MongoDB的_id作为link_id
original_url: record.origin || "",
created_at: createdAtStr,
created_by: record.user || "unknown",
title: record.slug, // 使用slug作为title
description: record.description || "",
tags: record.tags || [],
is_active: record.active !== undefined ? record.active : true,
expires_at: expiresAtStr,
team_id: record.teamId || "",
project_id: record.projectId || ""
};
});
// 更新同步状态使用原始records的最后一条以确保进度正确
const lastRecord = records[records.length - 1];
lastId = lastRecord._id.toString();
lastCreateTime = lastRecord.createTime;
logWithTimestamp(`更新同步位置到: ID=${lastId}, 时间=${new Date(lastCreateTime).toISOString()}`);
// 生成ClickHouse插入SQL
// 注意Array类型需要特殊处理这里将tags作为JSON字符串处理
const insertSQL = `
INSERT INTO ${clickhouseConfig.clickhouse_database}.links
(link_id, original_url, created_at, created_by, title, description, tags, is_active, expires_at, team_id, project_id)
VALUES
${clickhouseData.map(record => {
// 处理tags数组
const tagsStr = JSON.stringify(record.tags || []);
// 处理expires_at可能为null的情况
const expiresAt = record.expires_at ? `'${record.expires_at}'` : "NULL";
// 确保所有字段在使用replace前都有默认值
const safeOriginalUrl = (record.original_url || "").replace(/'/g, "''");
const safeCreatedBy = (record.created_by || "unknown").replace(/'/g, "''");
const safeTitle = (record.title || "无标题").replace(/'/g, "''");
const safeDescription = (record.description || "").replace(/'/g, "''");
const safeTeamId = record.team_id || "";
const safeProjectId = record.project_id || "";
return `('${record.link_id}', '${safeOriginalUrl}', '${record.created_at}', '${safeCreatedBy}', '${safeTitle}', '${safeDescription}', ${tagsStr}, ${record.is_active}, ${expiresAt}, '${safeTeamId}', '${safeProjectId}')`;
}).join(", ")}
`;
if (clickhouseData.length === 0) {
console.log("没有新记录需要插入");
return 0;
}
// 发送请求到ClickHouse
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
try {
logWithTimestamp("发送插入请求到ClickHouse...");
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: insertSQL,
signal: AbortSignal.timeout(20000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`);
}
logWithTimestamp(`成功插入 ${newRecords.length} 条短链接记录到ClickHouse`);
return newRecords.length;
} catch (err) {
const error = err as Error;
logWithTimestamp(`向ClickHouse插入数据失败: ${error.message}`);
throw error;
}
};
// 批量处理记录
for (let page = 0; processedRecords < recordsToProcess; page++) {
// 检查超时
if (checkTimeout()) {
logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`);
break;
}
// 每批次都输出进度
logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`);
const records = await shortCollection.find(query)
.sort({ createTime: 1, _id: 1 })
.skip(page * batch_size)
.limit(batch_size)
.toArray();
if (records.length === 0) {
logWithTimestamp(`${page+1} 批次没有找到数据,结束处理`);
break;
}
logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);
// 输出当前批次的部分数据信息
if (records.length > 0) {
logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, Slug=${records[0].slug}, 时间=${new Date(records[0].createTime).toISOString()}`);
if (records.length > 1) {
logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, Slug=${records[records.length-1].slug}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`);
}
}
const batchSize = await processRecords(records);
processedRecords += records.length; // 总是增加处理的记录数,即使有些记录已存在
totalBatchRecords += batchSize; // 只增加实际插入的记录数
logWithTimestamp(`${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
// 更新查询条件,以便下一批次查询
query.createTime = { $gt: lastCreateTime };
if (lastId) {
query._id = { $gt: new ObjectId(lastId) };
}
logWithTimestamp(`更新查询条件: 创建时间 > ${new Date(lastCreateTime).toISOString()}, ID > ${lastId || 'none'}`);
}
// 更新同步状态
const newSyncState: SyncState = {
last_sync_time: lastCreateTime,
records_synced: syncState.records_synced + totalBatchRecords,
last_sync_id: lastId
};
await setVariable("f/shorturl_analytics/clickhouse/shorturl_links_sync_state", JSON.stringify(newSyncState));
console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`);
return {
success: true,
records_processed: processedRecords,
records_synced: totalBatchRecords,
total_synced: newSyncState.records_synced,
last_sync_time: new Date(newSyncState.last_sync_time).toISOString(),
message: "短链接数据同步完成"
};
} catch (err) {
console.error("同步过程中发生错误:", err);
return {
success: false,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined
};
} finally {
// 关闭MongoDB连接
await client.close();
console.log("MongoDB连接已关闭");
}
}