Substreams Sql

by pinax-network

datacli

Expert knowledge for building SQL database sinks from Substreams. Covers database changes (CDC), relational mappings, PostgreSQL, ClickHouse, and materialized views.

Skill Details

Repository Files

5 files in this skill directory


name: substreams-sql description: Expert knowledge for building SQL database sinks from Substreams. Covers database changes (CDC), relational mappings, PostgreSQL, ClickHouse, and materialized views. license: Apache-2.0 compatibility: platforms: [claude-code, cursor, vscode, windsurf] metadata: version: 1.0.0 author: StreamingFast documentation: https://substreams.streamingfast.io

Substreams SQL Expert

Expert assistant for building SQL database sinks from Substreams data - transforming blockchain data into relational databases.

Prerequisites

  • substreams-sink-sql: Required CLI tool for database sink workflows. Install from releases.

Core Concepts

What is Substreams SQL?

Substreams SQL enables you to:

  • Transform blockchain data into structured SQL tables
  • Stream changes to PostgreSQL, ClickHouse, and other databases
  • Build materialized views with real-time updates
  • Handle reorgs with proper cursor-based streaming
  • Scale horizontally with parallel processing

Two Main Approaches

  1. Database Changes (CDC) - Stream individual row changes
  2. Relational Mappings - Transform data into normalized tables

Database Changes (CDC) Approach

Overview

The CDC approach streams individual database operations (INSERT, UPDATE, DELETE) to maintain real-time consistency.

Key Components

Protobuf Schema: The DatabaseChanges protobuf type is provided by the official substreams-sink-database-changes spkg — you do NOT need to define your own proto. Import it in your manifest (see Manifest Configuration below).

Rust Implementation:

use substreams::prelude::*;
use substreams_database_change::pb::sf::substreams::sink::database::v1::DatabaseChanges;
use substreams_database_change::tables::Tables;

#[substreams::handlers::map]
pub fn db_out(events: Events) -> Result<DatabaseChanges, Error> {
    let mut tables = Tables::new();

    for transfer in events.transfers {
        tables
            .create_row("transfers", format!("{}-{}", transfer.tx_hash, transfer.log_index))
            .set("tx_hash", transfer.tx_hash)
            .set("from_addr", transfer.from)
            .set("to_addr", transfer.to)
            .set("amount", transfer.amount)
            .set("block_num", transfer.block_number)
            .set("timestamp", transfer.timestamp);
    }

    for balance in events.balance_changes {
        tables
            .update_row("balances", balance.address)
            .set("balance", balance.new_balance)
            .set("updated_at", balance.timestamp);
    }

    Ok(tables.to_database_changes())
}

Manifest Configuration

The manifest requires importing the database changes and sink-sql protodefs spkgs, and a sink: section:

specVersion: v0.1.0
package:
  name: my-substreams-sql
  version: v0.1.0

imports:
    database: https://github.com/streamingfast/substreams-sink-database-changes/releases/download/v3.0.0/substreams-sink-database-changes-v3.0.0.spkg
    sql: https://github.com/streamingfast/substreams-sink-sql/releases/download/protodefs-v1.0.7/substreams-sink-sql-protodefs-v1.0.7.spkg

protobuf:
  excludePaths:
    - sf/substreams
    - google

binaries:
  default:
    type: wasm/rust-v1
    file: ./target/wasm32-unknown-unknown/release/my_substreams_sql.wasm

network: mainnet

modules:
  - name: db_out
    kind: map
    inputs:
      - map: map_events
    output:
      type: proto:sf.substreams.sink.database.v1.DatabaseChanges

sink:
  module: db_out
  type: sf.substreams.sink.sql.v1.Service
  config:
    schema: ./schema.sql
    engine: postgres

Cargo.toml dependency (v4 with delta updates support):

[dependencies]
substreams-database-change = "4"

Running the sink (DSN is passed on the command line, not in the manifest):

# 1. Build
substreams build

# 2. Setup system tables + apply schema.sql
substreams-sink-sql setup "psql://user:pass@localhost:5432/db?sslmode=disable" my-substreams-sql-v0.1.0.spkg

# 3. Run the sink
substreams-sink-sql run "psql://user:pass@localhost:5432/db?sslmode=disable" my-substreams-sql-v0.1.0.spkg

Advantages

  • Real-time consistency - Changes applied immediately
  • Handles reorgs - Can reverse/replay operations
  • Efficient updates - Only changed data is transmitted
  • ACID compliance - Database transactions ensure consistency

Use Cases

  • Real-time dashboards
  • Trading applications
  • Balance tracking
  • Event sourcing

Delta Updates for Aggregations

Delta updates enable atomic modifications to database rows without read-modify-write cycles. This is essential for building aggregation patterns like candles, counters, and real-time analytics — pushing chain-wide computation directly into the database instead of using Substreams store modules.

Note: Delta updates require PostgreSQL and substreams-sink-sql >= v4.12.0.

use substreams_database_change::tables::Tables;
use substreams_database_change::pb::sf::substreams::sink::database::v1::DatabaseChanges;

#[substreams::handlers::map]
fn db_out(events: Events) -> Result<DatabaseChanges, substreams::errors::Error> {
    let mut tables = Tables::new();

    for event in &events.items {
        // Composite primary keys use an array of tuples
        tables.upsert_row("aggregates", [
            ("key1", value1),
            ("key2", value2),
        ])
            .set_if_null("first_seen", &timestamp)  // First write wins
            .set("last_seen", &timestamp)            // Always overwrite
            .max("highest", value)                   // Track maximum
            .min("lowest", value)                    // Track minimum
            .add("total", amount)                    // Accumulate
            .add("count", 1i64);                     // Count
    }

    Ok(tables.to_database_changes())
}

Supported Delta Operations:

Operation SQL Equivalent Use Case
set_if_null COALESCE(column, value) First-write-wins
set column = value Always overwrite
max GREATEST(column, value) Track maximum
min LEAST(column, value) Track minimum
add COALESCE(column, 0) + value Accumulate
sub COALESCE(column, 0) - value Decrement

Important Notes:

  • The add() operation requires values implementing NumericAddable — pass owned values (volume.clone() or volume) rather than &String references
  • Ordinals are automatically managed by the Tables struct — no manual management needed

Relational Mappings Approach

Overview

Transform Substreams data into normalized relational tables with proper foreign keys and indexes.

Schema Design

Example Schema (schema.sql):

-- Blocks table
CREATE TABLE blocks (
    number BIGINT PRIMARY KEY,
    hash VARCHAR(66) NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    parent_hash VARCHAR(66),
    size_bytes BIGINT
);

-- Transactions table  
CREATE TABLE transactions (
    hash VARCHAR(66) PRIMARY KEY,
    block_number BIGINT REFERENCES blocks(number),
    from_addr VARCHAR(42),
    to_addr VARCHAR(42),
    value NUMERIC(78,0),
    gas_limit BIGINT,
    gas_used BIGINT,
    status INTEGER
);

-- ERC20 Transfers
CREATE TABLE erc20_transfers (
    id SERIAL PRIMARY KEY,
    tx_hash VARCHAR(66) REFERENCES transactions(hash),
    log_index INTEGER,
    contract_address VARCHAR(42),
    from_addr VARCHAR(42),
    to_addr VARCHAR(42),
    amount NUMERIC(78,0),
    block_number BIGINT
);

-- Token balances (aggregated)
CREATE TABLE token_balances (
    address VARCHAR(42),
    token_address VARCHAR(42),
    balance NUMERIC(78,0),
    last_updated BIGINT,
    PRIMARY KEY (address, token_address)
);

-- Indexes for performance
CREATE INDEX idx_transfers_contract ON erc20_transfers(contract_address);
CREATE INDEX idx_transfers_from ON erc20_transfers(from_addr);
CREATE INDEX idx_transfers_to ON erc20_transfers(to_addr);
CREATE INDEX idx_transfers_block ON erc20_transfers(block_number);

Rust Implementation:

#[substreams::handlers::map]
pub fn db_out(block: Block) -> Result<DatabaseChanges, Error> {
    let mut tables = Tables::new();

    // Insert block data
    tables
        .create_row("blocks", block.number.to_string())
        .set("number", block.number)
        .set("hash", Hex::encode(&block.hash))
        .set("timestamp", block.timestamp_seconds())
        .set("parent_hash", Hex::encode(&block.parent_hash))
        .set("size_bytes", block.size);

    for (tx_idx, tx) in block.transaction_traces.iter().enumerate() {
        // Insert transaction
        let tx_hash = Hex::encode(&tx.hash);
        tables
            .create_row("transactions", &tx_hash)
            .set("hash", &tx_hash)
            .set("block_number", block.number)
            .set("from_addr", Hex::encode(&tx.from))
            .set("to_addr", Hex::encode(&tx.to))
            .set("value", tx.value.to_string())
            .set("gas_limit", tx.gas_limit)
            .set("gas_used", tx.gas_used)
            .set("status", tx.status);

        // Process ERC20 transfers
        for (log_idx, log) in tx.receipt.logs.iter().enumerate() {
            if is_erc20_transfer(log) {
                let transfer = decode_transfer(log);
                let transfer_id = format!("{}-{}", tx_hash, log_idx);
                
                tables
                    .create_row("erc20_transfers", transfer_id)
                    .set("tx_hash", &tx_hash)
                    .set("log_index", log_idx as i32)
                    .set("contract_address", Hex::encode(&log.address))
                    .set("from_addr", transfer.from)
                    .set("to_addr", transfer.to)
                    .set("amount", transfer.amount)
                    .set("block_number", block.number);
            }
        }
    }

    Ok(tables.to_database_changes())
}

Advantages

  • Normalized data - Proper relational structure
  • Complex queries - JOIN operations, aggregations
  • Data integrity - Foreign keys, constraints
  • Analytics friendly - Optimized for reporting

PostgreSQL Implementation

Setup and Configuration

Docker Setup:

# docker-compose.yml
version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: substreams
      POSTGRES_USER: substreams
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./schema.sql:/docker-entrypoint-initdb.d/schema.sql

volumes:
  postgres_data:

Running the Sink:

# The DSN is passed as a CLI argument, not in the manifest
substreams-sink-sql setup "psql://substreams:password@localhost:5432/substreams?sslmode=disable" ./my-substreams.spkg
substreams-sink-sql run "psql://substreams:password@localhost:5432/substreams?sslmode=disable" ./my-substreams.spkg

# Development mode (allows re-processing):
substreams-sink-sql run --development-mode "psql://substreams:password@localhost:5432/substreams?sslmode=disable" ./my-substreams.spkg

PostgreSQL-Specific Features

JSONB Support:

-- Store complex data as JSONB
CREATE TABLE transaction_logs (
    tx_hash VARCHAR(66),
    log_index INTEGER,
    data JSONB,
    topics JSONB
);

-- Index JSONB fields
CREATE INDEX idx_logs_data ON transaction_logs USING GIN (data);

Materialized Views:

-- Daily transfer volumes
CREATE MATERIALIZED VIEW daily_transfer_volumes AS
SELECT 
    DATE(to_timestamp(timestamp)) as date,
    contract_address,
    COUNT(*) as transfer_count,
    SUM(amount) as total_volume
FROM erc20_transfers t
JOIN transactions tx ON t.tx_hash = tx.hash
GROUP BY DATE(to_timestamp(timestamp)), contract_address;

-- Refresh strategy (handled by Substreams)
CREATE UNIQUE INDEX ON daily_transfer_volumes (date, contract_address);

Performance Tuning:

-- Partitioning by block number
CREATE TABLE erc20_transfers (
    -- columns...
    block_number BIGINT
) PARTITION BY RANGE (block_number);

CREATE TABLE erc20_transfers_y2024 PARTITION OF erc20_transfers
    FOR VALUES FROM (18000000) TO (20000000);

-- Parallel processing
SET max_parallel_workers_per_gather = 4;
SET max_parallel_workers = 8;

Best Practices for PostgreSQL

  1. Use appropriate data types:

    -- Use NUMERIC for big integers (token amounts)
    amount NUMERIC(78,0)  -- Not BIGINT
    
    -- Use proper VARCHAR sizes
    address VARCHAR(42)   -- Ethereum addresses
    hash VARCHAR(66)      -- Transaction hashes
    
  2. Index strategically:

    -- Query-specific indexes
    CREATE INDEX idx_transfers_token_date ON erc20_transfers(contract_address, block_number);
    
    -- Partial indexes for active data
    CREATE INDEX idx_active_balances ON token_balances(address) WHERE balance > 0;
    
  3. Use constraints:

    -- Data validation
    ALTER TABLE erc20_transfers ADD CONSTRAINT check_positive_amount 
        CHECK (amount >= 0);
    
    -- Foreign keys for referential integrity
    ALTER TABLE erc20_transfers ADD CONSTRAINT fk_transaction
        FOREIGN KEY (tx_hash) REFERENCES transactions(hash);
    

ClickHouse Implementation

Setup and Configuration

Docker Setup:

# docker-compose.yml
version: '3.8'
services:
  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - "9000:9000"
      - "8123:8123"
    environment:
      CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
    volumes:
      - clickhouse_data:/var/lib/clickhouse
      - ./clickhouse-schema.sql:/docker-entrypoint-initdb.d/schema.sql

volumes:
  clickhouse_data:

Manifest Configuration (ClickHouse):

# In substreams.yaml, set engine to clickhouse
sink:
  module: db_out
  type: sf.substreams.sink.sql.v1.Service
  config:
    schema: ./clickhouse-schema.sql
    engine: clickhouse

Running the Sink (DSN passed on command line):

substreams-sink-sql setup "clickhouse://default:@localhost:9000/default" ./my-substreams.spkg
substreams-sink-sql run "clickhouse://default:@localhost:9000/default" ./my-substreams.spkg

ClickHouse Schema Design

Optimized for Analytics:

-- Blocks table with MergeTree engine
CREATE TABLE blocks (
    number UInt64,
    hash String,
    timestamp DateTime,
    parent_hash String,
    size_bytes UInt64,
    tx_count UInt32
) ENGINE = MergeTree()
ORDER BY number
SETTINGS index_granularity = 8192;

-- ERC20 transfers optimized for time-series queries
CREATE TABLE erc20_transfers (
    block_number UInt64,
    tx_hash String,
    log_index UInt32,
    contract_address String,
    from_addr String,
    to_addr String,
    amount UInt256,
    timestamp DateTime,
    
    -- Pre-computed fields for analytics
    hour DateTime,
    date Date
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (contract_address, timestamp, block_number)
SETTINGS index_granularity = 8192;

-- Materialized view for real-time aggregations
CREATE MATERIALIZED VIEW hourly_transfer_stats
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (contract_address, hour)
AS SELECT
    contract_address,
    toStartOfHour(timestamp) AS hour,
    count() AS transfer_count,
    sum(amount) AS total_volume
FROM erc20_transfers
GROUP BY contract_address, hour;

ClickHouse-Specific Features:

-- Compression and optimization
ALTER TABLE erc20_transfers MODIFY COLUMN amount Codec(Delta, ZSTD);

-- TTL for data lifecycle management
ALTER TABLE erc20_transfers MODIFY TTL date + INTERVAL 2 YEAR;

-- Dictionaries for dimension data
CREATE DICTIONARY token_metadata (
    address String,
    symbol String,
    decimals UInt8,
    name String
) PRIMARY KEY address
SOURCE(POSTGRESQL(
    host 'postgres'
    port 5432
    user 'substreams'
    password 'password'
    db 'substreams'
    table 'token_metadata'
))
LIFETIME(300)
LAYOUT(HASHED());

Performance Optimization

Partitioning Strategy:

-- Monthly partitioning for time-series data
CREATE TABLE erc20_transfers_distributed AS erc20_transfers
ENGINE = Distributed('cluster', 'default', 'erc20_transfers', rand());

-- Partition pruning
SELECT * FROM erc20_transfers 
WHERE date >= '2024-01-01' AND date < '2024-02-01';

Query Optimization:

-- Use projection for common query patterns
ALTER TABLE erc20_transfers ADD PROJECTION daily_stats (
    SELECT 
        date,
        contract_address,
        sum(amount),
        count()
    GROUP BY date, contract_address
);

-- Pre-aggregated tables
CREATE TABLE daily_token_stats
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, contract_address)
AS SELECT
    date,
    contract_address,
    sumState(amount) as total_volume,
    countState() as transfer_count
FROM erc20_transfers
GROUP BY date, contract_address;

Materialized Views and Real-time Updates

Concept

Materialized views provide pre-computed query results that update automatically as new data arrives.

Implementation Patterns

PostgreSQL Materialized Views:

-- Token holder counts
CREATE MATERIALIZED VIEW token_holder_counts AS
SELECT 
    token_address,
    COUNT(DISTINCT address) as holder_count,
    SUM(balance) as total_supply
FROM token_balances 
WHERE balance > 0
GROUP BY token_address;

-- Refresh trigger (automated by Substreams)
CREATE OR REPLACE FUNCTION refresh_token_holder_counts()
RETURNS TRIGGER AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY token_holder_counts;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER token_balance_change_trigger
    AFTER INSERT OR UPDATE OR DELETE ON token_balances
    FOR EACH STATEMENT
    EXECUTE FUNCTION refresh_token_holder_counts();

ClickHouse Materialized Views:

-- Real-time price calculations
CREATE MATERIALIZED VIEW token_prices_mv
ENGINE = ReplacingMergeTree(timestamp)
ORDER BY (token_address, timestamp)
AS SELECT
    token_address,
    timestamp,
    price_usd,
    volume_24h,
    market_cap
FROM (
    -- Complex price calculation logic
    SELECT 
        t.contract_address as token_address,
        toStartOfMinute(t.timestamp) as timestamp,
        calculatePrice(t.amount, p.eth_price) as price_usd,
        sum(t.amount) OVER (
            PARTITION BY t.contract_address 
            ORDER BY t.timestamp 
            RANGE BETWEEN INTERVAL 24 HOUR PRECEDING AND CURRENT ROW
        ) as volume_24h
    FROM erc20_transfers t
    JOIN eth_prices p ON toStartOfMinute(t.timestamp) = p.timestamp
    WHERE t.contract_address = '0x...' -- USDC
);

Update Strategies

Incremental Updates:

// Track last processed block for incremental updates
#[substreams::handlers::store]
pub fn store_last_block(block: Block, store: StoreSetInt64) {
    store.set(0, "last_processed_block", &(block.number as i64));
}

// Only process new data
#[substreams::handlers::map]
pub fn incremental_db_out(
    block: Block,
    last_block_store: StoreGetInt64
) -> Result<DatabaseChanges, Error> {
    let last_processed = last_block_store.get_last("last_processed_block")
        .unwrap_or(0);

    if block.number <= last_processed as u64 {
        return Ok(DatabaseChanges::default()); // Skip already processed
    }

    // Process only new block data
    process_block_data(block)
}

Cursor-based Consistency:

Cursor management is handled automatically by substreams-sink-sql. The sink stores cursor state in the cursors system table, enabling automatic resumption and reorg handling. No special manifest configuration is needed — just run substreams-sink-sql setup followed by run.

Advanced Patterns

Multi-Database Sinks

To stream to multiple databases, create separate substreams packages (or separate sink runs) with different sink: configurations:

# operational-substreams.yaml - PostgreSQL for operational data
sink:
  module: db_out_operational
  type: sf.substreams.sink.sql.v1.Service
  config:
    schema: ./operational-schema.sql
    engine: postgres
# analytics-substreams.yaml - ClickHouse for analytics
sink:
  module: db_out_analytics
  type: sf.substreams.sink.sql.v1.Service
  config:
    schema: ./analytics-schema.sql
    engine: clickhouse
# Run each sink separately with its own DSN
substreams-sink-sql run "psql://user:pass@postgres:5432/operational" operational.spkg &
substreams-sink-sql run "clickhouse://default:@clickhouse:9000/analytics" analytics.spkg &

Data Transformation Pipelines

// Multi-stage data processing
#[substreams::handlers::map]
pub fn extract_events(block: Block) -> Result<RawEvents, Error> {
    // Stage 1: Extract raw events
    extract_raw_blockchain_events(block)
}

#[substreams::handlers::map]  
pub fn enrich_events(raw_events: RawEvents) -> Result<EnrichedEvents, Error> {
    // Stage 2: Enrich with metadata, decode parameters
    enrich_with_token_metadata(raw_events)
}

#[substreams::handlers::map]
pub fn db_out_operational(enriched: EnrichedEvents) -> Result<DatabaseChanges, Error> {
    // Stage 3: Transform for operational database (normalized)
    create_operational_tables(enriched)
}

#[substreams::handlers::map]
pub fn db_out_analytics(enriched: EnrichedEvents) -> Result<DatabaseChanges, Error> {
    // Stage 4: Transform for analytics database (denormalized)
    create_analytics_tables(enriched)
}

Troubleshooting

Common Issues

Connection Problems:

# Test database connectivity
psql "postgresql://user:pass@localhost:5432/db" -c "SELECT version();"
clickhouse-client --host localhost --port 9000 --query "SELECT version()"

# Check sink logs
substreams run -s 1000000 -t +1000 db_out --debug

Schema Mismatches:

# Compare expected vs actual schema
pg_dump --schema-only dbname > current_schema.sql
diff schema.sql current_schema.sql

# Re-run setup to apply schema changes (drops and recreates in dev mode)
substreams-sink-sql setup "psql://..." my-substreams.spkg

Performance Issues:

-- Monitor query performance
EXPLAIN ANALYZE SELECT * FROM erc20_transfers WHERE contract_address = '0x...';

-- Check index usage
SELECT schemaname, tablename, indexname, idx_scan, idx_tup_read, idx_tup_fetch
FROM pg_stat_user_indexes 
ORDER BY idx_scan DESC;

-- ClickHouse query profiling
SELECT * FROM system.query_log 
WHERE type = 'QueryFinish' 
ORDER BY event_time DESC 
LIMIT 10;

Data Consistency

Reorg Handling:

// Proper reorg handling in stores
#[substreams::handlers::store]
pub fn store_balances(events: Events, store: StoreSetBigInt) {
    for transfer in events.transfers {
        // Use ordinal for correct ordering within a block
        let key = format!("{}:{}", transfer.contract, transfer.from);
        store.set(transfer.ordinal, &key, &transfer.amount);
    }
}

Cursor Management:

Cursor state is automatically persisted in the cursors table created by substreams-sink-sql setup. On restart, the sink resumes from the last committed cursor. In development mode (--development-mode), undos are handled automatically for reorg safety.

Monitoring and Alerting

Key Metrics:

-- PostgreSQL monitoring
SELECT 
    schemaname,
    tablename,
    n_tup_ins as inserts,
    n_tup_upd as updates,
    n_tup_del as deletes
FROM pg_stat_user_tables 
ORDER BY n_tup_ins DESC;

-- ClickHouse monitoring  
SELECT 
    table,
    sum(rows) as total_rows,
    sum(bytes_on_disk) as size_bytes
FROM system.parts 
WHERE active = 1
GROUP BY table
ORDER BY size_bytes DESC;

Health Checks:

#!/bin/bash
# Database health monitoring script

# Check latest block processed
LATEST_BLOCK=$(psql $DSN -t -c "SELECT MAX(block_number) FROM transactions;")
CHAIN_HEAD=$(curl -s https://api.etherscan.io/api?module=proxy&action=eth_blockNumber | jq -r .result)

LAG=$((CHAIN_HEAD - LATEST_BLOCK))
if [ $LAG -gt 100 ]; then
    echo "WARNING: Database is $LAG blocks behind chain head"
    exit 1
fi

echo "Database is healthy, lag: $LAG blocks"

Resources

Getting Help

Related Skills

Xlsx

Comprehensive spreadsheet creation, editing, and analysis with support for formulas, formatting, data analysis, and visualization. When Claude needs to work with spreadsheets (.xlsx, .xlsm, .csv, .tsv, etc) for: (1) Creating new spreadsheets with formulas and formatting, (2) Reading or analyzing data, (3) Modify existing spreadsheets while preserving formulas, (4) Data analysis and visualization in spreadsheets, or (5) Recalculating formulas

data

Clickhouse Io

ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.

datacli

Clickhouse Io

ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.

datacli

Analyzing Financial Statements

This skill calculates key financial ratios and metrics from financial statement data for investment analysis

data

Data Storytelling

Transform data into compelling narratives using visualization, context, and persuasive structure. Use when presenting analytics to stakeholders, creating data reports, or building executive presentations.

data

Kpi Dashboard Design

Design effective KPI dashboards with metrics selection, visualization best practices, and real-time monitoring patterns. Use when building business dashboards, selecting metrics, or designing data visualization layouts.

designdata

Dbt Transformation Patterns

Master dbt (data build tool) for analytics engineering with model organization, testing, documentation, and incremental strategies. Use when building data transformations, creating data models, or implementing analytics engineering best practices.

testingdocumenttool

Sql Optimization Patterns

Master SQL query optimization, indexing strategies, and EXPLAIN analysis to dramatically improve database performance and eliminate slow queries. Use when debugging slow queries, designing database schemas, or optimizing application performance.

designdata

Clinical Decision Support

Generate professional clinical decision support (CDS) documents for pharmaceutical and clinical research settings, including patient cohort analyses (biomarker-stratified with outcomes) and treatment recommendation reports (evidence-based guidelines with decision algorithms). Supports GRADE evidence grading, statistical analysis (hazard ratios, survival curves, waterfall plots), biomarker integration, and regulatory compliance. Outputs publication-ready LaTeX/PDF format optimized for drug develo

developmentdocumentcli

Anndata

This skill should be used when working with annotated data matrices in Python, particularly for single-cell genomics analysis, managing experimental measurements with metadata, or handling large-scale biological datasets. Use when tasks involve AnnData objects, h5ad files, single-cell RNA-seq data, or integration with scanpy/scverse tools.

arttooldata

Skill Information

Category:Technical
License:Apache-2.0
Version:1.0.0
Last Updated:1/30/2026