Clickhouse Io
by affaan-m
ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.
name: clickhouse-io
description: ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.
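ClickHouse Analytics Patterns
ClickHouse-specific patterns for high-performance analytics and data engineering.
Overview
ClickHouse is a column-oriented database management system (DBMS) for online analytical processing (OLAP). It is optimized for fast analytical queries over large datasets.
Key features:
- Columnar storage
- Data compression
- Parallel query execution
- Distributed queries
- Real-time analytics
Table Design Patterns
MergeTree Engine (Most Common)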
CREATE TABLE markets_analytics (
date Date,
market_id String,
market_name String,
volume UInt64,
trades UInt32,
unique_traders UInt32,
avg_trade_size Float64,
created_at DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, market_id)
SETTINGS index_granularity = 8192;
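ReplacingMergeTree (Deduplication)
-- For data that may contain duplicates (for example, from multiple sources)
-- Rows with the same sorting key are collapsed during background merges, not at insert time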
CREATE TABLE user_events (
event_id String,
user_id String,
event_type String,
timestamp DateTime,
properties String
) ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, event_id, timestamp)
PRIMARY KEY (user_id, event_id);
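To make the deduplication rule concrete, here is a minimal TypeScript sketch of what a merge does for this table when no version column is given: among rows that share the same sorting key, the last inserted row is kept. The `UserEvent` type and the `replaceByKey` function are hypothetical names used only for illustration. Real merges run in the background at times ClickHouse chooses, so duplicates can stay visible until a merge happens.

```typescript
// Hypothetical row shape mirroring the user_events table above.
interface UserEvent {
  event_id: string
  user_id: string
  event_type: string
  timestamp: string // 'YYYY-MM-DD hh:mm:ss'
  properties: string
}

// Simulates one merge: rows with an identical sorting key collapse
// to the row that was inserted last.
function replaceByKey(rows: UserEvent[]): UserEvent[] {
  const latest = new Map<string, UserEvent>()
  for (const row of rows) {
    // Sorting key of the table: (user_id, event_id, timestamp)
    const key = JSON.stringify([row.user_id, row.event_id, row.timestamp])
    latest.set(key, row) // a later insert overwrites an earlier one
  }
  return Array.from(latest.values())
}
```

Because `timestamp` is part of the sorting key, two rows with the same `event_id` but different timestamps are treated as distinct rows and both survive.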
AggregatingMergeTree (Pre-Aggregation)
-- For maintaining pre-aggregated metrics
CREATE TABLE market_stats_hourly (
hour DateTime,
market_id String,
total_volume AggregateFunction(sum, UInt64),
total_trades AggregateFunction(count), -- matches countState() called without arguments
unique_users AggregateFunction(uniq, String)
) ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, market_id);
-- Query the aggregated data
SELECT
hour,
market_id,
sumMerge(total_volume) AS volume,
countMerge(total_trades) AS trades,
uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= toStartOfHour(now() - INTERVAL 24 HOUR)
GROUP BY hour, market_id
ORDER BY hour DESC;
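Query Optimization Patterns
Efficient Filtering
-- ✅ Good: filter on the sorting-key columns (date, market_id) so the primary index can skip data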
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
AND market_id = 'market-123'
AND volume > 1000
ORDER BY date DESC
LIMIT 100;
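-- ❌ Poor: only the date range can use the primary index; the volume filter and the
-- leading-wildcard LIKE must be checked against every row in that range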
SELECT *
FROM markets_analytics
WHERE volume > 1000
AND market_name LIKE '%election%'
AND date >= '2025-01-01';
Aggregation
-- ✅ Good: use ClickHouse-specific aggregate functions
SELECT
toStartOfDay(created_at) AS day,
market_id,
sum(volume) AS total_volume,
count() AS total_trades,
uniq(trader_id) AS unique_traders,
avg(trade_size) AS avg_size
FROM trades
WHERE created_at >= today() - INTERVAL 7 DAY
GROUP BY day, market_id
ORDER BY day DESC, total_volume DESC;
-- ✅ Use quantile for percentiles (approximate and fast; use quantileExact when exact values are required)
SELECT
quantile(0.50)(trade_size) AS median,
quantile(0.95)(trade_size) AS p95,
quantile(0.99)(trade_size) AS p99
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;
Window Functions
-- Compute a running total
SELECT
date,
market_id,
volume,
sum(volume) OVER (
PARTITION BY market_id
ORDER BY date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS cumulative_volume
FROM markets_analytics
WHERE date >= today() - INTERVAL 30 DAY
ORDER BY market_id, date;
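The window frame above (`PARTITION BY market_id ORDER BY date`, from the first row to the current row) amounts to a per-market running total. As a rough illustration of that logic outside the database, here is a TypeScript sketch; the `DailyVolume` type and `cumulativeVolume` function are hypothetical, and dates are assumed to be `YYYY-MM-DD` strings so that string order matches date order.

```typescript
// Hypothetical row shape: one row per market per day.
interface DailyVolume {
  date: string // 'YYYY-MM-DD'
  market_id: string
  volume: number
}

interface CumulativeRow extends DailyVolume {
  cumulative_volume: number
}

const compareStrings = (a: string, b: string): number => (a < b ? -1 : a > b ? 1 : 0)

// Running total of volume per market, ordered by market_id then date.
function cumulativeVolume(rows: DailyVolume[]): CumulativeRow[] {
  const sorted = rows
    .slice()
    .sort((a, b) => compareStrings(a.market_id, b.market_id) || compareStrings(a.date, b.date))
  const totals = new Map<string, number>() // running total per partition (market_id)
  return sorted.map(row => {
    const running = (totals.get(row.market_id) ?? 0) + row.volume
    totals.set(row.market_id, running)
    return { ...row, cumulative_volume: running }
  })
}
```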
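Data Insertion Patterns
Batch Insert (Recommended)
import { ClickHouse } from 'clickhouse'
const clickhouse = new ClickHouse({
  url: process.env.CLICKHOUSE_URL,
  port: 8123,
  basicAuth: {
    username: process.env.CLICKHOUSE_USER,
    password: process.env.CLICKHOUSE_PASSWORD
  }
})
// Row shape inferred from the fields used below
interface Trade {
  id: string
  market_id: string
  user_id: string
  amount: number
  timestamp: Date
}
// Escape backslashes and single quotes so a value cannot break out of its SQL string literal
const esc = (s: string) => s.replace(/\\/g, '\\\\').replace(/'/g, "\\'")
// ClickHouse DateTime literals use 'YYYY-MM-DD hh:mm:ss' (UTC here), not full ISO 8601
const toDateTime = (d: Date) => d.toISOString().slice(0, 19).replace('T', ' ')
// ✅ Batch insert (efficient): one INSERT statement for many rows
async function bulkInsertTrades(trades: Trade[]) {
  if (trades.length === 0) return // an empty VALUES list is not valid SQL
  const values = trades.map(trade => `(
    '${esc(trade.id)}',
    '${esc(trade.market_id)}',
    '${esc(trade.user_id)}',
    ${Number(trade.amount)},
    '${toDateTime(trade.timestamp)}'
  )`).join(',')
  await clickhouse.query(`
    INSERT INTO trades (id, market_id, user_id, amount, timestamp)
    VALUES ${values}
  `).toPromise()
}
// ❌ Individual inserts (slow)
async function insertTrade(trade: Trade) {
  // Do not do this in a loop!
  await clickhouse.query(`
    INSERT INTO trades VALUES ('${trade.id}', ...)
  `).toPromise()
}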
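When a source produces more rows than should go into a single INSERT, the rows can be split into fixed-size batches and each batch inserted with one statement. The sketch below shows only the splitting step. `toBatches` is a hypothetical helper, and a suitable batch size depends on row width and server settings, so any number used with it is an assumption to tune rather than a recommendation.

```typescript
// Hypothetical helper: split rows into consecutive batches of at most batchSize rows.
function toBatches<T>(rows: T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer')
  }
  const batches: T[][] = []
  for (let i = 0; i < rows.length; i += batchSize) {
    batches.push(rows.slice(i, i + batchSize))
  }
  return batches
}
```

Each batch can then be passed to a bulk insert function in turn, which keeps the number of INSERT statements low without building one unbounded query string.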
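Streaming Insert
// For continuous ingestion: rows are written to one open INSERT stream
// (writeRow/exec are the stream API of the `clickhouse` npm package)
// `dataSource` is any async iterable of row batches, supplied by the caller
async function streamInserts(dataSource: AsyncIterable<unknown[][]>) {
  const stream = clickhouse.insert('INSERT INTO trades').stream()
  for await (const batch of dataSource) {
    for (const row of batch) {
      await stream.writeRow(row) // one array of column values per row
    }
  }
  await stream.exec() // wait for the insert to finish
}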
Materialized Views
Real-Time Aggregation
-- Create a materialized view for hourly statistics
CREATE MATERIALIZED VIEW market_stats_hourly_mv
TO market_stats_hourly
AS SELECT
toStartOfHour(timestamp) AS hour,
market_id,
sumState(amount) AS total_volume,
countState() AS total_trades,
uniqState(user_id) AS unique_users
FROM trades
GROUP BY hour, market_id;
-- Query the materialized view's target table
SELECT
hour,
market_id,
sumMerge(total_volume) AS volume,
countMerge(total_trades) AS trades,
uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= now() - INTERVAL 24 HOUR
GROUP BY hour, market_id;
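The `-State` functions in the view store partial aggregation states, and the `-Merge` functions in the query combine those states into final values. The TypeScript sketch below is a simplified in-memory analogue of that split, with hypothetical names (`HourlyState`, `toState`, `mergeStates`). One deliberate difference: it tracks distinct users with an exact `Set`, whereas ClickHouse's `uniq` state is an approximate structure.

```typescript
// Hypothetical partial state for one (hour, market_id) group from one insert block.
interface HourlyState {
  sum: number // analogue of sumState(amount)
  count: number // analogue of countState()
  users: Set<string> // analogue of uniqState(user_id), exact here for simplicity
}

// Build a partial state from one block of inserted trades.
function toState(trades: { amount: number; user_id: string }[]): HourlyState {
  const state: HourlyState = { sum: 0, count: 0, users: new Set<string>() }
  for (const t of trades) {
    state.sum += t.amount
    state.count += 1
    state.users.add(t.user_id)
  }
  return state
}

// Combine partial states into final values, like sumMerge / countMerge / uniqMerge.
function mergeStates(states: HourlyState[]): { volume: number; trades: number; users: number } {
  const users = new Set<string>()
  let volume = 0
  let trades = 0
  for (const s of states) {
    volume += s.sum
    trades += s.count
    s.users.forEach(u => { users.add(u) })
  }
  return { volume, trades, users: users.size }
}
```

The point of the state representation is that a user who appears in two insert blocks is still counted once after merging, which a stored plain count of users could not guarantee.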
Performance Monitoring
Query Performance
-- Find slow queries
SELECT
query_id,
user,
query,
query_duration_ms,
read_rows,
read_bytes,
memory_usage
FROM system.query_log
WHERE type = 'QueryFinish'
AND query_duration_ms > 1000
AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY query_duration_ms DESC
LIMIT 10;
Table Statistics
-- Check table sizes
SELECT
database,
table,
formatReadableSize(sum(bytes)) AS size,
sum(rows) AS rows,
max(modification_time) AS latest_modification
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(bytes) DESC;
Common Analytics Queries
Time-Series Analysis
-- Daily active users
SELECT
toDate(timestamp) AS date,
uniq(user_id) AS daily_active_users
FROM events
WHERE timestamp >= today() - INTERVAL 30 DAY
GROUP BY date
ORDER BY date;
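-- Retention analysis
-- signup_date is each user's first active day, computed per user with a window function;
-- taking min() inside GROUP BY user_id, activity_date would always equal activity_date
SELECT
    signup_date,
    countIf(days_since_signup = 0) AS day_0,
    countIf(days_since_signup = 1) AS day_1,
    countIf(days_since_signup = 7) AS day_7,
    countIf(days_since_signup = 30) AS day_30
FROM (
    SELECT
        user_id,
        signup_date,
        dateDiff('day', signup_date, activity_date) AS days_since_signup
    FROM (
        SELECT
            user_id,
            toDate(timestamp) AS activity_date,
            min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date
        FROM events
    )
    GROUP BY user_id, signup_date, activity_date -- one row per user per active day
)
GROUP BY signup_date
ORDER BY signup_date DESC;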
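The retention query counts, for each signup date, how many users were active exactly N days after their first active day. A small in-memory version of the same logic can help when checking results against sample data. This is a sketch under stated assumptions: `ActivityEvent` and `retention` are hypothetical names, and timestamps are assumed to be ISO 8601 strings in UTC.

```typescript
// Hypothetical event shape; timestamp is an ISO 8601 string in UTC.
interface ActivityEvent {
  user_id: string
  timestamp: string
}

const DAY_MS = 24 * 60 * 60 * 1000
const toDay = (ts: string): string => ts.slice(0, 10) // 'YYYY-MM-DD'

// Returns signup date -> { day offset -> number of users active on that offset }.
function retention(events: ActivityEvent[], offsets: number[]): Map<string, Record<number, number>> {
  // First active day per user (the signup date)
  const signup = new Map<string, string>()
  for (const e of events) {
    const day = toDay(e.timestamp)
    const current = signup.get(e.user_id)
    if (current === undefined || day < current) signup.set(e.user_id, day)
  }
  const seen = new Set<string>() // distinct (user, day) pairs so a user counts once per day
  const result = new Map<string, Record<number, number>>()
  for (const e of events) {
    const day = toDay(e.timestamp)
    const pairKey = `${e.user_id}|${day}`
    if (seen.has(pairKey)) continue
    seen.add(pairKey)
    const start = signup.get(e.user_id) as string
    const diff = Math.round((Date.parse(day) - Date.parse(start)) / DAY_MS)
    if (offsets.indexOf(diff) === -1) continue
    const row: Record<number, number> = result.get(start) ?? {}
    row[diff] = (row[diff] ?? 0) + 1
    result.set(start, row)
  }
  return result
}
```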
Funnel Analysis
-- Conversion funnel over all of today's events
-- (no GROUP BY: grouping by session_id would return one row per session instead of one funnel)
SELECT
    countIf(step = 'viewed_market') AS viewed,
    countIf(step = 'clicked_trade') AS clicked,
    countIf(step = 'completed_trade') AS completed,
    round(clicked / viewed * 100, 2) AS view_to_click_rate,
    round(completed / clicked * 100, 2) AS click_to_completion_rate
FROM (
    SELECT
        user_id,
        session_id,
        event_type AS step
    FROM events
    WHERE event_date = today()
);
Cohort Analysis
-- User cohorts by signup month
SELECT
toStartOfMonth(signup_date) AS cohort,
toStartOfMonth(activity_date) AS month,
dateDiff('month', cohort, month) AS months_since_signup,
count(DISTINCT user_id) AS active_users
FROM (
SELECT
user_id,
min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
toDate(timestamp) AS activity_date
FROM events
)
GROUP BY cohort, month, months_since_signup
ORDER BY cohort, months_since_signup;
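Data Pipeline Patterns
ETL Pattern
// Extract, transform, load
// extractFromPostgres and bulkInsertToClickHouse are placeholders supplied by the application
async function etlPipeline() {
  // 1. Extract from the source
  const rawData = await extractFromPostgres()
  // 2. Transform
  const transformed = rawData.map(row => ({
    date: new Date(row.created_at).toISOString().split('T')[0],
    market_id: row.market_slug,
    volume: parseFloat(row.total_volume),
    trades: parseInt(row.trade_count, 10)
  }))
  // 3. Load into ClickHouse
  await bulkInsertToClickHouse(transformed)
}
// Run periodically; catch errors so a failed run does not become an unhandled rejection
setInterval(() => {
  etlPipeline().catch(err => console.error('ETL run failed:', err))
}, 60 * 60 * 1000) // every hour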
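The transform step parses numbers and dates from raw source values, and a malformed value would otherwise reach ClickHouse as `NaN` or an invalid date. One possible guard is sketched below. `RawMarketRow`, `MarketRow`, and `transformRow` are hypothetical names, and the assumption that the source returns numeric columns as strings is mine rather than something stated above.

```typescript
// Hypothetical shape of a row as read from the source database.
interface RawMarketRow {
  created_at: string
  market_slug: string
  total_volume: string
  trade_count: string
}

// Hypothetical shape of a row ready to load into ClickHouse.
interface MarketRow {
  date: string // 'YYYY-MM-DD'
  market_id: string
  volume: number
  trades: number
}

// Returns a cleaned row, or null when any field fails to parse.
function transformRow(row: RawMarketRow): MarketRow | null {
  const created = new Date(row.created_at)
  const volume = Number.parseFloat(row.total_volume)
  const trades = Number.parseInt(row.trade_count, 10)
  if (Number.isNaN(created.getTime()) || !Number.isFinite(volume) || Number.isNaN(trades)) {
    return null
  }
  return {
    date: created.toISOString().slice(0, 10),
    market_id: row.market_slug,
    volume,
    trades
  }
}
```

Rows that come back as `null` can be logged or routed to a dead-letter store instead of being loaded.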
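Change Data Capture (CDC)
// Listen for PostgreSQL changes and sync them to ClickHouse
// Note: LISTEN/NOTIFY only delivers while this client is connected; missed notifications are not replayed
import { Client } from 'pg'
const pgClient = new Client({ connectionString: process.env.DATABASE_URL })
async function startSync() {
  await pgClient.connect() // the client must be connected before LISTEN
  pgClient.on('notification', async (msg) => {
    if (!msg.payload) return
    try {
      const update = JSON.parse(msg.payload)
      await clickhouse.insert(
        'INSERT INTO market_updates (market_id, event_type, timestamp, data)',
        [{
          market_id: update.id,
          event_type: update.operation, // INSERT, UPDATE, DELETE
          timestamp: new Date().toISOString().slice(0, 19).replace('T', ' '),
          data: JSON.stringify(update.new_data)
        }]
      ).toPromise()
    } catch (err) {
      console.error('Failed to sync market update:', err)
    }
  })
  await pgClient.query('LISTEN market_updates')
}
startSync().catch(err => console.error('CDC startup failed:', err))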
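Best Practices
1. Partitioning Strategy
- Partition by time (usually by month or day)
- Avoid too many partitions (they hurt performance)
- Use a Date type for the partition key
2. Sorting Key
- Put the most frequently filtered columns first
- Consider cardinality (lower-cardinality columns first)
- Sort order affects compression
3. Data Types
- Use the smallest suitable type (UInt32 vs. UInt64)
- Use LowCardinality for repeated strings
- Use Enum for categorical data
4. Avoid
- SELECT * (list the columns you need)
- FINAL (merge the data before querying instead)
- Too many JOINs (denormalize for analytics)
- Small, frequent inserts (batch instead)
5. Monitoring
- Track query performance
- Monitor disk usage
- Check merge operations
- Review the slow query log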
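For the data type guidance in item 3, choosing the smallest suitable unsigned integer type comes down to comparing the largest expected value against each type's range. The sketch below encodes that comparison. `smallestUInt` is a hypothetical helper, and it only handles values that a JavaScript number can represent exactly as an integer.

```typescript
type UIntType = 'UInt8' | 'UInt16' | 'UInt32' | 'UInt64'

// Hypothetical helper: narrowest ClickHouse unsigned integer type that can hold maxValue.
function smallestUInt(maxValue: number): UIntType {
  if (!Number.isInteger(maxValue) || maxValue < 0) {
    throw new RangeError('maxValue must be a non-negative integer')
  }
  if (maxValue <= 0xff) return 'UInt8' // up to 255
  if (maxValue <= 0xffff) return 'UInt16' // up to 65,535
  if (maxValue <= 0xffffffff) return 'UInt32' // up to 4,294,967,295
  return 'UInt64'
}
```

It is usually worth leaving headroom for growth: a counter that peaks near 4 billion today is a poor fit for UInt32.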
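Remember: ClickHouse excels at analytical workloads. Design tables around your query patterns, insert in batches, and use materialized views for real-time aggregation.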
