Compare commits

3 Commits

8 changed files with 507 additions and 264 deletions

View File

@@ -44,6 +44,11 @@ export default function Header() {
Short Links Short Links
</Link> </Link>
</li> </li>
<li>
<Link href="/create-shorturl" className="text-sm text-gray-700 hover:text-blue-500">
Create Short URL
</Link>
</li>
</ul> </ul>
</nav> </nav>
)} )}

View File

@@ -1,65 +0,0 @@
'use client';
import Link from 'next/link';
export default function Navbar() {
return (
<header className="w-full py-4 border-b border-card-border bg-background">
<div className="container flex items-center justify-between px-4 mx-auto">
<div className="flex items-center space-x-4">
<Link href="/" className="flex items-center space-x-2">
<svg
className="w-6 h-6 text-accent-blue"
xmlns="http://www.w3.org/2000/svg"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
strokeWidth="2"
strokeLinecap="round"
strokeLinejoin="round"
>
<path d="M10 13a5 5 0 0 0 7.54.54l3-3a5 5 0 0 0-7.07-7.07l-1.72 1.71"></path>
<path d="M14 11a5 5 0 0 0-7.54-.54l-3 3a5 5 0 0 0 7.07 7.07l1.71-1.71"></path>
</svg>
<span className="text-xl font-bold text-foreground">ShortURL</span>
</Link>
<nav className="hidden space-x-4 md:flex">
<Link
href="/links"
className="text-sm text-foreground hover:text-accent-blue transition-colors"
>
Links
</Link>
<Link
href="/analytics"
className="text-sm text-foreground hover:text-accent-blue transition-colors"
>
Analytics
</Link>
</nav>
</div>
<div className="flex items-center space-x-3">
<button className="p-2 text-sm text-foreground rounded-md gradient-border">
Upgrade
</button>
<button className="p-2 text-sm text-foreground hover:text-accent-blue">
<svg
className="w-5 h-5"
xmlns="http://www.w3.org/2000/svg"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
strokeWidth="2"
strokeLinecap="round"
strokeLinejoin="round"
>
<circle cx="12" cy="12" r="10"></circle>
<circle cx="12" cy="10" r="3"></circle>
<path d="M7 20.662V19a2 2 0 0 1 2-2h6a2 2 0 0 1 2 2v1.662"></path>
</svg>
</button>
</div>
</div>
</header>
);
}

View File

@@ -7,6 +7,7 @@ import { getSupabaseClient } from '../../utils/supabase';
import { AuthChangeEvent, Session } from '@supabase/supabase-js'; import { AuthChangeEvent, Session } from '@supabase/supabase-js';
import { Loader2, X, Check } from 'lucide-react'; import { Loader2, X, Check } from 'lucide-react';
import { cn } from '@/lib/utils'; import { cn } from '@/lib/utils';
import { limqRequest } from '@/lib/api';
type Team = Database['limq']['Tables']['teams']['Row']; type Team = Database['limq']['Tables']['teams']['Row'];
@@ -70,6 +71,14 @@ export function TeamSelector({
try { try {
const supabase = getSupabaseClient(); const supabase = getSupabaseClient();
// 尝试创建默认团队和项目(如果用户还没有)
try {
const response = await limqRequest('team/create-default', 'POST');
console.log('Default team creation response:', response);
} catch (teamError) {
console.error('Error creating default team:', teamError);
}
const { data: memberships, error: membershipError } = await supabase const { data: memberships, error: membershipError } = await supabase
.from('team_membership') .from('team_membership')
.select('team_id') .select('team_id')

View File

@@ -0,0 +1,327 @@
'use client';
import { useState, useEffect } from 'react';
import { useRouter } from 'next/navigation';
import { useAuth } from '@/lib/auth';
import { ProtectedRoute } from '@/lib/auth';
import { limqRequest } from '@/lib/api';
interface ShortUrlData {
originalUrl: string;
customSlug?: string;
title: string;
description?: string;
tags?: string[];
}
export default function CreateShortUrlPage() {
return (
<ProtectedRoute>
<CreateShortUrlForm />
</ProtectedRoute>
);
}
function CreateShortUrlForm() {
const router = useRouter();
const { user } = useAuth();
const [formData, setFormData] = useState<ShortUrlData>({
originalUrl: '',
customSlug: '',
title: '',
description: '',
tags: [],
});
const [tagInput, setTagInput] = useState('');
const [isSubmitting, setIsSubmitting] = useState(false);
const [error, setError] = useState<string | null>(null);
const [success, setSuccess] = useState(false);
// 使用 useEffect 在加载时添加用户信息到表单数据中
useEffect(() => {
if (user) {
console.log('当前用户:', user.email);
// 可以在这里添加用户相关数据到表单中
}
}, [user]);
const handleChange = (e: React.ChangeEvent<HTMLInputElement | HTMLTextAreaElement>) => {
const { name, value } = e.target;
setFormData(prev => ({
...prev,
[name]: value
}));
};
const handleTagKeyDown = (e: React.KeyboardEvent<HTMLInputElement>) => {
if (e.key === 'Enter' && tagInput.trim()) {
e.preventDefault();
addTag();
}
};
const addTag = () => {
if (tagInput.trim() && !formData.tags?.includes(tagInput.trim())) {
setFormData(prev => ({
...prev,
tags: [...(prev.tags || []), tagInput.trim()]
}));
setTagInput('');
}
};
const removeTag = (tagToRemove: string) => {
setFormData(prev => ({
...prev,
tags: prev.tags?.filter(tag => tag !== tagToRemove)
}));
};
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
setIsSubmitting(true);
setError(null);
try {
// 验证必填字段
if (!formData.originalUrl) {
throw new Error('原始 URL 是必填项');
}
if (!formData.title) {
throw new Error('标题是必填项');
}
// 按照API要求构建请求数据
const requestData = {
type: "shorturl",
attributes: {
// 可以添加任何额外属性但attributes不能为空
icon: ""
},
shortUrl: {
url: formData.originalUrl,
slug: formData.customSlug || undefined,
title: formData.title,
name: formData.title,
description: formData.description || ""
},
// 如果有team或project ID也可以添加
// teamId: "your-team-id",
// projectId: "your-project-id",
tagIds: formData.tags && formData.tags.length > 0 ? formData.tags : undefined
};
// 调用 API 创建 shorturl 资源
const response = await limqRequest('resource/shorturl', 'POST', requestData as unknown as Record<string, unknown>);
console.log('创建成功:', response);
setSuccess(true);
// 2秒后跳转到链接列表页面
setTimeout(() => {
router.push('/links');
}, 2000);
} catch (err) {
console.error('创建短链接失败:', err);
setError(err instanceof Error ? err.message : '创建短链接失败,请稍后重试');
} finally {
setIsSubmitting(false);
}
};
return (
<div className="container mx-auto px-4 py-8 max-w-3xl">
<div className="bg-white rounded-lg shadow-md overflow-hidden">
<div className="border-b border-gray-200 bg-blue-50 px-6 py-4">
<h1 className="text-xl font-medium text-gray-900">Create Short URL</h1>
<p className="mt-1 text-sm text-gray-600">
Create a new short URL resource for tracking and analytics
</p>
</div>
{error && (
<div className="bg-red-50 border-l-4 border-red-500 p-4 m-6">
<div className="flex">
<div className="flex-shrink-0">
<svg className="h-5 w-5 text-red-500" fill="currentColor" viewBox="0 0 20 20">
<path fillRule="evenodd" d="M10 18a8 8 0 100-16 8 8 0 000 16zM8.707 7.293a1 1 0 00-1.414 1.414L8.586 10l-1.293 1.293a1 1 0 101.414 1.414L10 11.414l1.293 1.293a1 1 0 001.414-1.414L11.414 10l1.293-1.293a1 1 0 00-1.414-1.414L10 8.586 8.707 7.293z" clipRule="evenodd" />
</svg>
</div>
<div className="ml-3">
<p className="text-sm text-red-700">{error}</p>
</div>
</div>
</div>
)}
{success && (
<div className="bg-green-50 border-l-4 border-green-500 p-4 m-6">
<div className="flex">
<div className="flex-shrink-0">
<svg className="h-5 w-5 text-green-500" fill="currentColor" viewBox="0 0 20 20">
<path fillRule="evenodd" d="M10 18a8 8 0 100-16 8 8 0 000 16zm3.707-9.293a1 1 0 00-1.414-1.414L9 10.586 7.707 9.293a1 1 0 00-1.414 1.414l2 2a1 1 0 001.414 0l4-4z" clipRule="evenodd" />
</svg>
</div>
<div className="ml-3">
<p className="text-sm text-green-700">
...
</p>
</div>
</div>
</div>
)}
<form onSubmit={handleSubmit} className="p-6 space-y-6">
{/* 标题 */}
<div>
<label htmlFor="title" className="block text-sm font-medium text-gray-700">
<span className="text-red-500">*</span>
</label>
<input
type="text"
id="title"
name="title"
value={formData.title}
onChange={handleChange}
placeholder="例如:产品发布活动"
className="mt-1 block w-full px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:outline-none focus:ring-blue-500 focus:border-blue-500 sm:text-sm"
required
/>
</div>
{/* 原始 URL */}
<div>
<label htmlFor="originalUrl" className="block text-sm font-medium text-gray-700">
URL <span className="text-red-500">*</span>
</label>
<input
type="url"
id="originalUrl"
name="originalUrl"
value={formData.originalUrl}
onChange={handleChange}
placeholder="https://example.com/your-long-url"
className="mt-1 block w-full px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:outline-none focus:ring-blue-500 focus:border-blue-500 sm:text-sm"
required
/>
</div>
{/* 自定义短链接 */}
<div>
<label htmlFor="customSlug" className="block text-sm font-medium text-gray-700">
<span className="text-gray-500">()</span>
</label>
<div className="flex mt-1 rounded-md shadow-sm">
<span className="inline-flex items-center px-3 py-2 text-sm text-gray-500 border border-r-0 border-gray-300 rounded-l-md bg-gray-50">
shorturl.com/
</span>
<input
type="text"
id="customSlug"
name="customSlug"
value={formData.customSlug}
onChange={handleChange}
placeholder="custom-slug"
className="flex-1 block w-full min-w-0 px-3 py-2 border border-gray-300 rounded-none rounded-r-md focus:outline-none focus:ring-blue-500 focus:border-blue-500 sm:text-sm"
/>
</div>
<p className="mt-1 text-xs text-gray-500">
</p>
</div>
{/* 描述 */}
<div>
<label htmlFor="description" className="block text-sm font-medium text-gray-700">
<span className="text-gray-500">()</span>
</label>
<textarea
id="description"
name="description"
value={formData.description}
onChange={handleChange}
rows={3}
placeholder="对此链接的简短描述"
className="mt-1 block w-full px-3 py-2 border border-gray-300 rounded-md shadow-sm focus:outline-none focus:ring-blue-500 focus:border-blue-500 sm:text-sm"
/>
</div>
{/* 标签 */}
<div>
<label htmlFor="tagInput" className="block text-sm font-medium text-gray-700">
<span className="text-gray-500">()</span>
</label>
<div className="flex mt-1 rounded-md shadow-sm">
<input
type="text"
id="tagInput"
value={tagInput}
onChange={(e) => setTagInput(e.target.value)}
onKeyDown={handleTagKeyDown}
placeholder="添加标签并按 Enter"
className="flex-1 block w-full min-w-0 px-3 py-2 border border-gray-300 rounded-l-md focus:outline-none focus:ring-blue-500 focus:border-blue-500 sm:text-sm"
/>
<button
type="button"
onClick={addTag}
className="inline-flex items-center px-3 py-2 text-sm font-medium text-white border border-transparent rounded-r-md shadow-sm bg-blue-600 hover:bg-blue-700 focus:outline-none focus:ring-2 focus:ring-offset-2 focus:ring-blue-500"
>
</button>
</div>
{formData.tags && formData.tags.length > 0 && (
<div className="flex flex-wrap gap-2 mt-2">
{formData.tags.map(tag => (
<span key={tag} className="inline-flex items-center px-2 py-0.5 text-xs font-medium bg-blue-100 rounded-full text-blue-800">
{tag}
<button
type="button"
onClick={() => removeTag(tag)}
className="flex-shrink-0 ml-1 text-blue-500 rounded-full hover:text-blue-700 focus:outline-none"
>
<span className="sr-only"> {tag}</span>
<svg className="w-3 h-3" fill="currentColor" viewBox="0 0 20 20" xmlns="http://www.w3.org/2000/svg">
<path fillRule="evenodd" d="M10 18a8 8 0 100-16 8 8 0 000 16zM8.707 7.293a1 1 0 00-1.414 1.414L8.586 10l-1.293 1.293a1 1 0 101.414 1.414L10 11.414l1.293 1.293a1 1 0 001.414-1.414L11.414 10l1.293-1.293a1 1 0 00-1.414-1.414L10 8.586 8.707 7.293z" clipRule="evenodd" />
</svg>
</button>
</span>
))}
</div>
)}
</div>
{/* 提交按钮 */}
<div className="flex justify-end pt-4 border-t border-gray-200">
<button
type="button"
onClick={() => router.back()}
className="px-4 py-2 text-sm font-medium text-gray-700 bg-white border border-gray-300 rounded-md shadow-sm hover:bg-gray-50 focus:outline-none focus:ring-2 focus:ring-offset-2 focus:ring-blue-500 mr-3"
>
</button>
<button
type="submit"
disabled={isSubmitting}
className="inline-flex justify-center px-4 py-2 text-sm font-medium text-white bg-blue-600 border border-transparent rounded-md shadow-sm hover:bg-blue-700 focus:outline-none focus:ring-2 focus:ring-offset-2 focus:ring-blue-500"
>
{isSubmitting ? (
<>
<svg className="w-5 h-5 mr-2 -ml-1 animate-spin" xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24">
<circle className="opacity-25" cx="12" cy="12" r="10" stroke="currentColor" strokeWidth="4"></circle>
<path className="opacity-75" fill="currentColor" d="M4 12a8 8 0 018-8V0C5.373 0 0 5.373 0 12h4zm2 5.291A7.962 7.962 0 014 12H0c0 3.042 1.135 5.824 3 7.938l3-2.647z"></path>
</svg>
...
</>
) : '创建短链接'}
</button>
</div>
</form>
</div>
</div>
);
}

49
lib/api.ts Normal file
View File

@@ -0,0 +1,49 @@
import supabase from './supabase';
// Define response type for API
export interface ApiResponse<T = unknown> {
success: boolean;
data?: T;
error?: string;
message?: string;
}
// Common function for authenticated API requests to LIMQ
export async function limqRequest<T = unknown>(
endpoint: string,
method: 'GET' | 'POST' | 'PUT' | 'DELETE' = 'GET',
data?: Record<string, unknown>
): Promise<ApiResponse<T>> {
// Get current session
const { data: { session } } = await supabase.auth.getSession();
if (!session) {
throw new Error('No active session. User must be authenticated.');
}
const baseUrl = process.env.NEXT_PUBLIC_LIMQ_API || 'http://localhost:3005';
const url = `${baseUrl}${endpoint.startsWith('/') ? endpoint : '/' + endpoint}`;
const options: RequestInit = {
method,
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${session.access_token}`
}
};
if (data && (method === 'POST' || method === 'PUT')) {
options.body = JSON.stringify(data);
}
const response = await fetch(url, options);
if (!response.ok) {
const errorData = await response.json().catch(() => null);
throw new Error(
errorData?.error || `Request failed with status ${response.status}`
);
}
return response.json();
}

View File

@@ -4,6 +4,7 @@ import React, { createContext, useContext, useEffect, useState } from 'react';
import { useRouter } from 'next/navigation'; import { useRouter } from 'next/navigation';
import { Session, User } from '@supabase/supabase-js'; import { Session, User } from '@supabase/supabase-js';
import supabase from './supabase'; import supabase from './supabase';
import { limqRequest } from './api';
// 定义用户类型 // 定义用户类型
export type AuthUser = User | null; export type AuthUser = User | null;
@@ -13,21 +14,16 @@ export type AuthContextType = {
user: AuthUser; user: AuthUser;
session: Session | null; session: Session | null;
isLoading: boolean; isLoading: boolean;
signIn: (email: string, password: string) => Promise<{ error?: any }>; signIn: (email: string, password: string) => Promise<{ error?: unknown }>;
signInWithGoogle: () => Promise<{ error?: any }>; signInWithGoogle: () => Promise<{ error?: unknown }>;
signInWithGitHub: () => Promise<{ error?: any }>; signInWithGitHub: () => Promise<{ error?: unknown }>;
signUp: (email: string, password: string) => Promise<void>; signUp: (email: string, password: string) => Promise<void>;
signOut: () => Promise<void>; signOut: () => Promise<void>;
autoRegisterTestUser: () => Promise<void>; // 添加自动注册测试用户函数
}; };
// 创建验证上下文 // 创建验证上下文
const AuthContext = createContext<AuthContextType | undefined>(undefined); const AuthContext = createContext<AuthContextType | undefined>(undefined);
// 测试账户常量 - 使用已验证的账户
const TEST_EMAIL = 'vitalitymailg@gmail.com';
const TEST_PASSWORD = 'password123';
// 验证提供者组件 // 验证提供者组件
export const AuthProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => { export const AuthProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => {
const [user, setUser] = useState<AuthUser>(null); const [user, setUser] = useState<AuthUser>(null);
@@ -90,7 +86,8 @@ export const AuthProvider: React.FC<{ children: React.ReactNode }> = ({ children
setSession(data.session); setSession(data.session);
setUser(data.user); setUser(data.user);
router.push('/dashboard');
router.push('/analytics');
return {}; return {};
} catch (error) { } catch (error) {
console.error('登录过程出错:', error); console.error('登录过程出错:', error);
@@ -201,35 +198,6 @@ export const AuthProvider: React.FC<{ children: React.ReactNode }> = ({ children
} }
}; };
// 自动注册测试用户函数
const autoRegisterTestUser = async () => {
setIsLoading(true);
try {
console.log('正在使用测试账户登录:', TEST_EMAIL);
// 使用测试账户直接登录
const { data, error } = await supabase.auth.signInWithPassword({
email: TEST_EMAIL,
password: TEST_PASSWORD,
});
if (error) {
console.error('测试账户登录失败:', error);
throw error;
}
console.log('测试账户登录成功!');
setSession(data.session);
setUser(data.user);
router.push('/dashboard');
} catch (error) {
console.error('测试账户登录出错:', error);
throw error;
} finally {
setIsLoading(false);
}
};
const contextValue: AuthContextType = { const contextValue: AuthContextType = {
user, user,
session, session,
@@ -239,7 +207,6 @@ export const AuthProvider: React.FC<{ children: React.ReactNode }> = ({ children
signInWithGitHub, signInWithGitHub,
signUp, signUp,
signOut, signOut,
autoRegisterTestUser,
}; };
return ( return (

View File

@@ -185,7 +185,7 @@ export async function main(
max_records = 9999999, max_records = 9999999,
timeout_minutes = 60, timeout_minutes = 60,
skip_clickhouse_check = false, skip_clickhouse_check = false,
force_insert = false, force_insert = true,
database_override = "shorturl_analytics", // 添加数据库名称参数默认为shorturl_analytics database_override = "shorturl_analytics", // 添加数据库名称参数默认为shorturl_analytics
reset_sync_state = false, // 添加参数用于重置同步状态 reset_sync_state = false, // 添加参数用于重置同步状态
debug_utm = false // 添加参数控制UTM调试日志输出 debug_utm = false // 添加参数控制UTM调试日志输出
@@ -391,104 +391,6 @@ export async function main(
} }
}; };
// 检查记录是否已经存在于ClickHouse中
const checkExistingRecords = async (records: TraceRecord[]): Promise<TraceRecord[]> => {
if (records.length === 0) return [];
// 如果跳过ClickHouse检查或强制插入则直接返回所有记录
if (skip_clickhouse_check || force_insert) {
logWithTimestamp(`已跳过ClickHouse重复检查准备处理所有 ${records.length} 条记录`);
return records;
}
logWithTimestamp(`正在检查 ${records.length} 条记录是否已存在于ClickHouse中...`);
try {
// 验证数据库名称
if (!clickhouseConfig.clickhouse_database || clickhouseConfig.clickhouse_database === "undefined") {
throw new Error("数据库名称未定义或无效,请检查配置");
}
// 提取所有记录的ID
const recordIds = records.map(record => record.slugId.toString()); // 使用slugId作为link_id查询
logWithTimestamp(`待检查的记录ID: ${recordIds.join(', ')}`);
// 构建查询SQL检查记录是否已存在确保添加FORMAT JSON来获取正确的JSON格式响应
const query = `
SELECT link_id, visitor_id
FROM ${clickhouseConfig.clickhouse_database}.events
WHERE link_id IN ('${recordIds.join("','")}')
FORMAT JSON
`;
logWithTimestamp(`执行ClickHouse查询: ${query.replace(/\n\s*/g, ' ')}`);
// 发送请求到ClickHouse添加10秒超时
const clickhouseUrl = `${clickhouseConfig.clickhouse_url}`;
const response = await fetch(clickhouseUrl, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": `Basic ${btoa(`${clickhouseConfig.clickhouse_user}:${clickhouseConfig.clickhouse_password}`)}`
},
body: query,
signal: AbortSignal.timeout(10000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`ClickHouse查询错误: ${response.status} ${errorText}`);
}
// 获取响应文本以便记录
const responseText = await response.text();
logWithTimestamp(`ClickHouse查询响应: ${responseText.slice(0, 200)}${responseText.length > 200 ? '...' : ''}`);
if (!responseText.trim()) {
logWithTimestamp("ClickHouse返回空响应假定没有记录存在");
return records; // 如果响应为空,假设没有记录
}
// 解析结果
let result;
try {
result = JSON.parse(responseText);
} catch (err) {
logWithTimestamp(`ClickHouse响应不是有效的JSON: ${responseText}`);
throw new Error(`解析ClickHouse响应失败: ${(err as Error).message}`);
}
// 确保result有正确的结构
if (!result.data) {
logWithTimestamp(`ClickHouse响应缺少data字段: ${JSON.stringify(result)}`);
return records; // 如果没有data字段假设没有记录
}
// 提取已存在的记录ID
const existingIds = new Set(result.data.map((row: { link_id: string }) => row.link_id));
logWithTimestamp(`检测到 ${existingIds.size} 条记录已存在于ClickHouse中`);
if (existingIds.size > 0) {
logWithTimestamp(`已存在的记录ID: ${Array.from(existingIds).join(', ')}`);
}
// 过滤出不存在的记录
const newRecords = records.filter(record => !existingIds.has(record.slugId.toString())); // 使用slugId匹配link_id
logWithTimestamp(`过滤后剩余 ${newRecords.length} 条新记录需要插入`);
return newRecords;
} catch (err) {
const error = err as Error;
logWithTimestamp(`ClickHouse查询出错: ${error.message}`);
if (skip_clickhouse_check) {
logWithTimestamp("已启用跳过ClickHouse检查将继续处理所有记录");
return records;
} else {
throw error; // 如果没有启用跳过检查,则抛出错误
}
}
};
// 在处理记录前先检查ClickHouse连接 // 在处理记录前先检查ClickHouse连接
const clickhouseConnected = await checkClickHouseConnection(); const clickhouseConnected = await checkClickHouseConnection();
if (!clickhouseConnected && !skip_clickhouse_check) { if (!clickhouseConnected && !skip_clickhouse_check) {
@@ -502,27 +404,10 @@ export async function main(
logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`); logWithTimestamp(`开始处理批次数据,共 ${records.length} 条记录...`);
// 检查记录是否已存在 // 强制使用所有记录,不检查重复
let newRecords; const newRecords = records;
try {
newRecords = await checkExistingRecords(records);
} catch (err) {
const error = err as Error;
logWithTimestamp(`检查记录是否存在时出错: ${error.message}`);
if (!skip_clickhouse_check && !force_insert) {
throw error;
}
// 如果跳过检查或强制插入,则使用所有记录
logWithTimestamp("将使用所有记录进行处理");
newRecords = records;
}
if (newRecords.length === 0) { logWithTimestamp(`准备处理 ${newRecords.length} 条记录...`);
logWithTimestamp("所有记录都已存在,跳过处理");
return 0;
}
logWithTimestamp(`准备处理 ${newRecords.length} 条新记录...`);
// 获取链接信息 - 新增代码 // 获取链接信息 - 新增代码
const slugIds = newRecords.map(record => record.slugId); const slugIds = newRecords.map(record => record.slugId);
@@ -533,7 +418,7 @@ export async function main(
// 创建映射用于快速查找 - 新增代码 // 创建映射用于快速查找 - 新增代码
const shortLinksMap = new Map(shortLinks.map((link: ShortRecord) => [link._id.toString(), link])); const shortLinksMap = new Map(shortLinks.map((link: ShortRecord) => [link._id.toString(), link]));
logWithTimestamp(`获取到 ${shortLinks.length} 条短链接信息`); logWithTimestamp(`获取到 ${shortLinks.length} 条短链接信息${newRecords.length - shortLinks.length} 条数据将使用占位符`);
// 准备ClickHouse插入数据 // 准备ClickHouse插入数据
const clickhouseData = newRecords.map(record => { const clickhouseData = newRecords.map(record => {
@@ -570,11 +455,11 @@ export async function main(
event_type: record.type === 1 ? "visit" : "custom", event_type: record.type === 1 ? "visit" : "custom",
event_attributes: JSON.stringify(eventAttributes), event_attributes: JSON.stringify(eventAttributes),
link_id: record.slugId.toString(), link_id: record.slugId.toString(),
link_slug: shortLink?.slug || "", link_slug: shortLink?.slug || "unknown_slug", // 使用占位符
link_label: record.label || "", link_label: record.label || "",
link_title: shortLink?.title || "", link_title: shortLink?.title || "unknown_title", // 使用占位符
link_original_url: shortLink?.origin || "", link_original_url: shortLink?.origin || "https://unknown.url", // 使用占位符
link_attributes: JSON.stringify({ domain: shortLink?.domain || null }), link_attributes: JSON.stringify({ domain: shortLink?.domain || "unknown_domain" }), // 使用占位符
link_created_at: shortLink?.createTime link_created_at: shortLink?.createTime
? new Date(shortLink.createTime).toISOString().replace('T', ' ').replace('Z', '') ? new Date(shortLink.createTime).toISOString().replace('T', ' ').replace('Z', '')
: eventTime.toISOString().replace('T', ' ').replace('Z', ''), : eventTime.toISOString().replace('T', ' ').replace('Z', ''),
@@ -582,34 +467,34 @@ export async function main(
? new Date(shortLink.expiresAt).toISOString().replace('T', ' ').replace('Z', '') ? new Date(shortLink.expiresAt).toISOString().replace('T', ' ').replace('Z', '')
: null, : null,
link_tags: shortLink?.tags ? JSON.stringify(shortLink.tags) : "[]", link_tags: shortLink?.tags ? JSON.stringify(shortLink.tags) : "[]",
user_id: shortLink?.user || "", user_id: shortLink?.user || "unknown_user", // 使用占位符
user_name: "", user_name: "unknown_user", // 使用占位符
user_email: "", user_email: "",
user_attributes: "{}", user_attributes: "{}",
team_id: shortLink?.teamId || "", team_id: shortLink?.teamId || "unknown_team", // 使用占位符
team_name: "", team_name: "unknown_team", // 使用占位符
team_attributes: "{}", team_attributes: "{}",
project_id: shortLink?.projectId || "", project_id: shortLink?.projectId || "unknown_project", // 使用占位符
project_name: "", project_name: "unknown_project", // 使用占位符
project_attributes: "{}", project_attributes: "{}",
qr_code_id: "", qr_code_id: "",
qr_code_name: "", qr_code_name: "",
qr_code_attributes: "{}", qr_code_attributes: "{}",
visitor_id: record._id.toString(), // 使用MongoDB ID作为访客ID visitor_id: record._id.toString(),
session_id: record._id.toString() + "-" + record.createTime, // 创建一个唯一会话ID session_id: record._id.toString() + "-" + record.createTime,
ip_address: record.ip, ip_address: record.ip || "0.0.0.0", // 使用占位符
country: "", // 这些字段在MongoDB中不存在使用默认值 country: "",
city: "", city: "",
device_type: record.platform || "unknown", device_type: record.platform || "unknown",
browser: record.browser || "", browser: record.browser || "unknown", // 使用占位符
os: record.platformOS || "", os: record.platformOS || "unknown", // 使用占位符
user_agent: record.browser + " " + record.browserVersion, user_agent: (record.browser || "unknown") + " " + (record.browserVersion || "unknown"), // 使用占位符
referrer: record.url || "", referrer: record.url || "",
utm_source: utmParams.utm_source, utm_source: utmParams.utm_source || "",
utm_medium: utmParams.utm_medium, utm_medium: utmParams.utm_medium || "",
utm_campaign: utmParams.utm_campaign, utm_campaign: utmParams.utm_campaign || "",
utm_term: utmParams.utm_term, utm_term: utmParams.utm_term || "",
utm_content: utmParams.utm_content, utm_content: utmParams.utm_content || "",
time_spent_sec: 0, time_spent_sec: 0,
is_bounce: true, is_bounce: true,
is_qr_scan: false, is_qr_scan: false,
@@ -693,7 +578,6 @@ export async function main(
let processedRecords = 0; let processedRecords = 0;
let totalBatchRecords = 0; let totalBatchRecords = 0;
let lastSyncTime = 0; let lastSyncTime = 0;
let lastSyncId = "";
for (let page = 0; processedRecords < recordsToProcess; page++) { for (let page = 0; processedRecords < recordsToProcess; page++) {
// 检查超时 // 检查超时
@@ -750,7 +634,6 @@ export async function main(
if (records.length > 0) { if (records.length > 0) {
const lastRecord = records[records.length - 1]; const lastRecord = records[records.length - 1];
lastSyncTime = Math.max(lastSyncTime, lastRecord.createTime); lastSyncTime = Math.max(lastSyncTime, lastRecord.createTime);
lastSyncId = lastRecord._id.toString();
} }
logWithTimestamp(`${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`); logWithTimestamp(`${page+1} 批次处理完成。已处理 ${processedRecords}/${recordsToProcess} 条记录,实际插入 ${totalBatchRecords} 条 (${Math.round(processedRecords/recordsToProcess*100)}%)`);
@@ -758,11 +641,10 @@ export async function main(
// 更新同步状态 // 更新同步状态
if (processedRecords > 0 && lastSyncTime > 0) { if (processedRecords > 0 && lastSyncTime > 0) {
// 创建新的同步状态 // 创建新的同步状态,简化对象结构
const newSyncState: SyncState = { const newSyncState: SyncState = {
last_sync_time: lastSyncTime, last_sync_time: lastSyncTime,
records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + totalBatchRecords, records_synced: (lastSyncState ? lastSyncState.records_synced : 0) + processedRecords, // 使用处理的总记录数,而不是实际插入数
last_sync_id: lastSyncId
}; };
try { try {
@@ -771,7 +653,8 @@ export async function main(
logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`); logWithTimestamp(`同步状态已更新: 最后同步时间 ${new Date(newSyncState.last_sync_time).toISOString()}, 累计同步记录数 ${newSyncState.records_synced}`);
} catch (err) { } catch (err) {
const error = err as Error; const error = err as Error;
logWithTimestamp(`更新同步状态失败: ${error.message}`); logWithTimestamp(`更新同步状态失败: ${error.message},将继续执行`);
// 不抛出错误,继续执行
} }
} }

View File

@@ -4,7 +4,7 @@
// 描述: 此脚本从PostgreSQL数据库获取所有shorturl类型的资源及其关联数据并同步到ClickHouse // 描述: 此脚本从PostgreSQL数据库获取所有shorturl类型的资源及其关联数据并同步到ClickHouse
import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts"; import { Pool } from "https://deno.land/x/postgres@v0.17.0/mod.ts";
import { getResource, getVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts"; import { getResource, getVariable, setVariable } from "https://deno.land/x/windmill@v1.183.0/mod.ts";
// 资源属性接口 // 资源属性接口
interface ResourceAttributes { interface ResourceAttributes {
@@ -37,6 +37,15 @@ interface PgConfig {
[key: string]: unknown; [key: string]: unknown;
} }
// 上次同步状态接口
interface SyncState {
lastSyncTime: string;
lastRunTime: string;
}
// 状态变量名称
const STATE_VARIABLE_PATH = "f/shorturl_analytics/shorturl_sync_state";
// Windmill函数定义 // Windmill函数定义
export async function main( export async function main(
/** PostgreSQL和ClickHouse同步脚本 */ /** PostgreSQL和ClickHouse同步脚本 */
@@ -47,9 +56,11 @@ export async function main(
includeDeleted?: boolean; includeDeleted?: boolean;
/** 是否执行实际写入操作 */ /** 是否执行实际写入操作 */
dryRun?: boolean; dryRun?: boolean;
/** 开始时间ISO格式*/ /** 是否强制全量同步 */
forceFullSync?: boolean;
/** 手动指定开始时间ISO格式- 会覆盖自动增量设置 */
startTime?: string; startTime?: string;
/** 结束时间ISO格式*/ /** 手动指定结束时间ISO格式*/
endTime?: string; endTime?: string;
} }
) { ) {
@@ -57,8 +68,41 @@ export async function main(
const limit = params.limit || 500; const limit = params.limit || 500;
const includeDeleted = params.includeDeleted || false; const includeDeleted = params.includeDeleted || false;
const dryRun = params.dryRun || false; const dryRun = params.dryRun || false;
const startTime = params.startTime ? new Date(params.startTime) : undefined; const forceFullSync = params.forceFullSync || false;
const endTime = params.endTime ? new Date(params.endTime) : undefined;
// 获取当前时间作为本次运行时间
const currentRunTime = new Date().toISOString();
// 初始化同步状态
let syncState: SyncState;
let startTime: Date | undefined;
const endTime: Date | undefined = params.endTime ? new Date(params.endTime) : new Date();
// 如果强制全量同步或手动指定了开始时间,则使用指定的开始时间
if (forceFullSync || params.startTime) {
startTime = params.startTime ? new Date(params.startTime) : undefined;
console.log(`使用${params.startTime ? '手动指定' : '全量同步'} - 开始时间: ${startTime ? startTime.toISOString() : '无限制'}`);
}
// 否则尝试获取上次同步时间作为增量同步的开始时间点
else {
try {
// 获取上次同步状态
const stateStr = await getVariable(STATE_VARIABLE_PATH);
if (stateStr) {
syncState = JSON.parse(stateStr);
console.log(`获取到上次同步状态: 同步时间=${syncState.lastSyncTime}, 运行时间=${syncState.lastRunTime}`);
// 使用上次运行时间作为本次的开始时间 (减去1分钟防止边界问题)
const lastRunTime = new Date(syncState.lastRunTime);
lastRunTime.setMinutes(lastRunTime.getMinutes() - 1);
startTime = lastRunTime;
} else {
console.log("未找到上次同步状态,将执行全量同步");
}
} catch (error: unknown) {
console.log(`获取同步状态出错: ${error instanceof Error ? error.message : String(error)},将执行全量同步`);
}
}
console.log(`开始同步PostgreSQL shorturl数据到ClickHouse`); console.log(`开始同步PostgreSQL shorturl数据到ClickHouse`);
console.log(`参数: limit=${limit}, includeDeleted=${includeDeleted}, dryRun=${dryRun}`); console.log(`参数: limit=${limit}, includeDeleted=${includeDeleted}, dryRun=${dryRun}`);
@@ -67,7 +111,7 @@ export async function main(
// 获取数据库配置 // 获取数据库配置
console.log("获取PostgreSQL数据库配置..."); console.log("获取PostgreSQL数据库配置...");
const pgConfig = await getResource('f/limq/postgresql') as PgConfig; const pgConfig = await getResource('f/limq/production_supabase') as PgConfig;
console.log(`数据库连接配置: host=${pgConfig.host}, port=${pgConfig.port}, database=${pgConfig.dbname || 'postgres'}, user=${pgConfig.user}`); console.log(`数据库连接配置: host=${pgConfig.host}, port=${pgConfig.port}, database=${pgConfig.dbname || 'postgres'}, user=${pgConfig.user}`);
let pgPool: Pool | null = null; let pgPool: Pool | null = null;
@@ -106,6 +150,8 @@ export async function main(
console.log(`获取到 ${shorturls.length} 个shorturl资源`); console.log(`获取到 ${shorturls.length} 个shorturl资源`);
if (shorturls.length === 0) { if (shorturls.length === 0) {
// 即使没有数据也更新状态
await updateSyncState(currentRunTime);
return { synced: 0, message: "没有找到需要同步的shorturl资源" }; return { synced: 0, message: "没有找到需要同步的shorturl资源" };
} }
@@ -120,7 +166,11 @@ export async function main(
// 写入ClickHouse // 写入ClickHouse
const inserted = await insertToClickhouse(clickhouseData); const inserted = await insertToClickhouse(clickhouseData);
console.log(`成功写入 ${inserted} 条记录到ClickHouse`); console.log(`成功写入 ${inserted} 条记录到ClickHouse`);
return { synced: inserted, message: "同步完成" };
// 更新同步状态
await updateSyncState(currentRunTime);
return { synced: inserted, message: "同步完成", lastSyncTime: currentRunTime };
} else { } else {
console.log("Dry run模式 - 不执行实际写入"); console.log("Dry run模式 - 不执行实际写入");
console.log(`将写入 ${clickhouseData.length} 条记录到ClickHouse`); console.log(`将写入 ${clickhouseData.length} 条记录到ClickHouse`);
@@ -146,6 +196,22 @@ export async function main(
} }
} }
// 更新同步状态
async function updateSyncState(currentRunTime: string): Promise<void> {
try {
const syncState: SyncState = {
lastSyncTime: new Date().toISOString(), // 记录数据同步完成的时间
lastRunTime: currentRunTime // 记录本次运行的时间点
};
console.log(`更新同步状态: ${JSON.stringify(syncState)}`);
await setVariable(STATE_VARIABLE_PATH, JSON.stringify(syncState));
} catch (error: unknown) {
console.error(`更新同步状态失败: ${error instanceof Error ? error.message : String(error)}`);
// 不中断主流程,即使状态更新失败
}
}
// 从PostgreSQL获取所有shorturl资源 // 从PostgreSQL获取所有shorturl资源
async function fetchShorturlResources( async function fetchShorturlResources(
pgPool: Pool, pgPool: Pool,
@@ -185,8 +251,9 @@ async function fetchShorturlResources(
query += ` AND r.deleted_at IS NULL`; query += ` AND r.deleted_at IS NULL`;
} }
// 修改为同时考虑created_at和updated_at确保捕获自上次同步以来创建或更新的记录
if (options.startTime) { if (options.startTime) {
query += ` AND r.created_at >= $${paramCount}`; query += ` AND (r.created_at >= $${paramCount} OR r.updated_at >= $${paramCount})`;
params.push(options.startTime); params.push(options.startTime);
paramCount++; paramCount++;
} }
@@ -197,7 +264,8 @@ async function fetchShorturlResources(
paramCount++; paramCount++;
} }
query += ` ORDER BY r.created_at DESC LIMIT $${paramCount}`; // 优先按更新时间排序,确保最近更新的记录先处理
query += ` ORDER BY r.updated_at DESC, r.created_at DESC LIMIT $${paramCount}`;
params.push(options.limit); params.push(options.limit);
const client = await pgPool.connect(); const client = await pgPool.connect();