---
name: dlt
description: dlt (data load tool) patterns for SignalRoom ETL pipelines. Use when creating sources, debugging pipeline failures, understanding schema evolution, or implementing incremental loading.
---
# dlt Data Load Tool
## Core Concepts
dlt covers extract, normalize, and load. You define sources and resources; dlt handles schema inference, table creation, and loading.
## Source Structure
```
src/signalroom/sources/{source_name}/
└── __init__.py   # Contains @dlt.source and @dlt.resource
```
## Creating a New Source
```python
import dlt

from signalroom.common import settings


@dlt.source(name="my_source")
def my_source():
    """Source docstring appears in dlt metadata."""

    @dlt.resource(write_disposition="append", primary_key="id")
    def my_resource():
        yield from fetch_data()

    return [my_resource]
```
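To try the source end to end, a minimal sketch (assuming a local DuckDB destination; `fetch_data` is the stand-in extractor from above):

```python
import dlt

from signalroom.sources.my_source import my_source

# Load into a local DuckDB file; swap the destination for production targets
pipeline = dlt.pipeline(
    pipeline_name="my_source_pipeline",
    destination="duckdb",
    dataset_name="my_source_data",
)
info = pipeline.run(my_source())
print(info)  # Summarizes the load packages and any failed jobs
```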
## Register in Pipeline Runner
Add to `src/signalroom/pipelines/runner.py`:
```python
SOURCES = {
    "my_source": "signalroom.sources.my_source:my_source",
}
```
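The registry value is a `module:attribute` path. The actual runner isn't shown here, but a resolver along these lines is one way to turn that string into a callable source (hypothetical sketch):

```python
import importlib


def resolve_source(path: str):
    # Hypothetical helper: "pkg.module:attr" -> the attr object
    module_name, attr_name = path.split(":", maxsplit=1)
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


source_factory = resolve_source(SOURCES["my_source"])
source = source_factory()  # Ready to pass to pipeline.run()
```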
## Write Dispositions
| Mode | Use Case | Behavior |
|---|---|---|
| `append` | Immutable events (clicks, conversions) | Always insert new rows |
| `merge` | Mutable entities (campaigns, contacts) | Upsert by `primary_key` |
| `replace` | Full refresh (feature flags, config) | Drop and recreate table |
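For instance, a `merge` resource upserts rows that share a primary key (a sketch with illustrative names):

```python
import dlt


@dlt.resource(write_disposition="merge", primary_key="id")
def campaigns():
    # Yielding a row whose id already exists updates that row
    # instead of inserting a duplicate
    yield {"id": 1, "name": "Spring Sale", "status": "active"}
```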
## Incremental Loading
Only fetch new data since last run:
```python
@dlt.resource(write_disposition="append", primary_key="id")
def events(
    updated_at: dlt.sources.incremental[str] = dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01",
    )
):
    # Only fetches records after the last loaded timestamp
    yield from api.get_events(since=updated_at.last_value)
```
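On the first run, `updated_at.last_value` equals `initial_value`; on later runs dlt restores the highest cursor value seen from pipeline state, so the resource only requests newer records.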
### WARNING: High-Volume Sources
`dlt.sources.incremental` tracks individual rows for deduplication. When many rows share the same cursor value, that tracking degrades to O(n²).
| Rows per cursor value | Overhead | Recommendation |
|---|---|---|
| < 100 | Negligible | Use incremental |
| 100 - 1,000 | Noticeable | Monitor performance |
| > 1,000 | Severe | Use file-level state instead |
For high-volume sources (like S3 CSV imports), use `dlt.current.resource_state()` for file-level tracking:
```python
@dlt.resource(write_disposition="merge", primary_key=["file_name", "row_id"])
def csv_resource():
    state = dlt.current.resource_state()
    last_date = state.get("last_file_date", "2024-01-01")
    for file in get_files_since(last_date):
        yield from process_file(file)
        state["last_file_date"] = file.date  # Manual state update
```
## Primary Keys
Required for the `merge` disposition:
```python
# Single key
@dlt.resource(primary_key="id")
def contacts():
    ...

# Composite key
@dlt.resource(primary_key=["date", "affiliate_id"])
def daily_stats():
    ...
```
## Schema Evolution
dlt auto-evolves schemas: new columns are added automatically. To see recent loads and the schema version each one carried:
```sql
SELECT * FROM {schema}._dlt_loads ORDER BY inserted_at DESC LIMIT 5;
```
## Debugging Failed Loads
### Check dlt metadata tables
```sql
-- Recent loads
SELECT load_id, schema_name, status, inserted_at
FROM {schema}._dlt_loads
ORDER BY inserted_at DESC
LIMIT 10;

-- Pipeline state
SELECT * FROM {schema}._dlt_pipeline_state;
```
### Common Errors
"Primary key violation"
- Using
appendwhen you needmerge - Duplicate records in source data
"Column type mismatch"
- Schema evolved incompatibly
- Fix: Drop table or add explicit column hints
"Connection refused"
- Check Supabase pooler settings (port 6543, user format)
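A minimal sketch of pinning a column type with an explicit hint; the resource and column names (`payments`, `amount`) are stand-ins:

```python
import dlt


@dlt.resource(
    write_disposition="merge",
    primary_key="id",
    # An explicit hint pins the column type so schema evolution
    # cannot drift it to an incompatible one
    columns={"amount": {"data_type": "double"}},
)
def payments():
    yield from fetch_payments()
```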
### Drop Pending Packages
If the pipeline is stuck:
```bash
dlt pipeline {pipeline_name} drop-pending-packages
```
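Before dropping anything, `dlt pipeline {pipeline_name} info` prints the pipeline's state and any pending packages, which helps confirm what is actually stuck.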
## SignalRoom Sources
| Source | Write Mode | Primary Key | State Tracking |
|---|---|---|---|
| `s3_exports` | `merge` | `_file_name`, `_row_id` | File-level (`resource_state`) |
| `everflow` | `merge` | `date`, `affiliate_id`, `advertiser_id` | Row-level (incremental) |
| `redtrack` | `merge` | `date`, `source_id` | Row-level (incremental) |
## Testing Locally
Use DuckDB for fast local testing:
```python
pipeline = dlt.pipeline(
    pipeline_name="test",
    destination="duckdb",
    dataset_name="test",
)
```
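After a run, the loaded tables can be queried straight from the local DuckDB file; by default dlt names the file after the pipeline, so `test.duckdb` here. A sketch using the `my_source` example from above:

```python
import duckdb

info = pipeline.run(my_source())
print(info)

# Query the loaded table directly from the local DuckDB file;
# the dataset name ("test") becomes the schema
conn = duckdb.connect("test.duckdb")
print(conn.sql("SELECT * FROM test.my_resource LIMIT 5"))
```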
## Resources
- dlt Documentation
- Write Dispositions
- Schema Evolution
- SignalRoom API Reference: `docs/API_REFERENCE.md` (live docs, auth, request/response examples)