5 Commits

Author SHA1 Message Date
b8d5b0545a sync event data 2025-03-24 22:47:02 +08:00
37aafbe636 fix style 2025-03-24 18:47:07 +08:00
f41a6b6e5b chart 2025-03-24 18:42:08 +08:00
98c5f0f154 fix sync columen 2025-03-24 17:36:31 +08:00
8012fa78c0 only sync mongo event type 1 2025-03-24 17:10:01 +08:00
4 changed files with 700 additions and 830 deletions

View File

@@ -16,6 +16,11 @@ import {
ReferrerItem,
} from "@/app/api/types";
import { TimeGranularity } from "@/lib/analytics";
import {
PieChart, Pie, ResponsiveContainer, Tooltip, Cell, Legend,
BarChart, Bar, XAxis, YAxis, CartesianGrid, LineChart, Line,
LabelList
} from "recharts";
interface LinkDetails {
id: string;
@@ -206,7 +211,28 @@ export default function LinkDetailsPage({
}
const details = await response.json();
setLinkDetails(details);
console.log("Link details:", details); // 添加日志以确认 API 响应
// 将 API 返回的数据映射到组件需要的格式
setLinkDetails({
id: details.link_id || linkId,
name: details.title || "Untitled Link",
shortUrl: details.short_url || window.location.hostname + "/" + details.link_id,
originalUrl: details.original_url || "",
creator: details.created_by || "Unknown",
createdAt: details.created_at || new Date().toISOString(),
visits: details.visits || 0,
visitChange: details.visit_change || 0,
uniqueVisitors: details.unique_visitors || 0,
uniqueVisitorsChange: details.unique_visitors_change || 0,
avgTime: formatTime(details.avg_time_spent || 0),
avgTimeChange: details.avg_time_change || 0,
conversionRate: details.conversion_rate || 0,
conversionChange: details.conversion_change || 0,
status: details.is_active ? "active" : "inactive",
tags: details.tags || [],
});
setLoading(false);
// 获取分析数据
@@ -689,77 +715,78 @@ export default function LinkDetailsPage({
<h3 className="mb-4 font-medium text-md text-foreground">
Device Types
</h3>
<div className="grid grid-cols-1 gap-6 sm:grid-cols-2 lg:grid-cols-4">
<div className="flex flex-col items-center">
<div className="mb-2 text-sm font-medium text-text-secondary">
Mobile
</div>
<div className="text-2xl font-bold text-foreground">
{overviewData.deviceTypes.mobile}
</div>
<div className="mt-1 text-xs text-text-secondary">
{overviewData.totalVisits
? Math.round(
(overviewData.deviceTypes.mobile /
overviewData.totalVisits) *
100
)
: 0}
%
</div>
{/* 饼图显示 */}
<div className="flex flex-col items-center">
<div className="w-full h-64">
<ResponsiveContainer width="100%" height="100%">
<PieChart>
<Pie
data={[
{ name: 'Mobile', value: overviewData.deviceTypes.mobile, fill: "#3498db" },
{ name: 'Desktop', value: overviewData.deviceTypes.desktop, fill: "#2ecc71" },
{ name: 'Tablet', value: overviewData.deviceTypes.tablet, fill: "#f39c12" },
{ name: 'Other', value: overviewData.deviceTypes.other, fill: "#e74c3c" }
].filter(item => item.value > 0)}
cx="50%"
cy="50%"
innerRadius={60}
outerRadius={90}
fill="#8884d8"
dataKey="value"
nameKey="name"
label={({ name, percent }) => `${name}: ${(percent * 100).toFixed(0)}%`}
labelLine={true}
>
{/* 为每种设备类型设置不同的颜色 */}
{[
{ key: "mobile", name: 'Mobile', value: overviewData.deviceTypes.mobile, fill: "#3498db" },
{ key: "desktop", name: 'Desktop', value: overviewData.deviceTypes.desktop, fill: "#2ecc71" },
{ key: "tablet", name: 'Tablet', value: overviewData.deviceTypes.tablet, fill: "#f39c12" },
{ key: "other", name: 'Other', value: overviewData.deviceTypes.other, fill: "#e74c3c" }
]
.filter(item => item.value > 0)
.map(item => (
<Cell key={item.key} fill={item.fill} />
))
}
</Pie>
<Tooltip
formatter={(value) => [`${value} 访问`, '数量']}
separator=": "
/>
<Legend />
</PieChart>
</ResponsiveContainer>
</div>
<div className="flex flex-col items-center">
<div className="mb-2 text-sm font-medium text-text-secondary">
Desktop
<div className="grid grid-cols-4 gap-8 mt-4 text-center">
<div>
<div className="text-sm font-medium text-text-secondary">Mobile</div>
<div className="text-lg font-bold text-foreground">{overviewData.deviceTypes.mobile}</div>
<div className="text-xs text-text-secondary">
{overviewData.totalVisits ? Math.round((overviewData.deviceTypes.mobile / overviewData.totalVisits) * 100) : 0}%
</div>
</div>
<div className="text-2xl font-bold text-foreground">
{overviewData.deviceTypes.desktop}
<div>
<div className="text-sm font-medium text-text-secondary">Desktop</div>
<div className="text-lg font-bold text-foreground">{overviewData.deviceTypes.desktop}</div>
<div className="text-xs text-text-secondary">
{overviewData.totalVisits ? Math.round((overviewData.deviceTypes.desktop / overviewData.totalVisits) * 100) : 0}%
</div>
</div>
<div className="mt-1 text-xs text-text-secondary">
{overviewData.totalVisits
? Math.round(
(overviewData.deviceTypes.desktop /
overviewData.totalVisits) *
100
)
: 0}
%
<div>
<div className="text-sm font-medium text-text-secondary">Tablet</div>
<div className="text-lg font-bold text-foreground">{overviewData.deviceTypes.tablet}</div>
<div className="text-xs text-text-secondary">
{overviewData.totalVisits ? Math.round((overviewData.deviceTypes.tablet / overviewData.totalVisits) * 100) : 0}%
</div>
</div>
</div>
<div className="flex flex-col items-center">
<div className="mb-2 text-sm font-medium text-text-secondary">
Tablet
</div>
<div className="text-2xl font-bold text-foreground">
{overviewData.deviceTypes.tablet}
</div>
<div className="mt-1 text-xs text-text-secondary">
{overviewData.totalVisits
? Math.round(
(overviewData.deviceTypes.tablet /
overviewData.totalVisits) *
100
)
: 0}
%
</div>
</div>
<div className="flex flex-col items-center">
<div className="mb-2 text-sm font-medium text-text-secondary">
Other
</div>
<div className="text-2xl font-bold text-foreground">
{overviewData.deviceTypes.other}
</div>
<div className="mt-1 text-xs text-text-secondary">
{overviewData.totalVisits
? Math.round(
(overviewData.deviceTypes.other /
overviewData.totalVisits) *
100
)
: 0}
%
<div>
<div className="text-sm font-medium text-text-secondary">Other</div>
<div className="text-lg font-bold text-foreground">{overviewData.deviceTypes.other}</div>
<div className="text-xs text-text-secondary">
{overviewData.totalVisits ? Math.round((overviewData.deviceTypes.other / overviewData.totalVisits) * 100) : 0}%
</div>
</div>
</div>
</div>
@@ -781,26 +808,41 @@ export default function LinkDetailsPage({
</div>
</div>
<div className="relative pt-2">
{funnelData.steps.map((step) => (
<div key={step.name} className="mb-4">
<div className="flex justify-between items-center mb-1">
<span className="text-sm font-medium">
{step.name}
</span>
<span className="text-sm text-text-secondary">
{step.value} ({(step.percent || 0).toFixed(1)}
%)
</span>
</div>
<div className="w-full bg-card-border rounded-full h-2.5">
<div
className="bg-accent-blue h-2.5 rounded-full"
style={{ width: `${step.percent || 0}%` }}
></div>
</div>
</div>
))}
<div className="h-80">
<ResponsiveContainer width="100%" height="100%">
<BarChart
layout="vertical"
data={funnelData.steps}
margin={{ top: 20, right: 30, left: 40, bottom: 5 }}
>
<CartesianGrid strokeDasharray="3 3" />
<XAxis type="number" />
<YAxis
dataKey="name"
type="category"
width={100}
tick={{ fontSize: 14 }}
/>
<Tooltip
formatter={(value: number, name: string, props: any) => [
`${value} (${props.payload.percent.toFixed(1)}%)`,
"Value"
]}
/>
<Bar
dataKey="value"
fill="#3498db"
radius={[0, 4, 4, 0]}
>
<LabelList
dataKey="percent"
position="right"
formatter={(value: number) => `${value.toFixed(1)}%`}
style={{ fill: "#666", fontSize: 12 }}
/>
</Bar>
</BarChart>
</ResponsiveContainer>
</div>
</div>
)}
@@ -820,9 +862,9 @@ export default function LinkDetailsPage({
onClick={() =>
updateTimeGranularity(granularity)
}
className={`px-3 py-1 text-xs rounded-md ${
className={`px-4 py-2 text-sm font-medium rounded-md ${
timeGranularity === granularity
? "bg-accent-blue text-white"
? "bg-accent-blue text-black shadow-sm"
: "bg-card-bg text-text-secondary border border-card-border hover:bg-gray-100"
}`}
>
@@ -850,38 +892,51 @@ export default function LinkDetailsPage({
</div>
</div>
{/* 简单趋势表格 */}
<div className="overflow-x-auto">
<table className="min-w-full divide-y divide-card-border">
<thead>
<tr>
<th className="px-4 py-3 text-xs font-medium tracking-wider text-left uppercase text-text-secondary">
Time
</th>
<th className="px-4 py-3 text-xs font-medium tracking-wider text-left uppercase text-text-secondary">
Visits
</th>
<th className="px-4 py-3 text-xs font-medium tracking-wider text-left uppercase text-text-secondary">
Unique Visitors
</th>
</tr>
</thead>
<tbody className="divide-y bg-card-bg divide-card-border">
{trendsData.trends.map((trend, i) => (
<tr key={i}>
<td className="px-4 py-2 text-sm whitespace-nowrap text-foreground">
{trend.timestamp}
</td>
<td className="px-4 py-2 text-sm whitespace-nowrap text-foreground">
{trend.visits}
</td>
<td className="px-4 py-2 text-sm whitespace-nowrap text-foreground">
{trend.uniqueVisitors}
</td>
</tr>
))}
</tbody>
</table>
{/* 图表展示访问趋势 */}
<div className="h-80">
<ResponsiveContainer width="100%" height="100%">
<LineChart
data={trendsData.trends}
margin={{ top: 20, right: 30, left: 20, bottom: 10 }}
>
<CartesianGrid strokeDasharray="3 3" />
<XAxis
dataKey="timestamp"
tick={{ fontSize: 12 }}
interval="preserveStartEnd"
/>
<YAxis
tickFormatter={(value: number) => value.toLocaleString()}
/>
<Tooltip
formatter={(value: number, name: string) => [
value.toLocaleString(),
name === "visits" ? "访问量" : "唯一访客"
]}
/>
<Legend
verticalAlign="top"
height={36}
formatter={(value: string) => value === "visits" ? "访问量" : "唯一访客"}
/>
<Line
type="monotone"
dataKey="visits"
stroke="#3498db"
strokeWidth={2}
activeDot={{ r: 6 }}
name="visits"
/>
<Line
type="monotone"
dataKey="uniqueVisitors"
stroke="#2ecc71"
strokeWidth={2}
dot={{ r: 4 }}
name="uniqueVisitors"
/>
</LineChart>
</ResponsiveContainer>
</div>
</div>
</div>
@@ -895,6 +950,63 @@ export default function LinkDetailsPage({
<h3 className="mb-4 font-medium text-md text-foreground">
Link Performance
</h3>
{/* 添加图表展示 */}
<div className="mb-8">
<div className="flex flex-col md:flex-row gap-6">
{/* 柱状图展示总点击量和独立访客 */}
<div className="h-80 md:w-1/2">
<ResponsiveContainer width="100%" height="100%">
<BarChart
data={[
{ name: 'Total Clicks', value: performanceData.totalClicks },
{ name: 'Unique Visitors', value: performanceData.uniqueVisitors },
{ name: 'Active Days', value: performanceData.activeDays },
{ name: 'Unique Referrers', value: performanceData.uniqueReferrers }
]}
margin={{ top: 20, right: 30, left: 20, bottom: 5 }}
>
<CartesianGrid strokeDasharray="3 3" />
<XAxis dataKey="name" />
<YAxis />
<Tooltip formatter={(value: number) => [value.toLocaleString(), 'Count']} />
<Legend />
<Bar dataKey="value" name="Value" fill="#3498db" radius={[4, 4, 0, 0]}>
<LabelList dataKey="value" position="top" formatter={(value: number) => value.toLocaleString()} />
</Bar>
</BarChart>
</ResponsiveContainer>
</div>
{/* 饼图展示跳出率、转化率等比例数据 */}
<div className="h-80 md:w-1/2">
<ResponsiveContainer width="100%" height="100%">
<PieChart>
<Pie
data={[
{ name: 'Bounce Rate', value: performanceData.bounceRate, fill: '#e74c3c' },
{ name: 'Conversion Rate', value: performanceData.conversionRate, fill: '#2ecc71' },
{ name: 'Non-converting, Non-bounce Visits',
value: 100 - performanceData.bounceRate - performanceData.conversionRate,
fill: '#3498db' }
]}
cx="50%"
cy="50%"
innerRadius={60}
outerRadius={90}
dataKey="value"
nameKey="name"
label={({name, value}: {name: string, value: number}) => `${name}: ${value}%`}
labelLine={true}
/>
<Tooltip formatter={(value: number) => [`${value}%`, 'Percentage']} />
<Legend />
</PieChart>
</ResponsiveContainer>
</div>
</div>
</div>
<div className="grid grid-cols-1 gap-6 sm:grid-cols-2 lg:grid-cols-4">
<div className="flex flex-col">
<div className="mb-1 text-sm font-medium text-text-secondary">
@@ -975,6 +1087,89 @@ export default function LinkDetailsPage({
<h3 className="mb-4 font-medium text-md text-foreground">
Popular Referrers
</h3>
{/* 添加图表展示 */}
<div className="mb-8">
<div className="flex flex-col md:flex-row gap-4">
{/* 条形图展示访问量和独立访客 */}
<div className="h-80 md:w-2/3">
<ResponsiveContainer width="100%" height="100%">
<BarChart
layout="vertical"
data={referrersData.referrers.slice(0, 10)} // 只显示前10个引荐来源
margin={{ top: 20, right: 30, left: 120, bottom: 5 }}
>
<CartesianGrid strokeDasharray="3 3" horizontal={false} />
<XAxis type="number" />
<YAxis
dataKey="source"
type="category"
tick={{ fontSize: 12 }}
width={120}
/>
<Tooltip
formatter={(value: number, name: string) => [
value,
name === "visitCount" ? "访问量" : "独立访客"
]}
/>
<Legend verticalAlign="top" height={36} />
<Bar
name="访问量"
dataKey="visitCount"
fill="#3498db"
radius={[0, 4, 4, 0]}
>
<LabelList
dataKey="percent"
position="right"
formatter={(value: number) => `${value}%`}
style={{ fill: "#666", fontSize: 12 }}
/>
</Bar>
<Bar
name="独立访客"
dataKey="uniqueVisitors"
fill="#2ecc71"
radius={[0, 4, 4, 0]}
/>
</BarChart>
</ResponsiveContainer>
</div>
{/* 饼图展示来源占比 */}
<div className="h-80 md:w-1/3">
<ResponsiveContainer width="100%" height="100%">
<PieChart>
<Pie
data={referrersData.referrers.slice(0, 6)} // 只显示前6个引荐来源
dataKey="visitCount"
nameKey="source"
cx="50%"
cy="50%"
innerRadius={60}
outerRadius={90}
paddingAngle={2}
fill="#8884d8"
label={({source, percent}: {source: string, percent: number}) =>
`${source.substring(0, 10)}...: ${(percent * 100).toFixed(0)}%`
}
labelLine={false}
>
{referrersData.referrers.slice(0, 6).map((entry, index) => (
<Cell key={`cell-${index}`} fill={['#3498db', '#2ecc71', '#9b59b6', '#f39c12', '#e74c3c', '#1abc9c'][index % 6]} />
))}
</Pie>
<Tooltip />
<Legend
formatter={(value: string) => value.length > 20 ? value.substring(0, 20) + '...' : value}
/>
</PieChart>
</ResponsiveContainer>
</div>
</div>
</div>
<div className="overflow-x-auto">
<table className="min-w-full divide-y divide-card-border">
<thead>
@@ -1031,6 +1226,36 @@ export default function LinkDetailsPage({
<h3 className="mb-4 font-medium text-md text-foreground">
Device Types
</h3>
{/* 添加设备类型饼图 */}
<div className="mb-8">
<div className="h-80">
<ResponsiveContainer width="100%" height="100%">
<PieChart>
<Pie
data={deviceData.deviceTypes}
dataKey="count"
nameKey="name"
cx="50%"
cy="50%"
innerRadius={60}
outerRadius={90}
fill="#8884d8"
label={({name, percent}: {name: string, percent: number}) =>
`${name}: ${(percent * 100).toFixed(0)}%`
}
>
{deviceData.deviceTypes.map((entry, index) => (
<Cell key={`device-type-cell-${index}`} fill={['#3498db', '#2ecc71', '#9b59b6', '#f39c12', '#e74c3c'][index % 5]} />
))}
</Pie>
<Tooltip formatter={(value: number) => [`${value} 次访问`, '访问量']} />
<Legend />
</PieChart>
</ResponsiveContainer>
</div>
</div>
<div className="grid grid-cols-1 gap-6 sm:grid-cols-2 lg:grid-cols-3">
{deviceData.deviceTypes.map(
(device: DeviceItem, i: number) => (
@@ -1054,6 +1279,44 @@ export default function LinkDetailsPage({
<h3 className="mb-4 font-medium text-md text-foreground">
Device Brands
</h3>
{/* 添加设备品牌横向条形图 */}
<div className="mb-8">
<div className="h-80">
<ResponsiveContainer width="100%" height="100%">
<BarChart
layout="vertical"
data={deviceData.deviceBrands.slice(0, 10)}
margin={{ top: 5, right: 30, left: 100, bottom: 5 }}
>
<CartesianGrid strokeDasharray="3 3" horizontal={false} />
<XAxis type="number" />
<YAxis
dataKey="name"
type="category"
tick={{ fontSize: 12 }}
width={100}
/>
<Tooltip
formatter={(value: number) => [`${value} 次访问`, '访问量']}
/>
<Bar
dataKey="count"
name="访问量"
fill="#3498db"
radius={[0, 4, 4, 0]}
>
<LabelList
dataKey="percent"
position="right"
formatter={(value: number) => `${value}%`}
/>
</Bar>
</BarChart>
</ResponsiveContainer>
</div>
</div>
<div className="overflow-x-auto">
<table className="min-w-full divide-y divide-card-border">
<thead>
@@ -1098,6 +1361,81 @@ export default function LinkDetailsPage({
<h3 className="mb-4 font-medium text-md text-foreground">
Platform Distribution
</h3>
{/* 添加图表展示 */}
<div className="mb-8">
<div className="flex flex-col md:flex-row gap-6">
{/* 操作系统分布饼图 */}
<div className="h-80 md:w-1/2">
<h4 className="mb-3 text-sm font-medium text-text-secondary text-center">
Operating Systems
</h4>
<ResponsiveContainer width="100%" height="90%">
<PieChart>
<Pie
data={platformData.platforms.slice(0, 6)}
dataKey="count"
nameKey="name"
cx="50%"
cy="50%"
innerRadius={60}
outerRadius={90}
paddingAngle={2}
fill="#8884d8"
label={({name, percent}: {name: string, percent: number}) =>
`${name}: ${(percent * 100).toFixed(0)}%`
}
labelLine={true}
>
{platformData.platforms.slice(0, 6).map((entry, index) => (
<Cell key={`os-cell-${index}`} fill={['#3498db', '#2ecc71', '#9b59b6', '#f39c12', '#e74c3c', '#1abc9c'][index % 6]} />
))}
</Pie>
<Tooltip formatter={(value: number, name: string) => [`${value} 次访问`, name]} />
<Legend formatter={(value: string) => value.length > 25 ? value.substring(0, 25) + '...' : value} />
</PieChart>
</ResponsiveContainer>
</div>
{/* 浏览器分布条形图 */}
<div className="h-80 md:w-1/2">
<h4 className="mb-3 text-sm font-medium text-text-secondary text-center">
Browsers
</h4>
<ResponsiveContainer width="100%" height="90%">
<BarChart
data={platformData.browsers.slice(0, 8)}
layout="vertical"
margin={{ top: 5, right: 30, left: 80, bottom: 5 }}
>
<CartesianGrid strokeDasharray="3 3" horizontal={false} />
<XAxis type="number" />
<YAxis
type="category"
dataKey="name"
tick={{ fontSize: 12 }}
width={80}
/>
<Tooltip formatter={(value: number) => [`${value} 次访问`, '数量']} />
<Legend />
<Bar
dataKey="count"
name="访问量"
fill="#3498db"
radius={[0, 4, 4, 0]}
>
<LabelList
dataKey="percent"
position="right"
formatter={(value: number) => `${value}%`}
/>
</Bar>
</BarChart>
</ResponsiveContainer>
</div>
</div>
</div>
<div className="grid grid-cols-1 gap-6 sm:grid-cols-2">
<div>
<h4 className="mb-3 text-sm font-medium text-text-secondary">
@@ -1225,6 +1563,42 @@ export default function LinkDetailsPage({
<h3 className="mb-4 font-medium text-md text-foreground">
Scan Locations
</h3>
{/* 添加扫描位置饼图 */}
<div className="mb-8">
<div className="h-80">
<ResponsiveContainer width="100%" height="100%">
<PieChart>
<Pie
data={qrCodeData.locations.slice(0, 8)}
dataKey="scanCount"
nameKey="city"
cx="50%"
cy="50%"
innerRadius={60}
outerRadius={90}
paddingAngle={2}
fill="#8884d8"
label={({city, country, percent}: {city: string, country: string, percent: number}) =>
`${city}: ${(percent * 100).toFixed(0)}%`
}
>
{qrCodeData.locations.slice(0, 8).map((entry, index) => (
<Cell key={`location-cell-${index}`} fill={['#3498db', '#2ecc71', '#9b59b6', '#f39c12', '#e74c3c', '#1abc9c', '#34495e', '#16a085'][index % 8]} />
))}
</Pie>
<Tooltip formatter={(value: number, name: string) => [`${value} 次扫描`, name]} />
<Legend
formatter={(value: string) => {
const location = qrCodeData.locations.find(loc => loc.city === value);
return location ? `${location.city}, ${location.country}` : value;
}}
/>
</PieChart>
</ResponsiveContainer>
</div>
</div>
<div className="overflow-x-auto">
<table className="min-w-full divide-y divide-card-border">
<thead>
@@ -1263,40 +1637,46 @@ export default function LinkDetailsPage({
<h3 className="mb-4 font-medium text-md text-foreground">
Scan Time Distribution
</h3>
<div className="grid grid-cols-1 gap-6">
<div className="overflow-x-auto">
<table className="min-w-full divide-y divide-card-border">
<thead>
<tr>
<th className="px-4 py-3 text-xs font-medium tracking-wider text-left uppercase text-text-secondary">
Hour
</th>
<th className="px-4 py-3 text-xs font-medium tracking-wider text-left uppercase text-text-secondary">
Scans
</th>
<th className="px-4 py-3 text-xs font-medium tracking-wider text-left uppercase text-text-secondary">
Percentage
</th>
</tr>
</thead>
<tbody className="divide-y bg-card-bg divide-card-border">
{qrCodeData.hourlyDistribution.map((hour) => (
<tr key={hour.hour}>
<td className="px-4 py-2 text-sm whitespace-nowrap text-foreground">
{hour.hour}:00 - {hour.hour}:59
</td>
<td className="px-4 py-2 text-sm whitespace-nowrap text-foreground">
{hour.scanCount}
</td>
<td className="px-4 py-2 text-sm whitespace-nowrap text-foreground">
{hour.percent.toFixed(1)}%
</td>
</tr>
))}
</tbody>
</table>
{/* 添加扫描时间分布柱状图 */}
<div className="mb-8">
<div className="h-80">
<ResponsiveContainer width="100%" height="100%">
<BarChart
data={qrCodeData.hourlyDistribution}
margin={{ top: 20, right: 30, left: 20, bottom: 5 }}
>
<CartesianGrid strokeDasharray="3 3" />
<XAxis
dataKey="hour"
label={{ value: 'Hour of Day', position: 'insideBottom', offset: -5 }}
/>
<YAxis
label={{ value: 'Scan Count', angle: -90, position: 'insideLeft' }}
/>
<Tooltip
formatter={(value: number) => [`${value} 次扫描`, '扫描次数']}
labelFormatter={(hour) => `${hour}:00 - ${hour}:59`}
/>
<Bar
dataKey="scanCount"
name="扫描次数"
fill="#3498db"
radius={[4, 4, 0, 0]}
>
<LabelList
dataKey="percent"
position="top"
formatter={(value: number) => `${value.toFixed(1)}%`}
/>
</Bar>
</BarChart>
</ResponsiveContainer>
</div>
</div>
<div className="grid grid-cols-1 gap-6">
</div>
</div>
</div>
)}

View File

@@ -0,0 +1,122 @@
-- 修改设备类型字段从枚举类型更改为字符串类型
-- 先删除依赖于link_events表的物化视图
DROP TABLE IF EXISTS limq.platform_distribution;
DROP TABLE IF EXISTS limq.link_hourly_patterns;
DROP TABLE IF EXISTS limq.link_daily_stats;
DROP TABLE IF EXISTS limq.team_daily_stats;
DROP TABLE IF EXISTS limq.project_daily_stats;
-- 修改link_events表的device_type字段
ALTER TABLE
limq.link_events
MODIFY
COLUMN device_type String;
-- 重新创建物化视图
-- 每日链接汇总视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.link_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, link_id) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
link_id,
count() AS total_clicks,
uniqExact(visitor_id) AS unique_visitors,
uniqExact(session_id) AS unique_sessions,
sum(time_spent_sec) AS total_time_spent,
avg(time_spent_sec) AS avg_time_spent,
countIf(is_bounce) AS bounce_count,
countIf(event_type = 'conversion') AS conversion_count,
uniqExact(referrer) AS unique_referrers,
countIf(device_type = 'mobile') AS mobile_count,
countIf(device_type = 'tablet') AS tablet_count,
countIf(device_type = 'desktop') AS desktop_count,
countIf(is_qr_scan) AS qr_scan_count,
sum(conversion_value) AS total_conversion_value
FROM
limq.link_events
GROUP BY
date,
link_id;
-- 每小时访问模式视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.link_hourly_patterns ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, hour, link_id) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
toHour(event_time) AS hour,
link_id,
count() AS visits,
uniqExact(visitor_id) AS unique_visitors
FROM
limq.link_events
GROUP BY
date,
hour,
link_id;
-- 平台分布视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.platform_distribution ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, utm_source, device_type) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
utm_source,
device_type,
count() AS visits,
uniqExact(visitor_id) AS unique_visitors
FROM
limq.link_events
WHERE
utm_source != ''
GROUP BY
date,
utm_source,
device_type;
-- 团队每日统计视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.team_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, team_id) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
l.team_id AS team_id,
count() AS total_clicks,
uniqExact(e.visitor_id) AS unique_visitors,
countIf(e.event_type = 'conversion') AS conversion_count,
uniqExact(e.link_id) AS links_used,
countIf(e.is_qr_scan) AS qr_scan_count
FROM
limq.link_events e
JOIN limq.links l ON e.link_id = l.link_id
WHERE
l.team_id != ''
GROUP BY
date,
l.team_id;
-- 项目每日统计视图
CREATE MATERIALIZED VIEW IF NOT EXISTS limq.project_daily_stats ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, project_id) SETTINGS index_granularity = 8192 AS
SELECT
toDate(event_time) AS date,
l.project_id AS project_id,
count() AS total_clicks,
uniqExact(e.visitor_id) AS unique_visitors,
countIf(e.event_type = 'conversion') AS conversion_count,
uniqExact(e.link_id) AS links_used,
countIf(e.is_qr_scan) AS qr_scan_count
FROM
limq.link_events e
JOIN limq.links l ON e.link_id = l.link_id
WHERE
l.project_id != ''
GROUP BY
date,
l.project_id;

View File

@@ -1,575 +0,0 @@
// 从MongoDB的trace表同步数据到ClickHouse的link_events表
import { MongoClient, ObjectId } from 'mongodb';
import fs from 'fs';
import path from 'path';
// 硬编码配置信息
const mongoConfig = {
host: "10.0.1.10",
port: "27017",
db: "main",
username: "",
password: ""
};
const clickhouseConfig = {
clickhouse_host: "10.0.1.60",
clickhouse_port: 8123,
clickhouse_user: "admin",
clickhouse_password: "your_secure_password",
clickhouse_database: "limq",
clickhouse_url: "http://10.0.1.60:8123"
};
// 状态文件存储路径
const STATE_FILE_PATH = path.join(__dirname, 'shorturl_event_sync_state.json');
interface TraceRecord {
_id: ObjectId;
slugId: ObjectId;
label: string | null;
ip: string;
type: number;
platform: string;
platformOS: string;
browser: string;
browserVersion: string;
url: string;
createTime: number;
}
interface SyncState {
last_sync_time: number;
records_synced: number;
last_sync_id?: string;
}
// 替代 Windmill 的变量存储函数
async function getVariable(key: string): Promise<string> {
if (key === "f/shorturl_analytics/clickhouse/shorturl_sync_state") {
try {
if (fs.existsSync(STATE_FILE_PATH)) {
return fs.readFileSync(STATE_FILE_PATH, 'utf8');
}
} catch (error) {
console.error("读取状态文件失败:", error);
}
return JSON.stringify({
last_sync_time: 0,
records_synced: 0
});
}
throw new Error(`未知的变量键: ${key}`);
}
// 替代 Windmill 的变量设置函数
async function setVariable(key: string, value: string): Promise<void> {
if (key === "f/shorturl_analytics/clickhouse/shorturl_sync_state") {
try {
fs.writeFileSync(STATE_FILE_PATH, value, 'utf8');
} catch (error) {
console.error("保存状态文件失败:", error);
throw error;
}
} else {
throw new Error(`未知的变量键: ${key}`);
}
}
export async function main(
batch_size = 1000,
initial_sync = false,
max_records = 9999999,
timeout_minutes = 60,
skip_clickhouse_check = false,
force_insert = false
) {
const logWithTimestamp = (message: string) => {
const now = new Date();
console.log(`[${now.toISOString()}] ${message}`);
};
logWithTimestamp("开始执行MongoDB到ClickHouse的同步任务");
logWithTimestamp(`批处理大小: ${batch_size}, 最大记录数: ${max_records}, 超时时间: ${timeout_minutes}分钟`);
if (skip_clickhouse_check) {
logWithTimestamp("⚠️ 警告: 已启用跳过ClickHouse检查模式不会检查记录是否已存在");
}
if (force_insert) {
logWithTimestamp("⚠️ 警告: 已启用强制插入模式,将尝试插入所有记录");
}
// 设置超时
const startTime = Date.now();
const timeoutMs = timeout_minutes * 60 * 1000;
// 检查是否超时
const checkTimeout = () => {
if (Date.now() - startTime > timeoutMs) {
console.log(`运行时间超过${timeout_minutes}分钟,暂停执行`);
return true;
}
return false;
};
console.log("MongoDB配置:", JSON.stringify(mongoConfig));
console.log("ClickHouse配置:", JSON.stringify(clickhouseConfig));
// 构建MongoDB连接URL
let mongoUrl = "mongodb://";
if (mongoConfig.username && mongoConfig.password) {
mongoUrl += `${mongoConfig.username}:${mongoConfig.password}@`;
}
mongoUrl += `${mongoConfig.host}:${mongoConfig.port}/${mongoConfig.db}`;
console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);
// 获取上次同步的状态
let syncState: SyncState;
try {
const rawSyncState = await getVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state");
try {
syncState = JSON.parse(rawSyncState);
console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`);
} catch (parseError) {
console.error("解析同步状态失败:", parseError);
throw parseError;
}
} catch (error) {
console.log("未找到同步状态,创建初始同步状态");
syncState = {
last_sync_time: 0,
records_synced: 0,
};
}
// 如果强制从头开始同步
if (initial_sync) {
console.log("强制从头开始同步");
syncState = {
last_sync_time: 0,
records_synced: 0,
};
}
// 连接MongoDB
const client = new MongoClient(mongoUrl);
try {
await client.connect();
console.log("MongoDB连接成功");
const db = client.db(mongoConfig.db);
const traceCollection = db.collection<TraceRecord>("trace");
// 构建查询条件,只查询新的记录
const query: Record<string, unknown> = {};
if (syncState.last_sync_time > 0) {
query.createTime = { $gt: syncState.last_sync_time };
}
if (syncState.last_sync_id) {
// 如果有上次同步的ID则从该ID之后开始查询
// 注意这需要MongoDB中createTime相同的记录按_id排序
query._id = { $gt: new ObjectId(syncState.last_sync_id) };
}
// 计算总记录数
const totalRecords = await traceCollection.countDocuments(query);
console.log(`找到 ${totalRecords} 条新记录需要同步`);
// 限制此次处理的记录数量
const recordsToProcess = Math.min(totalRecords, max_records);
console.log(`本次将处理 ${recordsToProcess} 条记录`);
if (totalRecords === 0) {
console.log("没有新记录需要同步,任务完成");
return {
success: true,
records_synced: 0,
total_synced: syncState.records_synced,
message: "没有新记录需要同步"
};
}
// 分批处理记录
let processedRecords = 0;
let lastId: string | undefined;
let lastCreateTime = syncState.last_sync_time;
let totalBatchRecords = 0;
// 检查ClickHouse连接状态
const checkClickHouseConnection = async (): Promise<boolean> => {
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查不测试连接");
return true;
}
try {
logWithTimestamp("测试ClickHouse连接...");
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${Buffer.from(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`).toString('base64')}`,
},
body: "SELECT 1",
// 设置5秒超时
signal: AbortSignal.timeout(5000)
});
if (response.ok) {
logWithTimestamp("ClickHouse连接测试成功");
return true;
} else {
const errorText = await response.text();
logWithTimestamp(`ClickHouse连接测试失败: ${response.status} ${errorText}`);
return false;
}
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse连接测试失败: ${error.message}`);
return false;
}
};
// 检查记录是否已经存在于ClickHouse中
const checkExistingRecords = async (records: TraceRecord[]): Promise<TraceRecord[]> => {
if (records.length === 0) return [];
// 如果跳过ClickHouse检查或强制插入则直接返回所有记录
if (skip_clickhouse_check || force_insert) {
logWithTimestamp(`已跳过ClickHouse重复检查准备处理所有 ${records.length} 条记录`);
return records;
}
logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于ClickHouse中...`);
try {
// 提取所有记录的ID
const recordIds = records.map(record => record.slugId.toString()); // 使用slugId作为link_id查询
logWithTimestamp(`待检查的记录ID: ${recordIds.join(', ')}`);
// 构建查询SQL检查记录是否已存在确保添加FORMAT JSON来获取正确的JSON格式响应
const query = `
SELECT link_id
FROM ${clickhouseConfig.clickhouse_database}.link_events
WHERE link_id IN ('${recordIds.join("','")}')
FORMAT JSON
`;
logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`);
// 发送请求到ClickHouse添加10秒超时
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${Buffer.from(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`).toString('base64')}`
},
body: query,
signal: AbortSignal.timeout(10000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`);
}
// 获取响应文本以便记录
const responseText = await response.text();
logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`);
if (!responseText.trim()) {
logWithTimestamp("ClickHouse返回空响应假定没有记录存在");
return records; // 如果响应为空,假设没有记录
}
// 解析结果
let result;
try {
result = JSON.parse(responseText);
} catch (err) {
logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`);
throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`);
}
// 确保result有正确的结构
if (!result.data) {
logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`);
return records; // 如果没有data字段假设没有记录
}
// 提取已存在的记录ID
const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id));
logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`);
if (existingIds.size > 0) {
logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`);
}
// 过滤出不存在的记录
const newRecords = records.filter(record => !existingIds.has(record.slugId.toString())); // 使用slugId匹配link_id
logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
return newRecords;
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse查询出错: ${error.message}`);
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查将继续处理所有记录");
return records;
} else {
throw error; // 如果没有启用跳过检查,则抛出错误
}
}
};
// 在处理记录前先检查ClickHouse连接
const clickhouseConnected = await checkClickHouseConnection();
if (!clickhouseConnected && !skip_clickhouse_check) {
logWithTimestamp("⚠️ ClickHouse连接测试失败请启用skip_clickhouse_check=true参数来跳过连接检查");
throw new Error("ClickHouse连接失败无法继续同步");
}
// 处理记录的函数
const processRecords = async (records: TraceRecord[]) => {
if (records.length === 0) return 0;
logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`);
// 检查记录是否已存在
let newRecords;
try {
newRecords = await checkExistingRecords(records);
} catch (err) {
const error = err as Error;
logWithTimestamp(`检查记录是否存在时出错: ${error.message}`);
if (!skip_clickhouse_check && !force_insert) {
throw error;
}
// 如果跳过检查或强制插入,则使用所有记录
logWithTimestamp("将使用所有记录进行处理");
newRecords = records;
}
if (newRecords.length === 0) {
logWithTimestamp("所有记录都已存在,跳过处理");
// 更新同步状态,即使没有新增记录
const lastRecord = records[records.length - 1];
lastId = lastRecord._id.toString();
lastCreateTime = lastRecord.createTime;
return 0;
}
logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`);
// 准备ClickHouse插入数据
const clickhouseData = newRecords.map(record => {
// 转换MongoDB记录为ClickHouse格式匹配ClickHouse表结构
return {
// UUID将由ClickHouse自动生成 (event_id)
link_id: record.slugId.toString(),
channel_id: record.label || "",
visitor_id: record._id.toString(), // 使用MongoDB ID作为访客ID
session_id: record._id.toString() + "-" + record.createTime, // 创建一个唯一会话ID
event_type: record.type <= 4 ? record.type : 1, // 确保event_type在枚举范围内
ip_address: record.ip,
country: "", // 这些字段在MongoDB中不存在使用默认值
city: "",
referrer: record.url || "",
utm_source: "",
utm_medium: "",
utm_campaign: "",
user_agent: record.browser + " " + record.browserVersion,
device_type: record.platform === "mobile" ? 1 : (record.platform === "tablet" ? 2 : 3),
browser: record.browser || "",
os: record.platformOS || "",
time_spent_sec: 0,
is_bounce: true,
is_qr_scan: false,
qr_code_id: "",
conversion_type: 1, // 默认为'visit'
conversion_value: 0,
custom_data: `{"mongo_id":"${record._id.toString()}"}`
};
});
// 更新同步状态使用原始records的最后一条以确保进度正确
const lastRecord = records[records.length - 1];
lastId = lastRecord._id.toString();
lastCreateTime = lastRecord.createTime;
logWithTimestamp(`更新同步位置到: ID=${lastId}, 时间=${new Date(lastCreateTime).toISOString()}`);
// 生成ClickHouse插入SQL
const insertSQL = `
INSERT INTO ${clickhouseConfig.clickhouse_database}.link_events
(link_id, channel_id, visitor_id, session_id, event_type, ip_address, country, city,
referrer, utm_source, utm_medium, utm_campaign, user_agent, device_type, browser, os,
time_spent_sec, is_bounce, is_qr_scan, qr_code_id, conversion_type, conversion_value, custom_data)
VALUES ${clickhouseData.map(record =>
`('${record.link_id}', '${record.channel_id.replace(/'/g, "''")}', '${record.visitor_id}', '${record.session_id}',
${record.event_type}, '${record.ip_address}', '', '',
'${record.referrer.replace(/'/g, "''")}', '', '', '', '${record.user_agent.replace(/'/g, "''")}', ${record.device_type},
'${record.browser.replace(/'/g, "''")}', '${record.os.replace(/'/g, "''")}',
0, true, false, '', 1, 0, '${record.custom_data}')`
).join(", ")}
`;
if (insertSQL.length === 0) {
console.log("没有新记录需要插入");
return 0;
}
// 发送请求到ClickHouse添加20秒超时
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
try {
logWithTimestamp("发送插入请求到ClickHouse...");
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${Buffer.from(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`).toString('base64')}`
},
body: insertSQL,
signal: AbortSignal.timeout(20000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse插入错误: ${response.status} ${errorText}`);
}
logWithTimestamp(`成功插入 ${newRecords.length} 条记录到ClickHouse`);
return newRecords.length;
} catch (err) {
const error = err as Error;
logWithTimestamp(`向ClickHouse插入数据失败: ${error.message}`);
throw error;
}
};
// 批量处理记录
for (let page = 0; processedRecords < recordsToProcess; page++) {
// 检查超时
if (checkTimeout()) {
logWithTimestamp(`已处理 ${processedRecords}/${recordsToProcess} 条记录,因超时暂停执行`);
break;
}
// 每批次都输出进度
logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`);
const records = await traceCollection.find(query)
.sort({ createTime: 1, _id: 1 })
.skip(page * batch_size)
.limit(batch_size)
.toArray();
if (records.length === 0) {
logWithTimestamp(`${page+1} 批次没有找到数据,结束处理`);
break;
}
logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);
// 输出当前批次的部分数据信息
if (records.length > 0) {
logWithTimestamp(`批次 ${page+1} 第一条记录: ID=${records[0]._id}, 时间=${new Date(records[0].createTime).toISOString()}`);
if (records.length > 1) {
logWithTimestamp(`批次 ${page+1} 最后一条记录: ID=${records[records.length-1]._id}, 时间=${new Date(records[records.length-1].createTime).toISOString()}`);
}
}
const batchSize = await processRecords(records);
processedRecords += records.length; // 总是增加处理的记录数,即使有些记录已存在
totalBatchRecords += batchSize; // 只增加实际插入的记录数
logWithTimestamp(`${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
// 更新查询条件,以便下一批次查询
query.createTime = { $gt: lastCreateTime };
if (lastId) {
query._id = { $gt: new ObjectId(lastId) };
}
logWithTimestamp(`更新查询条件: 创建时间 > ${new Date(lastCreateTime).toISOString()}, ID > ${lastId || 'none'}`);
}
// 更新同步状态
const newSyncState: SyncState = {
last_sync_time: lastCreateTime,
records_synced: syncState.records_synced + totalBatchRecords,
last_sync_id: lastId
};
await setVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state", JSON.stringify(newSyncState));
console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`);
return {
success: true,
records_processed: processedRecords,
records_synced: totalBatchRecords,
total_synced: newSyncState.records_synced,
last_sync_time: new Date(newSyncState.last_sync_time).toISOString(),
message: "数据同步完成"
};
} catch (err) {
console.error("同步过程中发生错误:", err);
return {
success: false,
error: err instanceof Error ? err.message : String(err),
stack: err instanceof Error ? err.stack : undefined
};
} finally {
// 关闭MongoDB连接
await client.close();
console.log("MongoDB连接已关闭");
}
}
// 如果直接执行此脚本
if (require.main === module) {
// 解析命令行参数
const args = process.argv.slice(2);
const params: Record<string, any> = {
batch_size: 1000,
initial_sync: false,
max_records: 9999999,
timeout_minutes: 60,
skip_clickhouse_check: false,
force_insert: false
};
// 简单的参数解析
for (let i = 0; i < args.length; i += 2) {
if (args[i].startsWith('--') && i + 1 < args.length) {
const key = args[i].substring(2);
let value: any = args[i + 1];
// 类型转换
if (value === 'true') value = true;
else if (value === 'false') value = false;
else if (!isNaN(Number(value))) value = Number(value);
params[key] = value;
}
}
console.log('启动同步任务,参数:', params);
main(
params.batch_size,
params.initial_sync,
params.max_records,
params.timeout_minutes,
params.skip_clickhouse_check,
params.force_insert
).then(result => {
console.log('同步任务完成:', result);
process.exit(result.success ? 0 : 1);
}).catch(err => {
console.error('同步任务失败:', err);
process.exit(1);
});
}

View File

@@ -40,12 +40,11 @@ interface SyncState {
}
export async function main(
batch_size = 1000, // 减小批处理大小为5
initial_sync = false,
max_records = 9999999, // 只同步10条记录用于测试
timeout_minutes = 60, // 减少超时时间为5分钟
skip_clickhouse_check = false, // 是否跳过ClickHouse重复检查
force_insert = false // 强制插入所有记录,不检查是否已存在
batch_size = 1000,
max_records = 9999999,
timeout_minutes = 60,
skip_clickhouse_check = false,
force_insert = false
) {
const logWithTimestamp = (message: string) => {
const now = new Date();
@@ -125,34 +124,6 @@ export async function main(
console.log(`MongoDB连接URL: ${mongoUrl.replace(/:[^:]*@/, ":****@")}`);
// 获取上次同步的状态
let syncState: SyncState;
try {
const rawSyncState = await getVariable<string>("f/shorturl_analytics/clickhouse/shorturl_event_sync_state");
try {
syncState = JSON.parse(rawSyncState);
console.log(`获取同步状态成功: 上次同步时间 ${new Date(syncState.last_sync_time).toISOString()}`);
} catch (parseError) {
console.error("解析同步状态失败:", parseError);
throw parseError;
}
} catch (_unused_error) {
console.log("未找到同步状态,创建初始同步状态");
syncState = {
last_sync_time: 0,
records_synced: 0,
};
}
// 如果强制从头开始同步
if (initial_sync) {
console.log("强制从头开始同步");
syncState = {
last_sync_time: 0,
records_synced: 0,
};
}
// 连接MongoDB
const client = new MongoClient();
try {
@@ -162,43 +133,28 @@ export async function main(
const db = client.database(mongoConfig.db);
const traceCollection = db.collection<TraceRecord>("trace");
// 构建查询条件,只查询新的记录
const query: Record<string, unknown> = {};
if (syncState.last_sync_time > 0) {
query.createTime = { $gt: syncState.last_sync_time };
}
if (syncState.last_sync_id) {
// 如果有上次同步的ID则从该ID之后开始查询
// 注意这需要MongoDB中createTime相同的记录按_id排序
query._id = { $gt: new ObjectId(syncState.last_sync_id) };
}
// 构建查询条件,获取所有记录
const query: Record<string, unknown> = {
type: 1 // 只同步type为1的记录
};
// 计算总记录数
const totalRecords = await traceCollection.countDocuments(query);
console.log(`找到 ${totalRecords}记录需要同步`);
console.log(`找到 ${totalRecords} 条记录需要同步`);
// 限制此次处理的记录数量
const recordsToProcess = Math.min(totalRecords, max_records);
console.log(`本次将处理 ${recordsToProcess} 条记录`);
if (totalRecords === 0) {
console.log("没有记录需要同步,任务完成");
console.log("没有记录需要同步,任务完成");
return {
success: true,
records_synced: 0,
total_synced: syncState.records_synced,
message: "没有新记录需要同步"
message: "没有记录需要同步"
};
}
// 分批处理记录
let processedRecords = 0;
let lastId: string | undefined;
let lastCreateTime = syncState.last_sync_time;
let totalBatchRecords = 0;
// 检查ClickHouse连接状态
const checkClickHouseConnection = async (): Promise<boolean> => {
if (skip_clickhouse_check) {
@@ -358,10 +314,6 @@ export async function main(
if (newRecords.length === 0) {
logWithTimestamp("所有记录都已存在,跳过处理");
// 更新同步状态,即使没有新增记录
const lastRecord = records[records.length - 1];
lastId = lastRecord._id.toString();
lastCreateTime = lastRecord.createTime;
return 0;
}
@@ -385,7 +337,7 @@ export async function main(
utm_medium: "",
utm_campaign: "",
user_agent: record.browser + " " + record.browserVersion,
device_type: record.platform === "mobile" ? 1 : (record.platform === "tablet" ? 2 : 3),
device_type: record.platform || "unknown",
browser: record.browser || "",
os: record.platformOS || "",
time_spent_sec: 0,
@@ -398,25 +350,27 @@ export async function main(
};
});
// 更新同步状态使用原始records的最后一条以确保进度正确
const lastRecord = records[records.length - 1];
lastId = lastRecord._id.toString();
lastCreateTime = lastRecord.createTime;
logWithTimestamp(`更新同步位置到: ID=${lastId}, 时间=${new Date(lastCreateTime).toISOString()}`);
// 生成ClickHouse插入SQL
const insertSQL = `
INSERT INTO ${clickhouseConfig.clickhouse_database}.link_events
(link_id, channel_id, visitor_id, session_id, event_type, ip_address, country, city,
referrer, utm_source, utm_medium, utm_campaign, user_agent, device_type, browser, os,
time_spent_sec, is_bounce, is_qr_scan, qr_code_id, conversion_type, conversion_value, custom_data)
VALUES ${clickhouseData.map(record =>
`('${record.link_id}', '${record.channel_id.replace(/'/g, "''")}', '${record.visitor_id}', '${record.session_id}',
${record.event_type}, '${record.ip_address}', '', '',
'${record.referrer.replace(/'/g, "''")}', '', '', '', '${record.user_agent.replace(/'/g, "''")}', ${record.device_type},
'${record.browser.replace(/'/g, "''")}', '${record.os.replace(/'/g, "''")}',
0, true, false, '', 1, 0, '${record.custom_data}')`
).join(", ")}
VALUES ${clickhouseData.map(record => {
// 确保所有字符串值都是字符串类型,并安全处理替换
const safeReplace = (val: any): string => {
// 确保值是字符串如果是null或undefined则使用空字符串
const str = val === null || val === undefined ? "" : String(val);
// 安全替换单引号
return str.replace(/'/g, "''");
};
return `('${record.link_id}', '${safeReplace(record.channel_id)}', '${record.visitor_id}', '${record.session_id}',
${record.event_type}, '${safeReplace(record.ip_address)}', '', '',
'${safeReplace(record.referrer)}', '', '', '', '${safeReplace(record.user_agent)}', '${safeReplace(record.device_type)}',
'${safeReplace(record.browser)}', '${safeReplace(record.os)}',
0, true, false, '', 1, 0, '${safeReplace(record.custom_data)}')`;
}).join(", ")}
`;
if (insertSQL.length === 0) {
@@ -453,6 +407,9 @@ export async function main(
};
// 批量处理记录
let processedRecords = 0;
let totalBatchRecords = 0;
for (let page = 0; processedRecords < recordsToProcess; page++) {
// 检查超时
if (checkTimeout()) {
@@ -464,17 +421,22 @@ export async function main(
logWithTimestamp(`开始处理第 ${page+1} 批次,已完成 ${processedRecords}/${recordsToProcess} 条记录 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
logWithTimestamp(`正在从MongoDB获取第 ${page+1} 批次数据...`);
const records = await traceCollection.find(query)
.sort({ createTime: 1, _id: 1 })
.skip(page * batch_size)
.limit(batch_size)
.toArray();
const records = await traceCollection.find(
query,
{
allowDiskUse: true,
sort: { createTime: 1 },
skip: page * batch_size,
limit: batch_size
}
).toArray();
if (records.length === 0) {
logWithTimestamp(`${page+1} 批次没有找到数据,结束处理`);
logWithTimestamp("没有找到更多数据,同步结束");
break;
}
// 找到数据,开始处理
logWithTimestamp(`获取到 ${records.length} 条记录,开始处理...`);
// 输出当前批次的部分数据信息
if (records.length > 0) {
@@ -485,35 +447,16 @@ export async function main(
}
const batchSize = await processRecords(records);
processedRecords += records.length; // 总是增加处理的记录数,即使有些记录已存在
totalBatchRecords += batchSize; // 只增加实际插入的记录数
processedRecords += records.length;
totalBatchRecords += batchSize;
logWithTimestamp(`${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
// 更新查询条件,以便下一批次查询
query.createTime = { $gt: lastCreateTime };
if (lastId) {
query._id = { $gt: new ObjectId(lastId) };
}
logWithTimestamp(`更新查询条件: 创建时间 > ${new Date(lastCreateTime).toISOString()}, ID > ${lastId || 'none'}`);
}
// 更新同步状态
const newSyncState: SyncState = {
last_sync_time: lastCreateTime,
records_synced: syncState.records_synced + totalBatchRecords,
last_sync_id: lastId
};
await setVariable("f/shorturl_analytics/clickhouse/shorturl_sync_state", JSON.stringify(newSyncState));
console.log(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 总同步记录数 ${newSyncState.records_synced}`);
return {
success: true,
records_processed: processedRecords,
records_synced: totalBatchRecords,
total_synced: newSyncState.records_synced,
last_sync_time: new Date(newSyncState.last_sync_time).toISOString(),
message: "数据同步完成"
};
} catch (err) {