Senior Data Engineer
by borghei
Expert data engineering covering data pipelines, ETL/ELT, data warehousing, streaming, and data quality.
---
name: senior-data-engineer
description: Expert data engineering covering data pipelines, ETL/ELT, data warehousing, streaming, and data quality.
version: 1.0.0
author: Claude Skills
category: engineering
tags: [data, etl, spark, airflow, snowflake, streaming]
---
Senior Data Engineer
Expert-level data engineering for scalable data systems.
Core Competencies
- Data pipeline development
- ETL/ELT design
- Data warehousing
- Stream processing
- Data quality and governance
- Data modeling
- Performance optimization
- Cloud data platforms
Data Pipeline Architecture
Batch Pipeline
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Sources   │────▶│  Transform  │────▶│   Target    │
│  (S3, API)  │     │   (Spark)   │     │ (Snowflake) │
└─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │
       │                   ▼                   │
       │            ┌─────────────┐            │
       └───────────▶│   Quality   │◀───────────┘
                    │   Checks    │
                    └─────────────┘
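The quality gate in this diagram receives input from all three stages; in practice that usually means reconciling what was read, what was transformed, and what landed in the target. A minimal sketch of such a reconciliation check (the function name and thresholds are illustrative assumptions, not part of the original diagram):
def reconcile(extracted_rows: int, transformed_rows: int, loaded_rows: int,
              max_drop_ratio: float = 0.05) -> None:
    # Allow the transform to drop a bounded fraction of rows (dedup, filters)
    if transformed_rows < extracted_rows * (1 - max_drop_ratio):
        raise ValueError(
            f"Transform dropped too many rows: {extracted_rows} -> {transformed_rows}"
        )
    # The warehouse load must be lossless
    if loaded_rows != transformed_rows:
        raise ValueError(
            f"Target row count {loaded_rows} != transformed count {transformed_rows}"
        )

reconcile(extracted_rows=1_000_000, transformed_rows=980_000, loaded_rows=980_000)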
Streaming Pipeline
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│    Kafka    │────▶│  Processor  │────▶│  Real-time  │
│   Topics    │     │   (Flink)   │     │    Store    │
└─────────────┘     └─────────────┘     └─────────────┘
                           │
                           ▼
                    ┌─────────────┐
                    │    Batch    │
                    │    Layer    │
                    └─────────────┘
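As a rough sketch of the fan-out this diagram implies, each event is pushed to a low-latency serving store (hot path) and also persisted for the batch layer (cold path). The kafka-python client and handler stubs below are assumptions; the topic, broker, and consumer group simply reuse the names from the Flink example later in this document, and the real processor here would be Flink rather than a hand-rolled consumer.
import json
from kafka import KafkaConsumer

def update_realtime_store(event: dict) -> None:
    ...  # e.g. upsert into a key-value serving store (hypothetical)

def append_to_batch_layer(event: dict) -> None:
    ...  # e.g. buffer and flush to object storage for reprocessing (hypothetical)

consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="kafka:9092",
    group_id="order-processor",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    event = message.value
    update_realtime_store(event)    # hot path: real-time store
    append_to_batch_layer(event)    # cold path: batch layer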
Apache Airflow
DAG Structure
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.dates import days_ago

# extract_from_source, transform_data and run_quality_checks are assumed to be
# importable task callables (a sketch of the extract step follows this block).

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL for analytics',
    schedule_interval='0 6 * * *',
    start_date=days_ago(1),
    catchup=False,
    tags=['etl', 'daily'],
) as dag:
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_source,
        op_kwargs={'date': '{{ ds }}'},
    )

    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
    )

    quality_check = PythonOperator(
        task_id='quality_check',
        python_callable=run_quality_checks,
    )

    load_task = SnowflakeOperator(
        task_id='load_to_snowflake',
        sql='sql/load_data.sql',
        snowflake_conn_id='snowflake_default',
        params={'date': '{{ ds }}'},
    )

    extract_task >> transform_task >> quality_check >> load_task
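The DAG above only wires the tasks together; the task callables themselves live elsewhere in the project. A minimal, hypothetical sketch of the extract step (the bucket name, prefixes, and boto3 usage are assumptions, not part of the original DAG):
import boto3

def extract_from_source(date: str) -> str:
    """Copy the raw files for one logical date into a staging prefix."""
    s3 = boto3.client("s3")
    source_prefix = f"raw/date={date}/"
    staging_prefix = f"staging/date={date}/"
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket="bucket", Prefix=source_prefix):
        for obj in page.get("Contents", []):
            s3.copy_object(
                Bucket="bucket",
                CopySource={"Bucket": "bucket", "Key": obj["Key"]},
                Key=obj["Key"].replace(source_prefix, staging_prefix, 1),
            )
    return staging_prefix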
Dynamic DAG Generation
from datetime import datetime

from airflow.decorators import dag, task

@dag(
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)
def dynamic_etl():

    @task
    def get_sources():
        return ['source_a', 'source_b', 'source_c']

    @task
    def extract(source: str):
        print(f'Extracting from {source}')
        return f'{source}_data'

    @task
    def transform(data: str):
        print(f'Transforming {data}')
        return f'{data}_transformed'

    @task
    def load(data: list):
        print(f'Loading {len(data)} datasets')

    sources = get_sources()
    extracted = extract.expand(source=sources)
    transformed = transform.expand(data=extracted)
    load(transformed)

dag = dynamic_etl()
Apache Spark
Spark ETL Job
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, current_timestamp

spark = SparkSession.builder \
    .appName("ETL Pipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

def read_source_data(path: str, date: str):
    return spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(f"{path}/date={date}/*.csv")

def transform_data(df):
    return df \
        .withColumn("amount_clean",
                    when(col("amount").isNull(), 0).otherwise(col("amount"))) \
        .withColumn("category_normalized",
                    when(col("category").isin(["A", "B"]), col("category"))
                    .otherwise("OTHER")) \
        .withColumn("processed_at", current_timestamp()) \
        .filter(col("status") == "active") \
        .dropDuplicates(["id"])

def aggregate_metrics(df):
    return df.groupBy("date", "category") \
        .agg(
            F.count("*").alias("total_count"),
            F.sum("amount").alias("total_amount"),
            F.avg("amount").alias("avg_amount"),
            F.countDistinct("user_id").alias("unique_users")
        )

def write_to_warehouse(df, table_name: str, partition_cols: list):
    df.write \
        .mode("overwrite") \
        .partitionBy(partition_cols) \
        .format("delta") \
        .saveAsTable(table_name)

# Main pipeline
raw_df = read_source_data("s3://bucket/raw/", "2024-01-15")
clean_df = transform_data(raw_df)
metrics_df = aggregate_metrics(clean_df)

write_to_warehouse(clean_df, "clean.transactions", ["date"])
write_to_warehouse(metrics_df, "analytics.daily_metrics", ["date"])
Performance Optimization
# Partition tuning
df = spark.read.parquet("s3://bucket/data/") \
    .repartition(100, "date")  # Repartition by key

# Broadcast join for small tables
from pyspark.sql.functions import broadcast

small_df = spark.table("lookup_table")
large_df = spark.table("fact_table")
result = large_df.join(broadcast(small_df), "key")

# Cache intermediate results
cleaned_df = transform_data(raw_df)
cleaned_df.cache()
metrics_1 = cleaned_df.groupBy("category").count()
metrics_2 = cleaned_df.groupBy("region").sum("amount")
cleaned_df.unpersist()

# Predicate pushdown
df = spark.read.parquet("s3://bucket/data/") \
    .filter(col("date") >= "2024-01-01")  # Pushed to storage layer
Data Modeling
Dimensional Modeling
Star Schema:
-- Fact Table
CREATE TABLE fact_sales (
    sale_id BIGINT PRIMARY KEY,
    date_key INT REFERENCES dim_date(date_key),
    product_key INT REFERENCES dim_product(product_key),
    customer_key INT REFERENCES dim_customer(customer_key),
    store_key INT REFERENCES dim_store(store_key),
    quantity INT,
    unit_price DECIMAL(10,2),
    total_amount DECIMAL(10,2),
    discount_amount DECIMAL(10,2)
);

-- Dimension Tables
CREATE TABLE dim_date (
    date_key INT PRIMARY KEY,
    full_date DATE,
    year INT,
    quarter INT,
    month INT,
    week INT,
    day_of_week INT,
    is_weekend BOOLEAN,
    is_holiday BOOLEAN
);

CREATE TABLE dim_product (
    product_key INT PRIMARY KEY,
    product_id VARCHAR(50),
    product_name VARCHAR(200),
    category VARCHAR(100),
    subcategory VARCHAR(100),
    brand VARCHAR(100),
    -- SCD Type 2 columns
    effective_date DATE,
    end_date DATE,
    is_current BOOLEAN
);
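To show how the SCD Type 2 columns above are maintained, here is a minimal sketch using Delta Lake's Python merge API. Assumptions: dim_product is stored as a Delta table, updates_df holds only new or changed products with the dimension's attribute columns, and surrogate-key assignment is omitted for brevity.
from delta.tables import DeltaTable
from pyspark.sql import functions as F

dim = DeltaTable.forName(spark, "dim_product")

# Step 1: expire the current version of any product that changed
dim.alias("t").merge(
    updates_df.alias("s"),
    "t.product_id = s.product_id AND t.is_current = true"
).whenMatchedUpdate(
    set={"is_current": "false", "end_date": "current_date()"}
).execute()

# Step 2: append the new versions as current rows
new_versions = (
    updates_df
    .withColumn("effective_date", F.current_date())
    .withColumn("end_date", F.lit(None).cast("date"))
    .withColumn("is_current", F.lit(True))
)
new_versions.write.format("delta").mode("append").saveAsTable("dim_product")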
dbt Models
staging/stg_orders.sql:
{{ config(materialized='view') }}

with source as (
    select * from {{ source('raw', 'orders') }}
),

renamed as (
    select
        order_id,
        customer_id,
        order_date,
        cast(total_amount as decimal(10,2)) as total_amount,
        status,
        _loaded_at
    from source
    where order_date >= '2023-01-01'
)

select * from renamed
marts/fct_orders.sql:
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}

with orders as (
    select * from {{ ref('stg_orders') }}
),

customers as (
    select * from {{ ref('dim_customers') }}
),

final as (
    select
        o.order_id,
        o.customer_id,
        c.customer_segment,
        o.order_date,
        o.total_amount,
        o.status,
        current_timestamp() as processed_at
    from orders o
    left join customers c on o.customer_id = c.customer_id
    {% if is_incremental() %}
    where o._loaded_at > (select max(processed_at) from {{ this }})
    {% endif %}
)

select * from final
Data Quality
Great Expectations
import great_expectations as gx

class DataQualityError(Exception):
    """Raised when a validation run fails."""

context = gx.get_context()

# Define expectations
suite = context.suites.add(gx.ExpectationSuite(name="orders_suite"))

expectations = [
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id"),
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id"),
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount",
        min_value=0,
        max_value=100000,
    ),
    gx.expectations.ExpectColumnValuesToMatchRegex(
        column="email",
        regex=r"^[\w\.-]+@[\w\.-]+\.\w+$",
    ),
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1000,
        max_value=1000000,
    ),
]

for exp in expectations:
    suite.add_expectation(exp)

# Run validation (GX 1.x fluent API against a pandas DataFrame asset)
batch_definition = (
    context.data_sources.add_pandas(name="orders")
    .add_dataframe_asset(name="daily")
    .add_batch_definition_whole_dataframe("daily_batch")
)
validation = context.validation_definitions.add(
    gx.ValidationDefinition(name="orders_daily", data=batch_definition, suite=suite)
)
# orders_df: the pandas DataFrame to validate, loaded elsewhere in the pipeline
results = validation.run(batch_parameters={"dataframe": orders_df})

if not results.success:
    raise DataQualityError("Validation failed")
Custom Quality Checks
from dataclasses import dataclass
from datetime import datetime
from typing import Callable

from pyspark.sql.functions import col

@dataclass
class QualityCheck:
    name: str
    check_fn: Callable  # single-argument callable (df -> bool); bind extra args with a lambda
    severity: str  # 'error' or 'warning'

def check_no_nulls(df, column: str) -> bool:
    null_count = df.filter(col(column).isNull()).count()
    return null_count == 0

def check_referential_integrity(df, ref_df, key: str) -> bool:
    orphans = df.join(ref_df, key, "left_anti")
    return orphans.count() == 0

def check_freshness(df, timestamp_col: str, max_hours: int) -> bool:
    from pyspark.sql.functions import max as spark_max
    max_ts = df.agg(spark_max(timestamp_col)).collect()[0][0]
    hours_old = (datetime.now() - max_ts).total_seconds() / 3600
    return hours_old <= max_hours

def run_quality_checks(df, checks: list[QualityCheck]):
    results = []
    for check in checks:
        passed = check.check_fn(df)
        results.append({
            'check': check.name,
            'passed': passed,
            'severity': check.severity,
        })
    errors = [r for r in results if not r['passed'] and r['severity'] == 'error']
    if errors:
        # DataQualityError is defined in the Great Expectations section above
        raise DataQualityError(f"Quality checks failed: {errors}")
    return results
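A hypothetical usage sketch for the framework above: bind each check's extra arguments with a lambda so every check_fn takes only the DataFrame. The orders_df and dim_customers_df names are assumed inputs, not defined in the original.
checks = [
    QualityCheck(
        name="order_id_not_null",
        check_fn=lambda df: check_no_nulls(df, "order_id"),
        severity="error",
    ),
    QualityCheck(
        name="customer_fk_valid",
        check_fn=lambda df: check_referential_integrity(df, dim_customers_df, "customer_id"),
        severity="error",
    ),
    QualityCheck(
        name="loaded_within_24h",
        check_fn=lambda df: check_freshness(df, "_loaded_at", max_hours=24),
        severity="warning",
    ),
]

results = run_quality_checks(orders_df, checks)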
Stream Processing
Kafka + Flink
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Define Kafka source
t_env.execute_sql("""
    CREATE TABLE orders (
        order_id STRING,
        customer_id STRING,
        amount DECIMAL(10,2),
        order_time TIMESTAMP(3),
        WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'order-processor',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# Define Kafka sink
t_env.execute_sql("""
    CREATE TABLE order_metrics (
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        total_orders BIGINT,
        total_amount DECIMAL(10,2)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'order-metrics',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Windowed aggregation: the INSERT is submitted asynchronously, so keep the
# TableResult and wait() on it if the script should block for the job's lifetime
result = t_env.execute_sql("""
    INSERT INTO order_metrics
    SELECT
        TUMBLE_START(order_time, INTERVAL '1' MINUTE) as window_start,
        TUMBLE_END(order_time, INTERVAL '1' MINUTE) as window_end,
        COUNT(*) as total_orders,
        SUM(amount) as total_amount
    FROM orders
    GROUP BY TUMBLE(order_time, INTERVAL '1' MINUTE)
""")
result.wait()
Reference Materials
- references/pipeline_patterns.md - Pipeline architecture patterns
- references/data_modeling.md - Dimensional modeling guide
- references/quality_framework.md - Data quality practices
- references/streaming_guide.md - Stream processing patterns
Scripts
# Pipeline scaffolder
python scripts/pipeline_scaffold.py --name daily_etl --schedule daily
# Data quality runner
python scripts/quality_check.py --suite orders_suite --data s3://bucket/orders/
# Schema migration
python scripts/schema_migrate.py --target snowflake --version v2
# Backfill runner
python scripts/backfill.py --dag daily_etl --start 2024-01-01 --end 2024-01-31
Related Skills
Xlsx
Comprehensive spreadsheet creation, editing, and analysis with support for formulas, formatting, data analysis, and visualization. Use when Claude needs to work with spreadsheets (.xlsx, .xlsm, .csv, .tsv, etc.) for: (1) creating new spreadsheets with formulas and formatting, (2) reading or analyzing data, (3) modifying existing spreadsheets while preserving formulas, (4) data analysis and visualization in spreadsheets, or (5) recalculating formulas.
Clickhouse Io
ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.
Analyzing Financial Statements
This skill calculates key financial ratios and metrics from financial statement data for investment analysis
Data Storytelling
Transform data into compelling narratives using visualization, context, and persuasive structure. Use when presenting analytics to stakeholders, creating data reports, or building executive presentations.
Kpi Dashboard Design
Design effective KPI dashboards with metrics selection, visualization best practices, and real-time monitoring patterns. Use when building business dashboards, selecting metrics, or designing data visualization layouts.
Dbt Transformation Patterns
Master dbt (data build tool) for analytics engineering with model organization, testing, documentation, and incremental strategies. Use when building data transformations, creating data models, or implementing analytics engineering best practices.
Sql Optimization Patterns
Master SQL query optimization, indexing strategies, and EXPLAIN analysis to dramatically improve database performance and eliminate slow queries. Use when debugging slow queries, designing database schemas, or optimizing application performance.
Anndata
This skill should be used when working with annotated data matrices in Python, particularly for single-cell genomics analysis, managing experimental measurements with metadata, or handling large-scale biological datasets. Use when tasks involve AnnData objects, h5ad files, single-cell RNA-seq data, or integration with scanpy/scverse tools.
