Data_Transform
by vuralserhat86
Transform raw data into analytical assets using ETL/ELT patterns, SQL (dbt), Python (pandas/polars/PySpark), and orchestration (Airflow). Use when building data pipelines, implementing incremental models, migrating from pandas to polars, or orchestrating multi-step transformations with testing and quality checks.
name: data_transform
router_kit: FullStackKit
description: Transform raw data into analytical assets using ETL/ELT patterns, SQL (dbt), Python (pandas/polars/PySpark), and orchestration (Airflow). Use when building data pipelines, implementing incremental models, migrating from pandas to polars, or orchestrating multi-step transformations with testing and quality checks.
metadata:
  skillport:
    category: auto-healed
    tags: [big data, cleaning, csv, data analysis, data engineering, data science, data transform, database, etl, etl pipelines, export, import, json, json manipulation, machine learning basics, migration, normalization, nosql, numpy, pandas, pipeline, python data stack, query optimization, reporting, schema design, sql, statistics, transformation, visualization]
Data Transformation
Transform raw data into analytical assets using modern transformation patterns, frameworks, and orchestration tools.
Purpose
Select and implement data transformation patterns across the modern data stack. Transform raw data into clean, tested, and documented analytical datasets using SQL (dbt), Python DataFrames (pandas, polars, PySpark), and pipeline orchestration (Airflow, Dagster, Prefect).
When to Use
Invoke this skill when:
- Choosing between ETL and ELT transformation patterns
- Building dbt models (staging, intermediate, marts)
- Implementing incremental data loads and merge strategies
- Migrating pandas code to polars for performance improvements
- Orchestrating data pipelines with dependencies and retries
- Adding data quality tests and validation
- Processing large datasets with PySpark
- Creating production-ready transformation workflows
Quick Start: Common Patterns
dbt Incremental Model
{{
  config(
    materialized='incremental',
    unique_key='order_id'
  )
}}

select order_id, customer_id, order_created_at, sum(revenue) as total_revenue
from {{ ref('int_order_items_joined') }}

{% if is_incremental() %}
where order_created_at > (select max(order_created_at) from {{ this }})
{% endif %}

group by 1, 2, 3
polars High-Performance Transformation
import polars as pl

result = (
    pl.scan_csv('large_dataset.csv')
    .filter(pl.col('year') == 2024)
    .with_columns((pl.col('quantity') * pl.col('price')).alias('revenue'))
    .group_by('region')
    .agg(pl.col('revenue').sum())
    .collect()  # Execute lazy query
)
Airflow Data Pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# extract_data and transform_data are plain Python callables defined elsewhere
with DAG(
    dag_id='daily_sales_pipeline',
    schedule_interval='0 2 * * *',
    default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)},
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_data)
    transform = PythonOperator(task_id='transform', python_callable=transform_data)
    extract >> transform
Decision Frameworks
ETL vs ELT Selection
Use ELT (Extract, Load, Transform) when:
- Using modern cloud data warehouse (Snowflake, BigQuery, Databricks)
- Transformation logic changes frequently
- Team includes SQL analysts
- Data volume 10GB-1TB+ (leverage warehouse parallelism)
Tools: dbt, Dataform, Snowflake tasks, BigQuery scheduled queries
Use ETL (Extract, Transform, Load) when:
- Regulatory compliance requires pre-load data redaction (PII/PHI)
- Target system lacks compute power
- Real-time streaming with immediate transformation
- Legacy systems without cloud warehouse
Tools: AWS Glue, Azure Data Factory, custom Python scripts
Use Hybrid when combining sensitive data cleansing (ETL) with analytics transformations (ELT).
Default recommendation: ELT with dbt unless specific compliance or performance constraints require ETL.
For detailed patterns, see references/etl-vs-elt-patterns.md.
DataFrame Library Selection
Choose pandas when:
- Data size < 500MB
- Prototyping or exploratory analysis
- Need compatibility with pandas-only libraries
Choose polars when:
- Data size 500MB-100GB
- Performance critical (commonly benchmarked at 10-100x faster than pandas)
- Production pipelines with memory constraints
- Want lazy evaluation with query optimization
Choose PySpark when:
- Data size > 100GB
- Need distributed processing across cluster
- Existing Spark infrastructure (EMR, Databricks)
Migration path: pandas → polars (easier, similar API) or pandas → PySpark (requires cluster)
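The similar expression APIs make incremental migration practical. A minimal interop sketch (assuming pyarrow is installed, which pl.from_pandas/.to_pandas require):

import pandas as pd
import polars as pl

pdf = pd.read_csv('sales.csv')  # existing pandas pipeline

# Port only the expensive aggregation to polars; hand the result back to pandas
agg = (
    pl.from_pandas(pdf)
    .group_by('region')
    .agg(pl.col('price').sum().alias('total_price'))
    .to_pandas()
)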
For comparisons and migration guides, see references/dataframe-comparison.md.
Orchestration Tool Selection
Choose Airflow when:
- Enterprise production (proven at scale)
- Need 5,000+ integrations
- Managed services available (AWS MWAA, GCP Cloud Composer)
Choose Dagster when:
- Heavy dbt usage (native dbt_assets integration)
- Data lineage and asset-based workflows prioritized
- ML pipelines requiring testability
Choose Prefect when:
- Dynamic workflows (runtime task generation)
- Cloud-native architecture preferred
- Pythonic API with decorators
Safe default: Airflow (battle-tested) unless specific needs for Dagster/Prefect.
For detailed patterns, see references/orchestration-patterns.md.
SQL Transformations with dbt
Model Layer Structure
Staging Layer (models/staging/)
- 1:1 with source tables
- Minimal transformations (renaming, type casting, basic filtering)
- Materialized as views or ephemeral

Intermediate Layer (models/intermediate/)
- Business logic and complex joins
- Not exposed to end users
- Often ephemeral (CTEs only)

Marts Layer (models/marts/)
- Final models for reporting
- Fact tables (events, transactions)
- Dimension tables (customers, products)
- Materialized as tables or incremental
dbt Materialization Types
View: Query re-run each time model referenced. Use for fast queries, staging layer.
Table: Full refresh on each run. Use for frequently queried models, expensive computations.
Incremental: Only processes new/changed records. Use for large fact tables, event logs.
Ephemeral: CTE only, not persisted. Use for intermediate calculations.
dbt Testing
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: total_revenue
        tests:
          - dbt_utils.accepted_range:
              min_value: 0
For comprehensive dbt patterns, see:
- references/dbt-best-practices.md
- references/incremental-strategies.md
Python DataFrame Transformations
pandas Transformation
import pandas as pd

df = pd.read_csv('sales.csv')
result = (
    df
    .query('year == 2024')
    .assign(revenue=lambda x: x['quantity'] * x['price'])
    .groupby('region')
    .agg({'revenue': ['sum', 'mean']})
)
polars Transformation (Often 10-100x Faster)
import polars as pl

result = (
    pl.scan_csv('sales.csv')  # Lazy evaluation
    .filter(pl.col('year') == 2024)
    .with_columns((pl.col('quantity') * pl.col('price')).alias('revenue'))
    .group_by('region')
    .agg([
        pl.col('revenue').sum().alias('revenue_sum'),
        pl.col('revenue').mean().alias('revenue_mean')
    ])
    .collect()  # Execute lazy query
)
Key differences:
- polars uses scan_csv() (lazy) vs pandas read_csv() (eager)
- polars uses with_columns() vs pandas assign()
- polars uses pl.col() expressions vs pandas string references
- polars requires collect() to execute lazy queries
PySpark for Distributed Processing
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("Transform").getOrCreate()
df = spark.read.csv('sales.csv', header=True, inferSchema=True)
result = (
    df
    .filter(F.col('year') == 2024)
    .withColumn('revenue', F.col('quantity') * F.col('price'))
    .groupBy('region')
    .agg(F.sum('revenue').alias('total_revenue'))
)
For migration guides, see references/dataframe-comparison.md.
Pipeline Orchestration
Airflow DAG Structure
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

# extract_fn and transform_fn are plain Python callables defined elsewhere
with DAG(
    dag_id='data_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    task1 = PythonOperator(task_id='extract', python_callable=extract_fn)
    task2 = PythonOperator(task_id='transform', python_callable=transform_fn)
    task1 >> task2  # Define dependency
Task Dependency Patterns
Linear: A >> B >> C (sequential)
Fan-out: A >> [B, C, D] (parallel after A)
Fan-in: [A, B, C] >> D (D waits for all)
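A minimal sketch of these shapes with PythonOperator (continuing the DAG context above; the clean_* and load callables are hypothetical):

extract = PythonOperator(task_id='extract', python_callable=extract_fn)
clean_a = PythonOperator(task_id='clean_a', python_callable=clean_a_fn)
clean_b = PythonOperator(task_id='clean_b', python_callable=clean_b_fn)
load = PythonOperator(task_id='load', python_callable=load_fn)

extract >> [clean_a, clean_b]  # Fan-out: clean_a and clean_b run in parallel
[clean_a, clean_b] >> load     # Fan-in: load starts only after both succeed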
For Airflow, Dagster, and Prefect patterns, see references/orchestration-patterns.md.
Data Quality and Testing
dbt Tests
Generic tests (reusable): unique, not_null, accepted_values, relationships
Singular tests (custom SQL):
-- tests/assert_positive_revenue.sql
select * from {{ ref('fct_orders') }}
where total_revenue < 0
Great Expectations
import great_expectations as gx

# GX 1.x API; in 0.x releases the suite was created via context.add_expectation_suite
context = gx.get_context()
suite = context.suites.add(gx.ExpectationSuite(name="orders_suite"))
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="total_revenue", min_value=0
    )
)
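To run expectations against data, a minimal sketch using the GX 1.x ephemeral pandas path (the DataFrame here is illustrative):

import great_expectations as gx
import pandas as pd

context = gx.get_context()
df = pd.DataFrame({'order_id': [1, 2, 3], 'total_revenue': [10.0, 0.0, 25.5]})

# pandas_default yields an ephemeral batch without configuring a datasource
batch = context.data_sources.pandas_default.read_dataframe(df)
result = batch.validate(
    gx.expectations.ExpectColumnValuesToNotBeNull(column='order_id')
)
print(result.success)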
For comprehensive testing patterns, see references/data-quality-testing.md.
Advanced SQL Patterns
Window functions for analytics:
select
    order_date,
    daily_revenue,
    avg(daily_revenue) over (
        partition by region
        order by order_date
        rows between 6 preceding and current row
    ) as revenue_7d_ma,
    sum(daily_revenue) over (
        partition by region
        order by order_date
    ) as cumulative_revenue
from daily_sales
For advanced window functions, see references/window-functions-guide.md.
Production Best Practices
Idempotency
Ensure transformations produce the same result when run multiple times:
- Use merge statements in incremental models
- Implement deduplication logic (a hedged polars sketch follows)
- Use unique_key in dbt incremental models
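A minimal sketch of idempotent upsert logic in polars, assuming order_id is the unique key and loaded_at marks recency (both names are illustrative):

import polars as pl

def idempotent_upsert(target: pl.DataFrame, batch: pl.DataFrame) -> pl.DataFrame:
    # Re-running the same batch yields the same table: concat, then keep
    # the most recently loaded row per key, mimicking a warehouse MERGE
    return (
        pl.concat([target, batch])
        .sort('loaded_at')
        .unique(subset='order_id', keep='last')
    )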
Incremental Loading
{% if is_incremental() %}
where created_at > (select max(created_at) from {{ this }})
{% endif %}
Error Handling
# perform_transformation, validate_result, ValidationError, and log_error
# are assumed to be defined elsewhere in the project
try:
    result = perform_transformation()
    validate_result(result)
except ValidationError as e:
    log_error(e)
    raise
Monitoring
- Set up Airflow email/Slack alerts on task failure
- Monitor dbt test failures
- Track data freshness (SLAs; a hedged check sketch follows this list)
- Log row counts and data quality metrics
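A minimal freshness-check sketch (column name and SLA are illustrative; assumes timezone-aware UTC timestamps):

from datetime import datetime, timedelta, timezone
import polars as pl

def check_freshness(df: pl.DataFrame, max_lag: timedelta = timedelta(hours=26)) -> None:
    # Compare the newest event timestamp against the freshness SLA
    latest = df.select(pl.col('order_created_at').max()).item()
    lag = datetime.now(timezone.utc) - latest
    print(f"rows={df.height} latest={latest} lag={lag}")
    if lag > max_lag:
        raise RuntimeError(f"Data is stale by {lag}; SLA is {max_lag}")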
Tool Recommendations
SQL Transformations: dbt Core (industry standard, multi-warehouse, rich ecosystem)
pip install dbt-core dbt-snowflake
Python DataFrames: polars (often 10-100x faster than pandas, multi-threaded, lazy evaluation)
pip install polars
Orchestration: Apache Airflow (battle-tested at scale, 5,000+ integrations)
pip install apache-airflow
Examples
Working examples in:
- examples/python/pandas-basics.py - pandas transformations
- examples/python/polars-migration.py - pandas to polars migration
- examples/python/pyspark-transformations.py - PySpark operations
- examples/python/airflow-data-pipeline.py - Complete Airflow DAG
- examples/sql/dbt-staging-model.sql - dbt staging layer
- examples/sql/dbt-intermediate-model.sql - dbt intermediate layer
- examples/sql/dbt-incremental-model.sql - Incremental patterns
- examples/sql/window-functions.sql - Advanced SQL
Scripts
- scripts/generate_dbt_models.py - Generate dbt model boilerplate
- scripts/benchmark_dataframes.py - Compare pandas vs polars performance
Related Skills
For data ingestion patterns, see ingesting-data.
For data visualization, see visualizing-data.
For database design, see databases-* skills.
For real-time streaming, see streaming-data.
For data platform architecture, see ai-data-engineering.
For monitoring pipelines, see observability.
Merged Content from etl-pipelines
name: data_transform
description: Design ETL/ELT pipelines with proper orchestration, error handling, and monitoring. Use when building data pipelines, designing data workflows, or implementing data transformations.
ETL Designer
Design robust ETL/ELT pipelines for data processing.
Quick Start
Use Airflow for orchestration, implement idempotent operations, add error handling, monitor pipeline health.
Instructions
Airflow DAG Structure
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['alerts@company.com']
}

# extract_from_source, transform_data, and load_to_warehouse are callables
# defined elsewhere in the project
with DAG(
    'etl_pipeline',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_source
    )
    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )
    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse
    )
    extract >> transform >> load
Incremental Processing
import pandas as pd

def extract_incremental(last_run_date, conn):
    # Parameterize instead of interpolating into the SQL string to avoid
    # injection and quoting bugs; placeholder style depends on the DB driver
    query = "SELECT * FROM source_table WHERE updated_at > %(last_run)s"
    return pd.read_sql(query, conn, params={"last_run": last_run_date})
Error Handling
def safe_transform(data):
    # logger and send_alert are assumed to be configured elsewhere
    try:
        return transform_data(data)
    except Exception as e:
        logger.error(f"Transform failed: {e}")
        send_alert(f"Pipeline failed: {e}")
        raise
Best Practices
🔄 Workflow
Source: dbt Labs - Best Practices & Polars Performance Guide
Stage 1: Data Contract & Source Audit
- Data Contracts: Pin the schema between the data source and the target.
- Profiling: Analyze gaps, null ratios, and types in the raw data (a hedged profiling sketch follows this list).
- Pattern Selection: Choose ETL (pandas/polars) or ELT (SQL/dbt) based on data volume.
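A minimal profiling sketch in polars (the file name is illustrative):

import polars as pl

df = pl.read_csv('raw_extract.csv')  # hypothetical raw source dump

print(df.schema)        # column names and inferred types
print(df.null_count())  # null count per column
print(df.describe())    # basic distribution statistics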
Stage 2: Transformation Engine Setup
- Infrastructure: Set up a dbt-core profile or configure the Cloud IDE.
- Modular Modeling: Split the data into Staging (renaming), Intermediate (logic), and Marts (final) layers.
- Polars Optimization: Use lazy mode (scan_csv/collect) in Python-based transformations to optimize memory and speed.
Stage 3: Testing & Orchestration
- Unit Tests: Write validation for critical transformation logic with dbt tests or Great Expectations.
- Idempotency: Ensure the pipeline can be re-run safely (idempotent) after a failure.
- Orchestration: Schedule the workflow on Airflow or Dagster and set up failure notifications (a callback sketch follows this list).
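A minimal sketch of failure notifications via an Airflow callback (send_slack_message is a hypothetical alert hook):

def notify_failure(context):
    # Airflow passes a context dict holding the failing task instance
    ti = context['task_instance']
    send_slack_message(f"Task {ti.task_id} in DAG {ti.dag_id} failed")

default_args = {
    'retries': 2,
    'on_failure_callback': notify_failure,  # runs whenever a task fails
}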
Checkpoints
| Stage | Verification |
|---|---|
| 1 | Was any data lost after the transformation? (checksum / row counts) |
| 2 | Are table names hardcoded anywhere in dbt models instead of using the ref function? |
| 3 | Is there a rollback or reprocessing strategy for when the pipeline fails? |
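A minimal reconciliation sketch for checkpoint 1 (file and column names are illustrative):

import polars as pl

source = pl.read_parquet('staging/orders_raw.parquet')  # hypothetical input
target = pl.read_parquet('marts/fct_orders.parquet')    # hypothetical output

# Row counts plus an aggregate sum act as a cheap checksum between layers
assert source['order_id'].n_unique() == target.height, 'row-count mismatch'
assert abs(source['revenue'].sum() - target['total_revenue'].sum()) < 1e-6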
Data Transformation v2.0 - With Workflow
Related Skills
Xlsx
Comprehensive spreadsheet creation, editing, and analysis with support for formulas, formatting, data analysis, and visualization. When Claude needs to work with spreadsheets (.xlsx, .xlsm, .csv, .tsv, etc.) for: (1) Creating new spreadsheets with formulas and formatting, (2) Reading or analyzing data, (3) Modifying existing spreadsheets while preserving formulas, (4) Data analysis and visualization in spreadsheets, or (5) Recalculating formulas
Clickhouse Io
ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.
Analyzing Financial Statements
This skill calculates key financial ratios and metrics from financial statement data for investment analysis
Data Storytelling
Transform data into compelling narratives using visualization, context, and persuasive structure. Use when presenting analytics to stakeholders, creating data reports, or building executive presentations.
Kpi Dashboard Design
Design effective KPI dashboards with metrics selection, visualization best practices, and real-time monitoring patterns. Use when building business dashboards, selecting metrics, or designing data visualization layouts.
Dbt Transformation Patterns
Master dbt (data build tool) for analytics engineering with model organization, testing, documentation, and incremental strategies. Use when building data transformations, creating data models, or implementing analytics engineering best practices.
Sql Optimization Patterns
Master SQL query optimization, indexing strategies, and EXPLAIN analysis to dramatically improve database performance and eliminate slow queries. Use when debugging slow queries, designing database schemas, or optimizing application performance.
Anndata
This skill should be used when working with annotated data matrices in Python, particularly for single-cell genomics analysis, managing experimental measurements with metadata, or handling large-scale biological datasets. Use when tasks involve AnnData objects, h5ad files, single-cell RNA-seq data, or integration with scanpy/scverse tools.
