Data_Engineer
by ColbyRReichenbach
---
name: data_engineer
description: Database schema design, ETL pipelines, and data quality for RetentionAI. Use this skill when working on: (1) Database schema design and migrations, (2) ETL pipelines (GA4, Stripe, Shopify), (3) Data quality validation, (4) Multi-tenant JSONB patterns, (5) Query optimization and indexing, (6) Data generation and seeding.
---
# Data Engineer Skill

## Core Responsibilities
- Design flexible multi-tenant database schemas using JSONB
- Build ETL pipelines (BigQuery → CSV → Postgres)
- Implement data quality checks and validation
- Optimize queries with proper indexing
- Generate synthetic data for testing
## Multi-Tenant Schema Patterns

### Universal Fields + JSONB Strategy

**Core Principle:** Fixed columns for universal attributes, JSONB for company-specific fields.
```sql
-- CUSTOMERS TABLE
CREATE TABLE customers (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id UUID NOT NULL REFERENCES organizations(id),
    external_id VARCHAR(255) NOT NULL,  -- Their system's ID
    email VARCHAR(255),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Universal behavioral metrics
    first_purchase_at TIMESTAMPTZ,
    last_purchase_at TIMESTAMPTZ,
    purchase_count INTEGER DEFAULT 0,
    total_revenue DECIMAL(12,2) DEFAULT 0,

    -- Flexible custom fields
    custom_attributes JSONB DEFAULT '{}'::jsonb,

    CONSTRAINT unique_customer_per_org UNIQUE(org_id, external_id)
);

-- Critical indexes
CREATE INDEX idx_customers_org_id ON customers(org_id);
CREATE INDEX idx_customers_org_email ON customers(org_id, email);
CREATE INDEX idx_customers_org_external ON customers(org_id, external_id);
CREATE INDEX idx_customers_last_purchase ON customers(org_id, last_purchase_at DESC);
CREATE INDEX idx_custom_attributes_gin ON customers USING GIN (custom_attributes jsonb_path_ops);

-- Row-level security (automatic tenant isolation)
ALTER TABLE customers ENABLE ROW LEVEL SECURITY;
CREATE POLICY tenant_isolation ON customers
    USING (org_id = current_setting('app.current_tenant', true)::uuid);
```
### JSONB Query Patterns
```sql
-- Query nested JSONB fields
SELECT * FROM customers
WHERE org_id = 'abc-123'
  AND custom_attributes->>'nps_score' IS NOT NULL
  AND (custom_attributes->>'nps_score')::int > 7;

-- Index specific JSONB paths for performance
CREATE INDEX idx_nps_score ON customers
    ((custom_attributes->>'nps_score'))
    WHERE custom_attributes->>'nps_score' IS NOT NULL;

-- Aggregate JSONB values
SELECT
    AVG((custom_attributes->>'nps_score')::int) AS avg_nps,
    COUNT(*) FILTER (WHERE (custom_attributes->>'plan_type') = 'enterprise') AS enterprise_count
FROM customers
WHERE org_id = 'abc-123';
```
### Events Table (Flexible Event Stream)
```sql
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id UUID NOT NULL REFERENCES organizations(id),
    customer_id UUID NOT NULL REFERENCES customers(id) ON DELETE CASCADE,
    event_type VARCHAR(100) NOT NULL,
    event_timestamp TIMESTAMPTZ NOT NULL,
    source VARCHAR(50) NOT NULL,  -- stripe, shopify, segment, etc.
    properties JSONB NOT NULL DEFAULT '{}'::jsonb,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Optimized indexes for common query patterns
CREATE INDEX idx_events_org_customer_time ON events(org_id, customer_id, event_timestamp DESC);
CREATE INDEX idx_events_org_type_time ON events(org_id, event_type, event_timestamp DESC);
-- Partial-index predicates must be immutable, so NOW() cannot appear here;
-- use a fixed cutoff (example date) and recreate the index periodically
CREATE INDEX idx_events_recent ON events(event_timestamp DESC)
    WHERE event_timestamp > '2025-01-01';
CREATE INDEX idx_events_properties_gin ON events USING GIN (properties jsonb_path_ops);

-- Partition by month for scalability (optional, for high-volume tenants)
-- CREATE TABLE events_2025_01 PARTITION OF events FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
```
### Event Schema Examples
**Stripe payment failure:**

```json
{
  "event_type": "payment_failed",
  "source": "stripe",
  "properties": {
    "amount": 9900,
    "currency": "usd",
    "failure_code": "card_declined",
    "invoice_id": "in_abc123",
    "attempt_count": 2
  }
}
```

**Shopify order:**

```json
{
  "event_type": "order_created",
  "source": "shopify",
  "properties": {
    "order_id": "12345",
    "total_price": 125.50,
    "items": [
      {"product_id": "prod_123", "quantity": 2, "price": 50.00}
    ],
    "shipping_country": "US",
    "tags": ["first_order", "email_campaign"]
  }
}
```

**Custom support ticket:**

```json
{
  "event_type": "support_ticket_created",
  "source": "zendesk",
  "properties": {
    "ticket_id": "TKT-5678",
    "category": "billing",
    "priority": "high",
    "resolution_time_hours": null
  }
}
```
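The payloads above share a common envelope (`event_type`, `source`, `properties`). A minimal validator for that envelope might look like this — a sketch with a hypothetical `validate_event` helper, not part of the skill's codebase:

```python
REQUIRED_KEYS = {"event_type", "source", "properties"}


def validate_event(payload: dict) -> list:
    """Return a list of problems; an empty list means the envelope is valid."""
    errors = [f"missing key: {k}" for k in sorted(REQUIRED_KEYS - payload.keys())]
    if not isinstance(payload.get("properties", {}), dict):
        errors.append("properties must be an object")
    if not isinstance(payload.get("event_type", ""), str):
        errors.append("event_type must be a string")
    return errors


# A well-formed Stripe-style payload passes
ok = validate_event({
    "event_type": "payment_failed",
    "source": "stripe",
    "properties": {"amount": 9900, "failure_code": "card_declined"},
})

# A payload missing its envelope fields is rejected
bad = validate_event({"event_type": "order_created"})
```

Running this check before inserting into `events` keeps malformed webhooks out of the analytics stream.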
### EventLog Table (Input Idempotency & Audit)
**Purpose:** Raw immutable log of every incoming webhook/request for auditing and replay. Distinct from the `events` analytics table.
```sql
CREATE TABLE event_logs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    request_id VARCHAR(255) NOT NULL UNIQUE,  -- Idempotency key
    source VARCHAR(50) NOT NULL,              -- stripe, shopify
    event_type VARCHAR(100) NOT NULL,
    payload JSONB,                            -- Full raw body
    status VARCHAR(20) DEFAULT 'received',    -- received, processed, failed
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- No org_id needed if purely infrastructure-level (but recommended for full isolation)
```
## 🛡️ MCP Verification Protocol

**Critical:** You must use the Postgres MCP to verify schema changes and Row-Level Security (RLS) isolation.

**1. Schema Verification**

- **When:** After creating a migration.
- **Action:** Use `postgres.describe_table` to verify columns and indexes match the design.

**2. RLS Isolation Test (Red Team)**

- **When:** Modifying any table with `org_id`.
- **Action:**
  1. Connect via the Postgres MCP.
  2. Run `SELECT count(*) FROM table_name` without setting `app.current_tenant`.
  3. Verify the result is 0.
### Predictions Table (Model Outputs)
```sql
CREATE TABLE predictions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id UUID NOT NULL REFERENCES organizations(id),
    customer_id UUID NOT NULL REFERENCES customers(id) ON DELETE CASCADE,
    prediction_type VARCHAR(50) NOT NULL,  -- clv, churn_risk, uplift_segment
    predicted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    value DECIMAL(12,4),       -- CLV amount, churn probability, etc.
    confidence DECIMAL(5,4),   -- Model confidence (0-1)
    segment VARCHAR(50),       -- persuadable, sure_thing, lost_cause
    model_version VARCHAR(20) NOT NULL,
    features_used JSONB,
    expires_at TIMESTAMPTZ,    -- For cache invalidation
    CONSTRAINT unique_prediction UNIQUE(org_id, customer_id, prediction_type, predicted_at)
);

CREATE INDEX idx_predictions_org_customer_type ON predictions(org_id, customer_id, prediction_type);
CREATE INDEX idx_predictions_org_expires ON predictions(org_id, expires_at) WHERE expires_at IS NOT NULL;
CREATE INDEX idx_predictions_segment ON predictions(org_id, segment) WHERE segment IS NOT NULL;

-- Expired predictions must be removed by a scheduled job; NOW() is not
-- immutable, so it cannot be used in an index predicate:
-- DELETE FROM predictions WHERE expires_at < NOW();
```
### Campaigns Table (Retention Actions)
```sql
CREATE TABLE campaigns (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id UUID NOT NULL REFERENCES organizations(id),
    name VARCHAR(255) NOT NULL,
    segment VARCHAR(50) NOT NULL,  -- which segment to target
    offer_type VARCHAR(50),        -- 10_percent_off, free_shipping, etc.
    channel VARCHAR(20) DEFAULT 'email',
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE campaign_sends (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    campaign_id UUID NOT NULL REFERENCES campaigns(id) ON DELETE CASCADE,
    customer_id UUID NOT NULL REFERENCES customers(id) ON DELETE CASCADE,
    org_id UUID NOT NULL REFERENCES organizations(id),
    sent_at TIMESTAMPTZ NOT NULL,
    opened_at TIMESTAMPTZ,
    clicked_at TIMESTAMPTZ,
    converted_at TIMESTAMPTZ,
    in_holdout_group BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_campaign_sends_campaign ON campaign_sends(campaign_id, sent_at DESC);
CREATE INDEX idx_campaign_sends_customer ON campaign_sends(org_id, customer_id, sent_at DESC);
CREATE INDEX idx_campaign_sends_holdout ON campaign_sends(org_id, in_holdout_group, sent_at DESC);
```
### Organizations Table (Tenant Configuration)
```sql
CREATE TABLE organizations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    slug VARCHAR(100) NOT NULL UNIQUE,  -- company-abc
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE org_config (
    org_id UUID PRIMARY KEY REFERENCES organizations(id) ON DELETE CASCADE,
    field_mappings JSONB NOT NULL DEFAULT '{}'::jsonb,
    event_mappings JSONB NOT NULL DEFAULT '{}'::jsonb,
    model_settings JSONB NOT NULL DEFAULT '{}'::jsonb,
    enabled_features JSONB NOT NULL DEFAULT '{}'::jsonb,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
```

Example `org_config.model_settings`:

```json
{
  "clv_horizon_months": 12,
  "churn_threshold": 0.2,
  "holdout_percentage": 0.05,
  "min_purchase_count": 2,
  "retrain_frequency_days": 7
}
```
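Per-tenant `model_settings` are typically overlaid on application defaults at read time. A sketch of that merge — the default values mirror the example above; the `effective_settings` helper name is an assumption:

```python
DEFAULT_MODEL_SETTINGS = {
    "clv_horizon_months": 12,
    "churn_threshold": 0.2,
    "holdout_percentage": 0.05,
    "min_purchase_count": 2,
    "retrain_frequency_days": 7,
}


def effective_settings(org_settings: dict) -> dict:
    """Tenant overrides win; unspecified keys fall back to defaults."""
    return {**DEFAULT_MODEL_SETTINGS, **org_settings}


# A tenant that only overrides churn_threshold keeps every other default
settings = effective_settings({"churn_threshold": 0.35})
```

Because `model_settings` is JSONB, new tunables can be added without a migration; only the defaults dict changes.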
## ETL Pipeline Design

### Phase 1: Extract (BigQuery → CSV)
```sql
-- data/extract/ga4_extract.sql
-- Extract purchase events from public GA4 dataset
SELECT
    user_pseudo_id,
    PARSE_TIMESTAMP('%Y%m%d', event_date) AS event_date,
    TIMESTAMP_MICROS(event_timestamp) AS event_timestamp,
    event_name,
    geo.city,
    geo.country,
    traffic_source.medium AS traffic_medium,
    (SELECT value.int_value FROM UNNEST(event_params) WHERE key = 'ga_session_id') AS session_id,
    (SELECT value.string_value FROM UNNEST(event_params) WHERE key = 'page_location') AS page_url,
    items.item_id,
    items.price,
    items.quantity
FROM `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_*`,
    UNNEST(items) AS items
WHERE event_name IN ('purchase', 'add_to_cart', 'view_item', 'session_start')
    AND _TABLE_SUFFIX BETWEEN '20201201' AND '20210101'
ORDER BY event_timestamp
```
### Phase 2: Transform (Clean + Enrich)
```python
# data/transform/etl.py
import random
from datetime import timedelta

import pandas as pd
import requests


def transform_ga4_events(df: pd.DataFrame) -> pd.DataFrame:
    """Clean and transform GA4 events."""
    # Deduplicate
    df = df.drop_duplicates(subset=['user_pseudo_id', 'event_timestamp', 'event_name'])

    # Type conversions
    df['event_timestamp'] = pd.to_datetime(df['event_timestamp'])
    df['price'] = pd.to_numeric(df['price'], errors='coerce')
    df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce')

    # Calculate total_revenue per transaction
    df['total_revenue'] = df['price'] * df['quantity']
    return df


def enrich_with_weather(df: pd.DataFrame) -> pd.DataFrame:
    """Add weather context using the Open-Meteo API."""
    # Group by city + date to minimize API calls
    unique_locations = df[['city', 'country', 'event_date']].drop_duplicates()

    weather_data = []
    for _, row in unique_locations.iterrows():
        date = row['event_date'].strftime('%Y-%m-%d')
        # Get coordinates (simplified - use a real geocoding helper in production)
        coords = get_city_coords(row['city'], row['country'])

        # Open-Meteo archive API (free, no key needed)
        url = "https://archive-api.open-meteo.com/v1/archive"
        params = {
            "latitude": coords['lat'],
            "longitude": coords['lon'],
            "start_date": date,
            "end_date": date,
            "daily": "temperature_2m_mean,precipitation_sum"
        }
        response = requests.get(url, params=params)
        if response.ok:
            data = response.json()
            weather_data.append({
                'city': row['city'],
                'event_date': row['event_date'],
                'temperature_celsius': data['daily']['temperature_2m_mean'][0],
                'precipitation_mm': data['daily']['precipitation_sum'][0]
            })

    weather_df = pd.DataFrame(weather_data)
    return df.merge(weather_df, on=['city', 'event_date'], how='left')


def synthesize_email_opens(df: pd.DataFrame) -> pd.DataFrame:
    """Back-propagate email_open events from traffic_medium == 'email'."""
    email_sessions = df[df['traffic_medium'] == 'email'].copy()

    # For each email-sourced session, create an email_open event 30s-5min before
    email_opens = []
    for _, row in email_sessions.iterrows():
        email_opens.append({
            'user_pseudo_id': row['user_pseudo_id'],
            'event_name': 'email_open',
            'event_timestamp': row['event_timestamp'] - timedelta(seconds=random.randint(30, 300)),
            'properties': {
                'campaign_source': 'retention',
                'email_type': 'winback'
            }
        })

    return pd.concat([df, pd.DataFrame(email_opens)], ignore_index=True)
```
### Phase 3: Load (CSV → Postgres)
```python
# data/load/seed_database.py
import pandas as pd
from sqlalchemy.orm import Session

# Customer and Event are the app's SQLAlchemy models (import from your models module)


def load_customers(df: pd.DataFrame, org_id: str, session: Session):
    """Bulk insert customers with JSONB custom_attributes."""
    # Aggregate by user
    customer_df = df.groupby('user_pseudo_id').agg({
        'event_timestamp': ['min', 'max', 'count'],
        'total_revenue': 'sum'
    }).reset_index()
    customer_df.columns = ['external_id', 'first_purchase_at', 'last_purchase_at',
                           'purchase_count', 'total_revenue']

    # Prepare bulk insert data
    customers = []
    for _, row in customer_df.iterrows():
        customers.append({
            'org_id': org_id,
            'external_id': row['external_id'],
            'email': f"{row['external_id']}@example.com",  # Synthetic
            'first_purchase_at': row['first_purchase_at'],
            'last_purchase_at': row['last_purchase_at'],
            'purchase_count': row['purchase_count'],
            'total_revenue': float(row['total_revenue']),
            'custom_attributes': {}  # Will be populated by company-specific data
        })

    # Bulk insert (fast)
    session.execute(Customer.__table__.insert(), customers)
    session.commit()


def load_events(df: pd.DataFrame, org_id: str, customer_map: dict, session: Session):
    """Bulk insert events with JSONB properties."""
    events = []
    for _, row in df.iterrows():
        customer_id = customer_map.get(row['user_pseudo_id'])
        if not customer_id:
            continue

        # Guard against NaN: int(NaN) raises, so default missing values to 0
        price = row.get('price')
        quantity = row.get('quantity')
        events.append({
            'org_id': org_id,
            'customer_id': customer_id,
            'event_type': row['event_name'],
            'event_timestamp': row['event_timestamp'],
            'source': 'ga4',
            'properties': {
                'item_id': row.get('item_id'),
                'price': float(price) if pd.notna(price) else 0.0,
                'quantity': int(quantity) if pd.notna(quantity) else 0,
                'traffic_medium': row.get('traffic_medium'),
                'city': row.get('city'),
                'temperature_celsius': row.get('temperature_celsius'),
                'precipitation_mm': row.get('precipitation_mm')
            }
        })

    # Batch insert (chunk by 1000 for performance)
    batch_size = 1000
    for i in range(0, len(events), batch_size):
        batch = events[i:i + batch_size]
        session.execute(Event.__table__.insert(), batch)
        session.commit()
        print(f"Inserted batch {i // batch_size + 1}: {len(batch)} events")
```
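The chunked-insert loop can be factored into a small reusable helper. A sketch under the same assumptions as the loader above (`batched` is a hypothetical name; the 1,000-row default mirrors the code):

```python
def batched(rows: list, batch_size: int = 1000):
    """Yield successive fixed-size slices of rows (the last one may be smaller)."""
    for i in range(0, len(rows), batch_size):
        yield rows[i:i + batch_size]


# 2,500 rows -> batches of 1000, 1000, 500
sizes = [len(b) for b in batched(list(range(2500)), 1000)]
```

The loader's loop then reduces to `for batch in batched(events): session.execute(...)`, which keeps the commit cadence and the batch-size choice in one place.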
### Replay Engine (Simulate Live Events)
```python
# data/replay/replay_engine.py
import time
from datetime import datetime

import pandas as pd
import requests


class ReplayEngine:
    """Time-shifts historical data to 'now' and streams events in real time."""

    def __init__(self, events_csv: str, webhook_url: str):
        self.df = pd.read_csv(events_csv)
        self.df['event_timestamp'] = pd.to_datetime(self.df['event_timestamp'])
        self.webhook_url = webhook_url

        # Calculate time shift
        first_event = self.df['event_timestamp'].min()
        self.time_delta = datetime.now() - first_event
        print(f"Time shift: {self.time_delta}")

    def replay(self, speed_multiplier: float = 1.0):
        """Stream events in chronological order with timing preserved."""
        self.df['shifted_timestamp'] = self.df['event_timestamp'] + self.time_delta
        self.df = self.df.sort_values('shifted_timestamp')

        # Use a positional counter: after sort_values, the DataFrame index is
        # no longer sequential, so an index-based modulo check is unreliable
        for i, (_, row) in enumerate(self.df.iterrows()):
            # Calculate when this event should fire
            target_time = row['shifted_timestamp']
            wait_seconds = (target_time - datetime.now()).total_seconds() / speed_multiplier
            if wait_seconds > 0:
                time.sleep(wait_seconds)

            # Send webhook for actionable events only
            if row['event_type'] == 'payment_failed':
                self._send_webhook(row)

            if i % 1000 == 0:
                print(f"Replayed {i} events...")

    def _send_webhook(self, event: pd.Series):
        """POST to the webhook endpoint."""
        payload = {
            "type": f"customer.{event['event_type']}",
            "data": {
                "customer": event['user_pseudo_id'],
                "amount": int(event.get('total_revenue', 0) * 100),  # cents
                "timestamp": event['shifted_timestamp'].isoformat()
            }
        }
        try:
            response = requests.post(self.webhook_url, json=payload, timeout=5)
            print(f"Webhook sent: {payload['type']} -> {response.status_code}")
        except Exception as e:
            print(f"Webhook failed: {e}")


# Usage
if __name__ == "__main__":
    engine = ReplayEngine(
        events_csv="data/processed/events.csv",
        webhook_url="http://localhost:8000/webhook/stripe"
    )
    engine.replay(speed_multiplier=1000)  # 1000x speed for demo
```
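The engine's core trick is plain datetime arithmetic: add `now - min(timestamps)` to every historical timestamp so the earliest event lands at "now" while the gaps between events are preserved. In isolation (with a fixed "now" so the example is reproducible):

```python
from datetime import datetime, timedelta

historical = [
    datetime(2020, 12, 1, 9, 0, 0),
    datetime(2020, 12, 1, 9, 5, 0),
    datetime(2020, 12, 2, 10, 0, 0),
]
now = datetime(2025, 6, 1, 12, 0, 0)  # fixed for the example

# Same shift the ReplayEngine computes in __init__
shift = now - min(historical)
shifted = [ts + shift for ts in historical]

# Earliest event maps to "now"; inter-event gaps are unchanged
assert shifted[0] == now
assert shifted[1] - shifted[0] == timedelta(minutes=5)
```

Because the shift is a single constant, relative pacing survives any `speed_multiplier`: the multiplier only divides the sleep between events, not the timestamps themselves.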
## Data Quality Checks
```python
# src/services/data_quality_service.py
from datetime import datetime, timedelta, timezone

from sqlalchemy import func

# `db` is the active session; Event and Customer are the app's SQLAlchemy models


class DataQualityService:
    """Monitor data freshness, completeness, and anomalies."""

    def check_freshness(self, org_id: str) -> dict:
        """Ensure data is < 4 hours old."""
        latest_event = db.query(func.max(Event.event_timestamp)) \
            .filter(Event.org_id == org_id) \
            .scalar()

        if not latest_event:
            return {"status": "error", "message": "No events found"}

        # event_timestamp is TIMESTAMPTZ, so compare against an aware datetime
        age_hours = (datetime.now(timezone.utc) - latest_event).total_seconds() / 3600
        return {
            "status": "healthy" if age_hours < 4 else "stale",
            "latest_event": latest_event.isoformat(),
            "age_hours": round(age_hours, 2)
        }

    def check_completeness(self, org_id: str) -> dict:
        """Check for missing critical fields."""
        null_email_count = db.query(func.count(Customer.id)) \
            .filter(Customer.org_id == org_id, Customer.email.is_(None)) \
            .scalar()
        total_customers = db.query(func.count(Customer.id)) \
            .filter(Customer.org_id == org_id) \
            .scalar()

        null_rate = null_email_count / total_customers if total_customers > 0 else 0
        return {
            "status": "healthy" if null_rate < 0.05 else "degraded",
            "null_email_count": null_email_count,
            "null_rate": round(null_rate, 4),
            "total_customers": total_customers
        }

    def detect_anomalies(self, org_id: str) -> dict:
        """Detect sudden changes in event volume."""
        # Compare last hour vs. previous hour
        now = datetime.now(timezone.utc)
        last_hour_count = db.query(func.count(Event.id)) \
            .filter(
                Event.org_id == org_id,
                Event.event_timestamp >= now - timedelta(hours=1)
            ).scalar()
        prev_hour_count = db.query(func.count(Event.id)) \
            .filter(
                Event.org_id == org_id,
                Event.event_timestamp >= now - timedelta(hours=2),
                Event.event_timestamp < now - timedelta(hours=1)
            ).scalar()

        if prev_hour_count == 0:
            return {"status": "unknown", "message": "Insufficient history"}

        change_pct = (last_hour_count - prev_hour_count) / prev_hour_count
        return {
            "status": "anomaly" if abs(change_pct) > 0.5 else "normal",
            "last_hour_count": last_hour_count,
            "prev_hour_count": prev_hour_count,
            "change_pct": round(change_pct, 2)
        }
```
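The anomaly rule above reduces to a pure function, which makes it easy to unit-test without a database. A sketch — `volume_status` is a hypothetical helper; the 50% threshold matches the service code:

```python
def volume_status(last_hour: int, prev_hour: int, threshold: float = 0.5) -> str:
    """Classify hour-over-hour event volume change."""
    if prev_hour == 0:
        return "unknown"  # insufficient history
    change_pct = (last_hour - prev_hour) / prev_hour
    return "anomaly" if abs(change_pct) > threshold else "normal"


status_spike = volume_status(last_hour=900, prev_hour=400)  # +125% -> anomaly
status_flat = volume_status(last_hour=420, prev_hour=400)   # +5%   -> normal
```

Keeping the threshold as a parameter also lets tenants with naturally spiky traffic opt into a looser setting via `org_config`.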
## Migration Strategy

Use Alembic for schema changes:

```bash
# Create migration
alembic revision --autogenerate -m "Add weather fields to events"

# Apply migration
alembic upgrade head

# Rollback
alembic downgrade -1
```
Example migration:

```python
# alembic/versions/001_add_weather_fields.py
from alembic import op


def upgrade():
    op.execute("""
        ALTER TABLE events
        ADD COLUMN IF NOT EXISTS weather_data JSONB DEFAULT '{}'::jsonb;
        CREATE INDEX IF NOT EXISTS idx_events_weather
        ON events USING GIN (weather_data jsonb_path_ops);
    """)


def downgrade():
    op.execute("""
        DROP INDEX IF EXISTS idx_events_weather;
        ALTER TABLE events DROP COLUMN IF EXISTS weather_data;
    """)
```
## Performance Optimization

### Query Optimization Checklist

- Use indexes on `org_id` (multi-tenant isolation)
- Add indexes on timestamp columns for time-based queries
- Use GIN indexes for JSONB queries
- Enable row-level security (RLS) for automatic tenant filtering
- Use `EXPLAIN ANALYZE` to identify slow queries
- Consider table partitioning for high-volume tables (> 10M rows)
### JSONB Best Practices

- Use `jsonb` (not `json`) for indexable, queryable storage
- Create GIN indexes with `jsonb_path_ops` for containment queries
- Create expression indexes for frequently queried paths
- Avoid deeply nested structures (max 3 levels)
- Use top-level keys for filterable attributes
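One way to honor the "top-level keys for filterable attributes" rule is to flatten nested payloads before writing them to `custom_attributes`. A minimal sketch — the `flatten` helper and the `_`-joined key convention are assumptions, not a project standard:

```python
def flatten(d: dict, parent: str = "", sep: str = "_") -> dict:
    """Flatten nested dicts into single-level keys suitable for JSONB filtering."""
    out = {}
    for key, value in d.items():
        full_key = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            out.update(flatten(value, full_key, sep))  # recurse into sub-objects
        else:
            out[full_key] = value
    return out


attrs = flatten({"survey": {"nps_score": 8}, "plan_type": "enterprise"})
```

Flattened keys like `survey_nps_score` can then be targeted directly by `custom_attributes->>'survey_nps_score'` expression indexes.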
### Common Pitfalls

❌ **Don't:** query JSONB without indexes

```sql
-- Slow: Full table scan
SELECT * FROM customers
WHERE custom_attributes->>'nps_score' = '8';
```

✅ **Do:** create an expression index

```sql
CREATE INDEX idx_nps_score ON customers
    ((custom_attributes->>'nps_score'));

-- Fast: Uses the index
SELECT * FROM customers
WHERE custom_attributes->>'nps_score' = '8';
```

❌ **Don't:** forget `org_id` in queries

```sql
-- Dangerous: Returns data from all tenants
SELECT * FROM customers WHERE email = 'user@example.com';
```

✅ **Do:** always filter by `org_id`

```sql
-- Safe: Tenant-isolated
SELECT * FROM customers
WHERE org_id = 'abc-123' AND email = 'user@example.com';
```
## Testing Data Pipelines
```python
# tests/test_etl.py
import pandas as pd
import pytest

from data.transform.etl import enrich_with_weather, transform_ga4_events


def test_deduplicate_events():
    """Verify duplicate events are removed."""
    # transform_ga4_events also reads price/quantity, so the fixture needs them
    df = pd.DataFrame({
        'user_pseudo_id': ['user1', 'user1', 'user2'],
        'event_timestamp': ['2024-01-01', '2024-01-01', '2024-01-02'],
        'event_name': ['purchase', 'purchase', 'purchase'],
        'price': [10.0, 10.0, 20.0],
        'quantity': [1, 1, 2]
    })
    result = transform_ga4_events(df)
    assert len(result) == 2  # One duplicate removed


def test_weather_enrichment():
    """Verify weather API integration (hits the live Open-Meteo API)."""
    df = pd.DataFrame({
        'city': ['San Francisco'],
        'country': ['US'],
        'event_date': [pd.Timestamp('2024-01-01')]
    })
    result = enrich_with_weather(df)
    assert 'temperature_celsius' in result.columns
    assert result['temperature_celsius'].notna().all()
```
## Summary

- Use JSONB for flexible, company-specific fields
- Always include `org_id` for multi-tenant isolation
- Create proper indexes (GIN for JSONB, B-tree for timestamps)
- Validate data quality (freshness, completeness, anomalies)
- Use Alembic for schema migrations
- Test ETL pipelines with unit tests
