cleaning up db

This commit is contained in:
2025-03-12 14:08:38 +08:00
parent ae7ea3b567
commit 285a0c780a
11 changed files with 1011 additions and 70 deletions

View File

@@ -0,0 +1,212 @@
// 检查ClickHouse数据库结构的脚本
const { createClient } = require('@clickhouse/client');
const dotenv = require('dotenv');
const path = require('path');
const fs = require('fs');
// 加载环境变量
dotenv.config({ path: path.resolve(__dirname, '../../.env') });
// 定义输出目录
const DB_REPORTS_DIR = path.resolve(__dirname, '../db-reports');
// 获取ClickHouse配置
const clickhouseHost = process.env.CLICKHOUSE_HOST || 'localhost';
const clickhousePort = process.env.CLICKHOUSE_PORT || '8123';
const clickhouseUser = process.env.CLICKHOUSE_USER || 'default';
const clickhousePassword = process.env.CLICKHOUSE_PASSWORD || '';
const clickhouseDatabase = process.env.CLICKHOUSE_DATABASE || 'default';
console.log('ClickHouse配置:');
console.log(` - 主机: ${clickhouseHost}`);
console.log(` - 端口: ${clickhousePort}`);
console.log(` - 用户: ${clickhouseUser}`);
console.log(` - 数据库: ${clickhouseDatabase}`);
// 创建ClickHouse客户端 - 使用0.2.10版本的API
const client = createClient({
url: `http://${clickhouseHost}:${clickhousePort}`,
username: clickhouseUser,
password: clickhousePassword,
database: clickhouseDatabase
});
// 获取所有表
async function getAllTables() {
console.log('\n获取所有表...');
try {
const query = `
SELECT name
FROM system.tables
WHERE database = '${clickhouseDatabase}'
`;
const resultSet = await client.query({
query,
format: 'JSONEachRow'
});
const tables = await resultSet.json();
if (!tables || tables.length === 0) {
console.log(`数据库 ${clickhouseDatabase} 中没有找到任何表`);
return null;
}
console.log(`数据库 ${clickhouseDatabase} 中找到以下表:`);
tables.forEach(table => {
console.log(` - ${table.name}`);
});
return tables.map(table => table.name);
} catch (error) {
console.error('获取所有表时出错:', error);
return null;
}
}
// 获取表结构
async function getTableSchema(tableName) {
console.log(`\n获取表 ${tableName} 的结构...`);
try {
const query = `
DESCRIBE TABLE ${clickhouseDatabase}.${tableName}
`;
const resultSet = await client.query({
query,
format: 'JSONEachRow'
});
const columns = await resultSet.json();
if (!columns || columns.length === 0) {
console.log(`${tableName} 不存在或没有列`);
return null;
}
console.log(`${tableName} 的列:`);
columns.forEach(column => {
console.log(` - ${column.name} (${column.type}, ${column.default_type === '' ? '无默认值' : `默认值: ${column.default_expression}`})`);
});
return columns;
} catch (error) {
console.error(`获取表 ${tableName} 结构时出错:`, error);
return null;
}
}
// 获取表数据示例
async function getTableDataSample(tableName, limit = 5) {
console.log(`\n获取表 ${tableName} 的数据示例 (最多 ${limit} 行)...`);
try {
const query = `
SELECT *
FROM ${clickhouseDatabase}.${tableName}
LIMIT ${limit}
`;
const resultSet = await client.query({
query,
format: 'JSONEachRow'
});
const rows = await resultSet.json();
if (!rows || rows.length === 0) {
console.log(`${tableName} 中没有数据`);
return null;
}
console.log(`${tableName} 的数据示例:`);
rows.forEach((row, index) => {
console.log(`${index + 1}:`);
Object.entries(row).forEach(([key, value]) => {
console.log(` ${key}: ${value}`);
});
});
return rows;
} catch (error) {
console.error(`获取表 ${tableName} 数据示例时出错:`, error);
return null;
}
}
// 主函数
async function main() {
let outputBuffer = '';
const originalConsoleLog = console.log;
// 重定向console.log到buffer和控制台
console.log = function() {
// 调用原始的console.log
originalConsoleLog.apply(console, arguments);
// 写入到buffer
outputBuffer += Array.from(arguments).join(' ') + '\n';
};
try {
// 获取所有表
const tables = await getAllTables();
if (!tables) {
console.error('无法获取表列表');
process.exit(1);
}
console.log('\n所有ClickHouse表:');
console.log(tables.join(', '));
// 获取每个表的结构,但不获取数据示例
for (const tableName of tables) {
await getTableSchema(tableName);
// 移除数据示例检查
// await getTableDataSample(tableName);
}
console.log('\nClickHouse数据库结构检查完成');
// 保存输出到指定目录
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
// 确保目录存在
if (!fs.existsSync(DB_REPORTS_DIR)) {
fs.mkdirSync(DB_REPORTS_DIR, { recursive: true });
}
const outputPath = path.join(DB_REPORTS_DIR, `clickhouse-schema-${timestamp}.log`);
fs.writeFileSync(outputPath, outputBuffer);
originalConsoleLog(`结果已保存到: ${outputPath}`);
} catch (error) {
console.error('检查ClickHouse数据库结构时出错:', error);
} finally {
// 恢复原始的console.log
console.log = originalConsoleLog;
// 关闭客户端连接
await client.close();
}
}
// 导出函数
module.exports = {
getAllTables,
getTableSchema,
getTableDataSample,
main
};
// 如果直接运行此脚本则执行main函数
if (require.main === module) {
main().catch(error => {
console.error('运行脚本时出错:', error);
process.exit(1);
});
}

View File

@@ -0,0 +1,329 @@
// 检查数据库结构的脚本
const { Client } = require('pg');
const dotenv = require('dotenv');
const path = require('path');
const fs = require('fs');
// 加载环境变量
dotenv.config({ path: path.resolve(__dirname, '../../.env') });
// 获取数据库连接字符串
const databaseUrl = process.env.DATABASE_URL;
if (!databaseUrl) {
console.error('缺少数据库连接字符串。请确保.env文件中包含DATABASE_URL');
process.exit(1);
}
// 定义输出目录
const DB_REPORTS_DIR = path.resolve(__dirname, '../db-reports');
// 连接数据库
async function connect() {
console.log('使用PostgreSQL连接字符串连接数据库...');
// 创建PostgreSQL客户端
const client = new Client({
connectionString: databaseUrl,
});
try {
await client.connect();
console.log('成功连接到数据库');
return client;
} catch (error) {
console.error('连接数据库失败:', error);
throw error;
}
}
// 断开数据库连接
async function disconnect(client) {
try {
await client.end();
console.log('已断开数据库连接');
} catch (error) {
console.error('断开数据库连接失败:', error);
}
}
// 获取所有表
async function getAllTables(client) {
console.log('\n获取所有表...');
try {
const query = `
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
ORDER BY table_name;
`;
const result = await client.query(query);
if (!result.rows || result.rows.length === 0) {
console.log('没有找到任何表');
return null;
}
console.log('找到以下表:');
result.rows.forEach(row => {
console.log(` - ${row.table_name}`);
});
return result.rows.map(row => row.table_name);
} catch (error) {
console.error('获取所有表时出错:', error);
return null;
}
}
// 获取表结构
async function getTableSchema(client, tableName) {
console.log(`\n获取表 ${tableName} 的结构...`);
try {
// 获取基本列信息
const columnsQuery = `
SELECT
column_name,
data_type,
is_nullable,
column_default,
character_maximum_length,
numeric_precision,
numeric_scale
FROM
information_schema.columns
WHERE
table_schema = 'public' AND
table_name = $1
ORDER BY
ordinal_position;
`;
const columnsResult = await client.query(columnsQuery, [tableName]);
if (!columnsResult.rows || columnsResult.rows.length === 0) {
console.log(`${tableName} 不存在或没有列`);
return null;
}
// 获取主键信息
const primaryKeyQuery = `
SELECT
kcu.column_name
FROM
information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
WHERE
tc.constraint_type = 'PRIMARY KEY' AND
tc.table_schema = 'public' AND
tc.table_name = $1
ORDER BY
kcu.ordinal_position;
`;
const primaryKeyResult = await client.query(primaryKeyQuery, [tableName]);
// 获取外键信息
const foreignKeysQuery = `
SELECT
kcu.column_name,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM
information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage ccu
ON tc.constraint_name = ccu.constraint_name
AND tc.table_schema = ccu.table_schema
WHERE
tc.constraint_type = 'FOREIGN KEY' AND
tc.table_schema = 'public' AND
tc.table_name = $1;
`;
const foreignKeysResult = await client.query(foreignKeysQuery, [tableName]);
// 获取索引信息
const indexesQuery = `
SELECT
indexname,
indexdef
FROM
pg_indexes
WHERE
schemaname = 'public' AND
tablename = $1;
`;
const indexesResult = await client.query(indexesQuery, [tableName]);
// 输出列信息
console.log(`${tableName} 的列:`);
columnsResult.rows.forEach(column => {
console.log(` - ${column.column_name} (${column.data_type}${
column.character_maximum_length ? `(${column.character_maximum_length})` :
(column.numeric_precision ? `(${column.numeric_precision},${column.numeric_scale})` : '')
}, ${column.is_nullable === 'YES' ? '可为空' : '不可为空'}, 默认值: ${column.column_default || 'NULL'})`);
});
// 输出主键信息
if (primaryKeyResult.rows.length > 0) {
console.log(` 主键: ${primaryKeyResult.rows.map(row => row.column_name).join(', ')}`);
} else {
console.log(' 主键: 无');
}
// 输出外键信息
if (foreignKeysResult.rows.length > 0) {
console.log(' 外键:');
foreignKeysResult.rows.forEach(fk => {
console.log(` - ${fk.column_name} -> ${fk.foreign_table_name}.${fk.foreign_column_name}`);
});
} else {
console.log(' 外键: 无');
}
// 输出索引信息
if (indexesResult.rows.length > 0) {
console.log(' 索引:');
indexesResult.rows.forEach(idx => {
console.log(` - ${idx.indexname}: ${idx.indexdef}`);
});
} else {
console.log(' 索引: 无');
}
return {
columns: columnsResult.rows,
primaryKey: primaryKeyResult.rows,
foreignKeys: foreignKeysResult.rows,
indexes: indexesResult.rows
};
} catch (error) {
console.error(`获取表 ${tableName} 结构时出错:`, error);
return null;
}
}
// 获取表数据示例
async function getTableDataSample(client, tableName, limit = 5) {
console.log(`\n获取表 ${tableName} 的数据示例 (最多 ${limit} 行)...`);
try {
const query = `
SELECT *
FROM "${tableName}"
LIMIT $1;
`;
const result = await client.query(query, [limit]);
if (!result.rows || result.rows.length === 0) {
console.log(`${tableName} 中没有数据`);
return null;
}
console.log(`${tableName} 的数据示例:`);
result.rows.forEach((row, index) => {
console.log(`${index + 1}:`);
Object.entries(row).forEach(([key, value]) => {
console.log(` ${key}: ${value}`);
});
});
return result.rows;
} catch (error) {
console.error(`获取表 ${tableName} 数据示例时出错:`, error);
return null;
}
}
// 主函数
async function main() {
let client = null;
let outputBuffer = '';
const originalConsoleLog = console.log;
// 重定向console.log到buffer和控制台
console.log = function() {
// 调用原始的console.log
originalConsoleLog.apply(console, arguments);
// 写入到buffer
outputBuffer += Array.from(arguments).join(' ') + '\n';
};
try {
// 连接数据库
client = await connect();
// 获取所有表
const tables = await getAllTables(client);
if (!tables) {
console.error('无法获取表列表');
process.exit(1);
}
console.log('\n所有PostgreSQL表:');
console.log(tables.join(', '));
// 获取所有表的结构,而不只是特定表
for (const tableName of tables) {
await getTableSchema(client, tableName);
// 移除数据示例检查
// await getTableDataSample(client, tableName);
}
console.log('\n数据库结构检查完成');
// 保存输出到指定目录
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
// 确保目录存在
if (!fs.existsSync(DB_REPORTS_DIR)) {
fs.mkdirSync(DB_REPORTS_DIR, { recursive: true });
}
const outputPath = path.join(DB_REPORTS_DIR, `postgres-schema-${timestamp}.log`);
fs.writeFileSync(outputPath, outputBuffer);
originalConsoleLog(`结果已保存到: ${outputPath}`);
} catch (error) {
console.error('检查数据库结构时出错:', error);
process.exit(1);
} finally {
// 恢复原始的console.log
console.log = originalConsoleLog;
// 关闭数据库连接
if (client) {
await disconnect(client);
}
}
}
// 导出函数
module.exports = {
connect,
disconnect,
getAllTables,
getTableSchema,
getTableDataSample,
main
};
// 如果直接运行此脚本则执行main函数
if (require.main === module) {
main().catch(error => {
console.error('运行脚本时出错:', error);
process.exit(1);
});
}

View File

@@ -0,0 +1,102 @@
// 一键运行所有数据库检查脚本
const { exec } = require('child_process');
const path = require('path');
const fs = require('fs');
// 定义脚本路径
const postgresScriptPath = path.join(__dirname, 'postgres-schema.js');
const clickhouseScriptPath = path.join(__dirname, 'clickhouse-schema.js');
// 定义输出目录
const DB_REPORTS_DIR = path.resolve(__dirname, '../db-reports');
// 确保目录存在
if (!fs.existsSync(DB_REPORTS_DIR)) {
fs.mkdirSync(DB_REPORTS_DIR, { recursive: true });
console.log(`创建输出目录: ${DB_REPORTS_DIR}`);
}
// 定义日期时间格式化函数,用于生成日志文件名
function getTimestampString() {
return new Date().toISOString().replace(/[:.]/g, '-');
}
// 运行PostgreSQL脚本
async function runPostgresScript() {
return new Promise((resolve, reject) => {
console.log('\n=======================================');
console.log('正在运行PostgreSQL数据库结构检查脚本...');
console.log('=======================================\n');
const process = exec(`node --no-inspect ${postgresScriptPath}`, (error, stdout, stderr) => {
if (error) {
console.error(`PostgreSQL脚本运行出错: ${error.message}`);
reject(error);
return;
}
if (stderr) {
console.error(`PostgreSQL脚本错误: ${stderr}`);
}
console.log(stdout);
resolve();
});
});
}
// 运行ClickHouse脚本
async function runClickHouseScript() {
return new Promise((resolve, reject) => {
console.log('\n=======================================');
console.log('正在运行ClickHouse数据库结构检查脚本...');
console.log('=======================================\n');
const process = exec(`node --no-inspect ${clickhouseScriptPath}`, (error, stdout, stderr) => {
if (error) {
console.error(`ClickHouse脚本运行出错: ${error.message}`);
reject(error);
return;
}
if (stderr) {
console.error(`ClickHouse脚本错误: ${stderr}`);
}
console.log(stdout);
resolve();
});
});
}
// 主函数
async function main() {
try {
console.log('开始运行所有数据库结构检查脚本...');
console.log(`输出目录: ${DB_REPORTS_DIR}`);
console.log(`时间戳: ${getTimestampString()}`);
// 运行PostgreSQL脚本
await runPostgresScript();
// 运行ClickHouse脚本
await runClickHouseScript();
console.log('\n=======================================');
console.log('所有数据库结构检查脚本已完成!');
console.log('报告已保存到以下目录:');
console.log(DB_REPORTS_DIR);
console.log('=======================================');
} catch (error) {
console.error('运行脚本时出错:', error);
process.exit(1);
}
}
// 执行主函数
if (require.main === module) {
main().catch(error => {
console.error('运行脚本时出错:', error);
process.exit(1);
});
}

View File

@@ -0,0 +1,98 @@
#!/bin/bash
# 文件名: ch-query.sh
# 用途: 执行ClickHouse SQL查询的便捷脚本
# 连接参数
CH_HOST="localhost"
CH_PORT="9000"
CH_USER="admin"
CH_PASSWORD="your_secure_password"
CH_DATABASE="promote"
# 基本查询函数
function ch_query() {
clickhouse client --host $CH_HOST --port $CH_PORT --user $CH_USER --password $CH_PASSWORD --database $CH_DATABASE -q "$1"
}
# 显示帮助信息
function show_help() {
echo "ClickHouse 查询工具"
echo "用法: $0 [选项] [SQL查询]"
echo ""
echo "选项:"
echo " -t 显示所有表"
echo " -d <表名> 显示表结构"
echo " -s <表名> 显示表样本数据(前10行)"
echo " -c <表名> 计算表中的记录数"
echo " -h, --help 显示此帮助信息"
echo " -q \"SQL查询\" 执行自定义SQL查询"
echo " -f <文件名> 执行SQL文件"
echo ""
echo "示例:"
echo " $0 -t # 显示所有表"
echo " $0 -d events # 显示events表结构"
echo " $0 -q \"SELECT * FROM events LIMIT 5\" # 执行自定义查询"
}
# 没有参数时显示帮助
if [ $# -eq 0 ]; then
show_help
exit 0
fi
# 处理命令行参数
case "$1" in
-t)
ch_query "SHOW TABLES"
;;
-d)
if [ -z "$2" ]; then
echo "错误: 需要提供表名"
exit 1
fi
ch_query "DESCRIBE TABLE $2"
;;
-s)
if [ -z "$2" ]; then
echo "错误: 需要提供表名"
exit 1
fi
ch_query "SELECT * FROM $2 LIMIT 10"
;;
-c)
if [ -z "$2" ]; then
echo "错误: 需要提供表名"
exit 1
fi
ch_query "SELECT COUNT(*) FROM $2"
;;
-q)
if [ -z "$2" ]; then
echo "错误: 需要提供SQL查询"
exit 1
fi
ch_query "$2"
;;
-f)
if [ -z "$2" ]; then
echo "错误: 需要提供SQL文件"
exit 1
fi
if [ ! -f "$2" ]; then
echo "错误: 文件 '$2' 不存在"
exit 1
fi
SQL=$(cat "$2")
ch_query "$SQL"
;;
-h|--help)
show_help
;;
*)
echo "未知选项: $1"
show_help
exit 1
;;
esac
exit 0

View File

@@ -0,0 +1,4 @@
```bash
alias clickhouse-sql='clickhouse client --host localhost --port 9000 --user admin --password your_secure_password --database promote -q'
clickhouse-sql "SHOW TABLES"
```

View File

@@ -0,0 +1,251 @@
-- 删除旧表
DROP TABLE IF EXISTS events;
DROP TABLE IF EXISTS mv_kol_performance;
DROP TABLE IF EXISTS mv_platform_distribution;
DROP TABLE IF EXISTS mv_sentiment_analysis;
DROP TABLE IF EXISTS mv_interaction_time;
DROP TABLE IF EXISTS mv_conversion_funnel;
-- 创建新的events表
CREATE TABLE events (
-- 基本信息
event_id UUID DEFAULT generateUUIDv4(),
timestamp DateTime DEFAULT now(),
date Date DEFAULT toDate(now()),
hour UInt8 DEFAULT toHour(now()),
-- 实体关联
user_id String,
influencer_id String,
content_id String,
project_id String,
-- 事件信息
event_type Enum8(
'view' = 1,
-- 浏览
'like' = 2,
-- 点赞
'unlike' = 3,
-- 取消点赞
'follow' = 4,
-- 关注
'unfollow' = 5,
-- 取消关注
'comment' = 6,
-- 评论
'share' = 7,
-- 分享
'click' = 8,
-- 点击链接
'impression' = 9,
-- 曝光
'purchase' = 10,
-- 购买
'signup' = 11 -- 注册
),
-- 转化漏斗
funnel_stage Enum8(
'exposure' = 1,
-- 曝光
'interest' = 2,
-- 兴趣
'consideration' = 3,
-- 考虑
'intent' = 4,
-- 意向
'evaluation' = 5,
-- 评估
'purchase' = 6 -- 购买
),
-- 内容信息
platform String,
-- 社交平台
content_type Enum8(
'video' = 1,
'image' = 2,
'text' = 3,
'story' = 4,
'reel' = 5,
'live' = 6
),
content_status Enum8(
-- 审核状态
'approved' = 1,
'pending' = 2,
'rejected' = 3
),
-- 互动分析
sentiment Enum8(
-- 情感分析
'positive' = 1,
'neutral' = 2,
'negative' = 3
),
comment_text String,
-- 评论文本
keywords Array(String),
-- 关键词
-- 数值指标
interaction_value Float64,
-- 互动价值
followers_count UInt32,
-- 粉丝数
followers_change Int32,
-- 粉丝变化量
likes_count UInt32,
-- 点赞数
likes_change Int32,
-- 点赞变化量
views_count UInt32,
-- 观看数
-- 设备信息
ip String,
user_agent String,
device_type String,
referrer String,
-- 地理信息
geo_country String,
geo_city String,
-- 会话信息
session_id String
) ENGINE = MergeTree() PARTITION BY toYYYYMM(timestamp)
ORDER BY
(event_type, influencer_id, date, hour) SETTINGS index_granularity = 8192;
-- 创建物化视图KOL表现概览
CREATE MATERIALIZED VIEW mv_kol_performance ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(influencer_id, date) AS
SELECT
influencer_id,
date,
sum(if(event_type = 'follow', 1, 0)) - sum(if(event_type = 'unfollow', 1, 0)) AS new_followers,
sum(if(event_type = 'like', 1, 0)) - sum(if(event_type = 'unlike', 1, 0)) AS new_likes,
sum(if(event_type = 'view', 1, 0)) AS views,
sum(if(event_type = 'comment', 1, 0)) AS comments,
sum(if(event_type = 'share', 1, 0)) AS shares
FROM
events
GROUP BY
influencer_id,
date;
-- 创建物化视图:平台分布
CREATE MATERIALIZED VIEW mv_platform_distribution ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(platform, date) AS
SELECT
platform,
date,
count() AS events_count,
uniqExact(user_id) AS unique_users,
uniqExact(content_id) AS unique_contents
FROM
events
GROUP BY
platform,
date;
-- 创建物化视图:情感分析
CREATE MATERIALIZED VIEW mv_sentiment_analysis ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(sentiment, date) AS
SELECT
sentiment,
date,
count() AS count
FROM
events
WHERE
sentiment IS NOT NULL
AND event_type = 'comment'
GROUP BY
sentiment,
date;
-- 创建物化视图:用户互动时间
CREATE MATERIALIZED VIEW mv_interaction_time ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(date, hour) AS
SELECT
date,
hour,
count() AS events_count,
uniqExact(user_id) AS unique_users
FROM
events
GROUP BY
date,
hour;
-- 创建物化视图:内容审核状态
CREATE MATERIALIZED VIEW mv_content_status ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(content_status, date) AS
SELECT
content_status,
date,
count() AS count
FROM
events
WHERE
content_status IS NOT NULL
GROUP BY
content_status,
date;
-- 创建物化视图:转化漏斗
CREATE MATERIALIZED VIEW mv_conversion_funnel ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(funnel_stage, date) AS
SELECT
funnel_stage,
date,
count() AS stage_count,
uniqExact(user_id) AS unique_users
FROM
events
WHERE
funnel_stage IS NOT NULL
GROUP BY
funnel_stage,
date;
-- 创建物化视图:热门内容
CREATE MATERIALIZED VIEW mv_popular_content ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(content_id, date) AS
SELECT
content_id,
influencer_id,
date,
sum(if(event_type = 'view', 1, 0)) AS views,
sum(if(event_type = 'like', 1, 0)) AS likes,
sum(if(event_type = 'comment', 1, 0)) AS comments,
sum(if(event_type = 'share', 1, 0)) AS shares
FROM
events
GROUP BY
content_id,
influencer_id,
date;
-- 创建物化视图:关键词分析
CREATE MATERIALIZED VIEW mv_keywords ENGINE = SummingMergeTree() PARTITION BY toYYYYMM(date)
ORDER BY
(keyword, date) AS
SELECT
arrayJoin(keywords) AS keyword,
date,
count() AS frequency
FROM
events
WHERE
length(keywords) > 0
GROUP BY
keyword,
date;

View File

@@ -0,0 +1,331 @@
#!/usr/bin/env node
// # 显示所有表
// ./pg-query.js -t
// # 显示表结构
// ./pg-query.js -d influencers
// # 显示样本数据限制5行
// ./pg-query.js -s posts -l 5
// # 查看表记录数
// ./pg-query.js -c posts
// # 显示索引
// ./pg-query.js -i posts
// # 显示外键
// ./pg-query.js -f posts
// # 显示引用
// ./pg-query.js -r influencers
// # 执行自定义查询
// ./pg-query.js -q "SELECT * FROM influencers WHERE platform = 'Instagram' LIMIT 5"
// # 执行SQL文件
// ./pg-query.js -e schema.sql
const { Client } = require('pg');
const path = require('path');
const fs = require('fs');
const yargs = require('yargs/yargs');
const { hideBin } = require('yargs/helpers');
// 加载.env文件 - 使用正确的相对路径
require('dotenv').config({ path: path.resolve(__dirname, '../../../.env') });
// 显示连接信息(不含密码)以便调试
function getConnectionString() {
// 使用.env中的DATABASE_URL
const databaseUrl = process.env.DATABASE_URL;
if (!databaseUrl) {
console.error('错误: 未找到DATABASE_URL环境变量');
process.exit(1);
}
// 显示连接信息但隐藏密码
const sanitizedUrl = databaseUrl.replace(/:[^:@]+@/, ':***@');
console.log(`使用连接: ${sanitizedUrl}`);
return databaseUrl;
}
// 创建一个新的客户端
async function runQuery(query, params = []) {
const client = new Client({
connectionString: getConnectionString()
});
try {
await client.connect();
console.log('数据库连接成功');
const result = await client.query(query, params);
return result.rows;
} catch (err) {
console.error('查询执行错误:', err.message);
return null;
} finally {
await client.end();
}
}
// 显示所有表
async function showTables() {
const query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name;";
const tables = await runQuery(query);
if (tables && tables.length > 0) {
console.log('数据库中的表:');
console.table(tables);
} else {
console.log('没有找到表或连接失败');
}
}
// 显示表结构
async function showTableStructure(tableName) {
const query = `
SELECT
column_name AS "列名",
data_type AS "数据类型",
CASE WHEN is_nullable = 'YES' THEN '允许为空' ELSE '不允许为空' END AS "是否可空",
column_default AS "默认值",
character_maximum_length AS "最大长度"
FROM
information_schema.columns
WHERE
table_schema = 'public' AND
table_name = $1
ORDER BY
ordinal_position;
`;
const columns = await runQuery(query, [tableName]);
if (columns && columns.length > 0) {
console.log(`${tableName} 的结构:`);
console.table(columns);
} else {
console.log(`${tableName} 不存在或连接失败`);
}
}
// 显示样本数据
async function showSampleData(tableName, limit = 10) {
const query = `SELECT * FROM "${tableName}" LIMIT ${limit};`;
const data = await runQuery(query);
if (data && data.length > 0) {
console.log(`${tableName} 的样本数据 (${limit} 行):`);
console.table(data);
} else {
console.log(`${tableName} 为空或不存在`);
}
}
// 显示记录计数
async function showRecordCount(tableName) {
const query = `SELECT COUNT(*) AS "记录数" FROM "${tableName}";`;
const count = await runQuery(query);
if (count) {
console.log(`${tableName} 的记录数:`);
console.table(count);
} else {
console.log(`${tableName} 不存在或连接失败`);
}
}
// 显示索引信息
async function showIndexes(tableName) {
const query = `
SELECT
indexname AS "索引名称",
indexdef AS "索引定义"
FROM
pg_indexes
WHERE
tablename = $1
ORDER BY
indexname;
`;
const indexes = await runQuery(query, [tableName]);
if (indexes && indexes.length > 0) {
console.log(`${tableName} 的索引:`);
console.table(indexes);
} else {
console.log(`${tableName} 没有索引或不存在`);
}
}
// 显示外键
async function showForeignKeys(tableName) {
const query = `
SELECT
conname AS "外键名称",
pg_get_constraintdef(oid) AS "外键定义"
FROM
pg_constraint
WHERE
conrelid = $1::regclass AND contype = 'f';
`;
const foreignKeys = await runQuery(query, [tableName]);
if (foreignKeys && foreignKeys.length > 0) {
console.log(`${tableName} 的外键:`);
console.table(foreignKeys);
} else {
console.log(`${tableName} 没有外键或不存在`);
}
}
// 显示引用当前表的外键
async function showReferencingKeys(tableName) {
const query = `
SELECT
c.conname AS "外键名称",
t.relname AS "引用表",
pg_get_constraintdef(c.oid) AS "外键定义"
FROM
pg_constraint c
JOIN
pg_class t ON c.conrelid = t.oid
WHERE
c.confrelid = $1::regclass AND c.contype = 'f';
`;
const referencingKeys = await runQuery(query, [tableName]);
if (referencingKeys && referencingKeys.length > 0) {
console.log(`引用表 ${tableName} 的外键关系:`);
console.table(referencingKeys);
} else {
console.log(`没有找到引用表 ${tableName} 的外键关系`);
}
}
// 执行自定义查询
async function executeQuery(query) {
const result = await runQuery(query);
if (result) {
console.log('查询结果:');
console.table(result);
} else {
console.log('查询执行失败或无结果');
}
}
// 执行SQL文件
async function executeSqlFile(filename) {
try {
const sql = fs.readFileSync(filename, 'utf8');
console.log(`执行SQL文件: ${filename}`);
await executeQuery(sql);
} catch (err) {
console.error(`执行SQL文件失败: ${err.message}`);
}
}
// 主函数
async function main() {
try {
const argv = yargs(hideBin(process.argv))
.usage('PostgreSQL 查询工具\n\n用法: $0 [选项]')
.option('t', {
alias: 'tables',
describe: '显示所有表',
type: 'boolean'
})
.option('d', {
alias: 'describe',
describe: '显示表结构',
type: 'string'
})
.option('s', {
alias: 'sample',
describe: '显示表样本数据',
type: 'string'
})
.option('l', {
alias: 'limit',
describe: '样本数据行数限制',
type: 'number',
default: 10
})
.option('c', {
alias: 'count',
describe: '计算表中的记录数',
type: 'string'
})
.option('i', {
alias: 'indexes',
describe: '显示表索引',
type: 'string'
})
.option('f', {
alias: 'foreign-keys',
describe: '显示表外键关系',
type: 'string'
})
.option('r', {
alias: 'references',
describe: '显示引用此表的外键',
type: 'string'
})
.option('q', {
alias: 'query',
describe: '执行自定义SQL查询',
type: 'string'
})
.option('e', {
alias: 'execute-file',
describe: '执行SQL文件',
type: 'string'
})
.example('$0 -t', '显示所有表')
.example('$0 -d influencers', '显示influencers表结构')
.example('$0 -s posts -l 5', '显示posts表前5行数据')
.epilog('更多信息请访问项目文档')
.help()
.alias('h', 'help')
.argv;
if (argv.tables) {
await showTables();
} else if (argv.describe) {
await showTableStructure(argv.describe);
} else if (argv.sample) {
await showSampleData(argv.sample, argv.limit);
} else if (argv.count) {
await showRecordCount(argv.count);
} else if (argv.indexes) {
await showIndexes(argv.indexes);
} else if (argv.foreignKeys) {
await showForeignKeys(argv.foreignKeys);
} else if (argv.references) {
await showReferencingKeys(argv.references);
} else if (argv.query) {
await executeQuery(argv.query);
} else if (argv.executeFile) {
await executeSqlFile(argv.executeFile);
} else {
yargs(hideBin(process.argv)).showHelp();
}
} catch (err) {
console.error('程序执行错误:', err.message);
process.exit(1);
}
}
// 执行主函数
main().catch(err => {
console.error('程序执行错误:', err);
process.exit(1);
});