Dlt

by mmbianco78

dlt (data load tool) patterns for SignalRoom ETL pipelines. Use when creating sources, debugging pipeline failures, understanding schema evolution, or implementing incremental loading.


dlt Data Load Tool

Core Concepts

dlt handles extract, normalize, and load. You define sources and resources; dlt handles schema inference, table creation, and loading.

Source Structure

src/signalroom/sources/{source_name}/
└── __init__.py  # Contains @dlt.source and @dlt.resource

Creating a New Source

import dlt
from signalroom.common import settings

@dlt.source(name="my_source")
def my_source():
    """Source docstring appears in dlt metadata."""

    @dlt.resource(write_disposition="append", primary_key="id")
    def my_resource():
        # fetch_data() is a placeholder for the call that yields your records
        yield from fetch_data()

    return [my_resource]

Register in Pipeline Runner

Add to src/signalroom/pipelines/runner.py:

SOURCES = {
    "my_source": "signalroom.sources.my_source:my_source",
}
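
The registry values look like `module:attribute` import strings. How runner.py actually resolves them is not shown here; the following is a minimal sketch of one way such a string could be resolved, assuming that format. The `resolve_source` helper is hypothetical, and `json:dumps` stands in for a real source path so the snippet runs anywhere.

```python
import importlib
from typing import Any


def resolve_source(path: str) -> Any:
    """Resolve a 'package.module:attribute' string to the object it names."""
    module_name, sep, attr_name = path.partition(":")
    if not sep or not module_name or not attr_name:
        raise ValueError(f"expected 'module:attribute', got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Stand-in registry: a stdlib target replaces the real SignalRoom path.
SOURCES = {"demo": "json:dumps"}

dumps = resolve_source(SOURCES["demo"])
print(dumps({"ok": True}))
```

Keeping the registry as strings means a source module is imported only when it is requested, so a broken or slow import in one source does not affect the others.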

Write Dispositions

| Mode | Use Case | Behavior |
|------|----------|----------|
| append | Immutable events (clicks, conversions) | Always insert new rows |
| merge | Mutable entities (campaigns, contacts) | Upsert by primary_key |
| replace | Full refresh (feature flags, config) | Drop and recreate table |
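
The difference between the three modes is easiest to see on a small example. The sketch below is a toy in-memory model of the behavior described in the table, not dlt's implementation; the `load` function and the sample rows are made up for illustration.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def load(table: List[Row], batch: List[Row], disposition: str, primary_key: str = "id") -> List[Row]:
    """Return the table contents after loading `batch` under a write disposition."""
    if disposition == "append":
        # Always insert: existing rows are untouched, duplicates are possible.
        return table + batch
    if disposition == "replace":
        # Full refresh: previous contents are discarded.
        return list(batch)
    if disposition == "merge":
        # Upsert: a batch row overwrites the existing row with the same key.
        merged = {row[primary_key]: row for row in table}
        for row in batch:
            merged[row[primary_key]] = row
        return list(merged.values())
    raise ValueError(f"unknown write disposition: {disposition!r}")


existing = [{"id": 1, "name": "old"}, {"id": 2, "name": "keep"}]
incoming = [{"id": 1, "name": "new"}, {"id": 3, "name": "added"}]

for mode in ("append", "merge", "replace"):
    print(mode, load(existing, incoming, mode))
```

With append, id 1 ends up in the table twice; with merge, it is updated in place; with replace, id 2 disappears because it is not in the new batch.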

Incremental Loading

Only fetch new data since last run:

@dlt.resource(write_disposition="append", primary_key="id")
def events(
    updated_at: dlt.sources.incremental[str] = dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01"
    )
):
    # Only fetches records after last loaded timestamp
    yield from api.get_events(since=updated_at.last_value)
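
To make the mechanism concrete, here is a plain-Python model of cursor-based incremental fetching. It is not dlt's implementation: `FAKE_API`, `get_events`, and `run_once` are hypothetical stand-ins, and the `state` dict plays the role of the cursor that is persisted between runs. The model treats the lower bound as inclusive, so rows at the boundary are fetched again and filtered out by id.

```python
from typing import Any, Dict, List

# Hypothetical stand-in for a remote API. Timestamps are ISO strings,
# which sort correctly as plain text.
FAKE_API: List[Dict[str, str]] = [
    {"id": "a", "updated_at": "2024-01-01"},
    {"id": "b", "updated_at": "2024-01-02"},
]


def get_events(since: str) -> List[Dict[str, str]]:
    """Return events at or after `since` (inclusive lower bound)."""
    return [e for e in FAKE_API if e["updated_at"] >= since]


def run_once(state: Dict[str, Any]) -> List[Dict[str, str]]:
    """Fetch from the stored cursor, skip rows already seen at the boundary, advance the cursor."""
    last_value = state.get("last_value", "2024-01-01")
    seen = state.get("seen_at_last_value", set())
    new_rows = [
        e for e in get_events(last_value)
        if not (e["updated_at"] == last_value and e["id"] in seen)
    ]
    if new_rows:
        newest = max(e["updated_at"] for e in new_rows)
        # Remember ids that share the newest cursor value so the next run can skip them.
        at_newest = {e["id"] for e in new_rows if e["updated_at"] == newest}
        if newest == last_value:
            at_newest |= seen
        state["last_value"] = newest
        state["seen_at_last_value"] = at_newest
    return new_rows


state: Dict[str, Any] = {}
first = run_once(state)  # loads a and b
FAKE_API.append({"id": "c", "updated_at": "2024-01-02"})
second = run_once(state)  # loads only c, even though b is fetched again
print([e["id"] for e in first], [e["id"] for e in second])
```

In this model, the set of ids remembered at the boundary is the part that grows when many rows share one cursor value.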

WARNING: High-Volume Sources

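dlt.sources.incremental deduplicates at the cursor boundary: it keeps track of every row that shares the most recent cursor value. When many rows share one cursor value, that bookkeeping grows with them and performance degrades toward O(n²).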

| Rows per cursor value | Overhead | Recommendation |
|-----------------------|----------|----------------|
| < 100 | Negligible | Use incremental |
| 100 - 1,000 | Noticeable | Monitor performance |
| > 1,000 | Severe | Use file-level state instead |
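
Before choosing a strategy, it can help to measure how many rows actually share a cursor value in a sample of the data. The helper names below are hypothetical, and the thresholds are simply the three bands from the table above; treat this as a quick check under those assumptions, not a benchmark.

```python
from collections import Counter
from typing import Any, Dict, Iterable


def max_rows_per_cursor_value(rows: Iterable[Dict[str, Any]], cursor_field: str) -> int:
    """Return the largest number of rows that share a single cursor value."""
    counts = Counter(row[cursor_field] for row in rows)
    return max(counts.values(), default=0)


def recommend(rows_per_value: int) -> str:
    """Map a rows-per-cursor-value count onto the three bands in the table."""
    if rows_per_value < 100:
        return "use incremental"
    if rows_per_value <= 1000:
        return "monitor performance"
    return "use file-level state"


# A daily export where one date carries 1,500 rows and another carries 20.
sample = [{"date": "2024-01-01"}] * 1500 + [{"date": "2024-01-02"}] * 20
worst = max_rows_per_cursor_value(sample, "date")
print(worst, recommend(worst))
```

A date-grained cursor on a daily CSV export is the typical case that lands in the last band, because every row in a file carries the same date.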

For high-volume sources (like S3 CSV imports), use dlt.current.resource_state() for file-level tracking:

@dlt.resource(write_disposition="merge", primary_key=["file_name", "row_id"])
def csv_resource():
    state = dlt.current.resource_state()
    last_date = state.get("last_file_date", "2024-01-01")

    for file in get_files_since(last_date):
        yield from process_file(file)
        state["last_file_date"] = file.date  # Manual state update
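
The same bookkeeping can be modeled without dlt to see how the state behaves across runs. In the sketch below, a plain dict stands in for `dlt.current.resource_state()`, and `ExportFile` and `files_to_process` are hypothetical; it assumes "since" means strictly later than the stored date and sorts files so the stored date never moves backwards.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List


@dataclass(frozen=True)
class ExportFile:
    name: str
    date: str  # ISO date, so string comparison matches chronological order


def files_to_process(files: List[ExportFile], state: Dict[str, str]) -> Iterator[ExportFile]:
    """Yield unprocessed files oldest-first, advancing the state after each one."""
    last_date = state.get("last_file_date", "2024-01-01")
    pending = sorted((f for f in files if f.date > last_date), key=lambda f: f.date)
    for f in pending:
        yield f
        # Runs only when the consumer asks for the next file, i.e. after this one was handled.
        state["last_file_date"] = f.date


files = [ExportFile("b.csv", "2024-01-03"), ExportFile("a.csv", "2024-01-02")]
state: Dict[str, str] = {}

first_run = [f.name for f in files_to_process(files, state)]
second_run = [f.name for f in files_to_process(files, state)]
print(first_run, second_run, state)
```

The state holds one date regardless of how many rows each file contains, which is what keeps the overhead flat. Using `>=` instead of `>` would reprocess the boundary file on every run; with a merge disposition that is redundant work rather than a source of duplicates.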

Primary Keys

Required for merge disposition:

# Single key
@dlt.resource(primary_key="id")

# Composite key
@dlt.resource(primary_key=["date", "affiliate_id"])

Schema Evolution

dlt evolves schemas automatically: new columns are added as they appear in the data. To list recent loads, including the schema each one was loaded under:

SELECT * FROM {schema}._dlt_loads ORDER BY inserted_at DESC LIMIT 5;

Debugging Failed Loads

Check dlt metadata tables

-- Recent loads
SELECT load_id, schema_name, status, inserted_at
FROM {schema}._dlt_loads
ORDER BY inserted_at DESC LIMIT 10;

-- Pipeline state
SELECT * FROM {schema}._dlt_pipeline_state;

Common Errors

"Primary key violation"

  • Using append when you need merge
  • Duplicate records in source data

"Column type mismatch"

  • The schema evolved incompatibly with the existing column type
  • Fix: drop the table so dlt recreates it, or add explicit column hints (the columns argument of @dlt.resource)

"Connection refused"

  • Check Supabase pooler settings (port 6543, user format)

Drop Pending Packages

If a pipeline is stuck on packages left over from a failed run, discard them:

dlt pipeline {pipeline_name} drop-pending-packages

SignalRoom Sources

| Source | Write Mode | Primary Key | State Tracking |
|--------|------------|-------------|----------------|
| s3_exports | merge | _file_name, _row_id | File-level (resource_state) |
| everflow | merge | date, affiliate_id, advertiser_id | Row-level (incremental) |
| redtrack | merge | date, source_id | Row-level (incremental) |

Testing Locally

Use DuckDB for fast local testing:

import dlt

# DuckDB runs in-process, so no external database is needed
pipeline = dlt.pipeline(
    pipeline_name="test",
    destination="duckdb",
    dataset_name="test"
)

Skill Information

Category: Technical
Last Updated: 12/22/2025