Data Cleaning Pipeline Generator
by Dexploarer
Generates data cleaning pipelines for pandas/polars with handling for missing values, duplicates, outliers, type conversions, and data validation. Use when user asks to "clean data", "generate data pipeline", "handle missing values", or "remove duplicates from dataset".
Skill Details
Repository Files
1 file in this skill directory
name: data-cleaning-pipeline-generator description: Generates data cleaning pipelines for pandas/polars with handling for missing values, duplicates, outliers, type conversions, and data validation. Use when user asks to "clean data", "generate data pipeline", "handle missing values", or "remove duplicates from dataset". allowed-tools: [Write, Read, Bash]
Data Cleaning Pipeline Generator
Generates comprehensive data cleaning and preprocessing pipelines using pandas, polars, or PySpark with best practices for handling messy data.
When to Use
- "Clean my dataset"
- "Generate data cleaning pipeline"
- "Handle missing values"
- "Remove duplicates"
- "Fix data types"
- "Detect and remove outliers"
Instructions
1. Analyze Dataset
import pandas as pd

# Pull the raw file into memory for a first look
df = pd.read_csv('data.csv')

# High-level overview: schema/memory, summary statistics, first rows.
# Note: df.info() prints directly and returns None, so print() also emits "None".
print(df.info())
print(df.describe())
print(df.head())

# Issue scan: null counts, exact-duplicate rows, current dtypes
print("\nMissing values:", df.isnull().sum(), sep="\n")
print("\nDuplicates:", f"Total duplicates: {df.duplicated().sum()}", sep="\n")
print("\nData types:", df.dtypes, sep="\n")
2. Generate Pandas Cleaning Pipeline
Complete Pipeline:
import pandas as pd
import numpy as np
from datetime import datetime
class DataCleaningPipeline:
    """Chainable data cleaning pipeline for pandas DataFrames.

    Works on a copy of the input frame; every step logs what it did with a
    timestamp and returns ``self`` so calls can be chained fluently.
    """

    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()              # never mutate the caller's data
        self.original_shape = df.shape   # remembered for the final report
        self.cleaning_log = []           # timestamped record of each step

    def log(self, message: str):
        """Record a cleaning step with a timestamp and echo it to stdout."""
        self.cleaning_log.append(f"[{datetime.now()}] {message}")
        print(message)

    def remove_duplicates(self, subset=None, keep='first'):
        """Remove duplicate rows.

        Args:
            subset: Columns to consider when identifying duplicates
                (default: all columns).
            keep: Which duplicate to retain ('first', 'last', or False).
        """
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset, keep=keep)
        self.log(f"Removed {before - len(self.df)} duplicate rows")
        return self

    def handle_missing_values(self, strategy='auto'):
        """Fill or drop missing values.

        Columns more than 50% missing are dropped outright. Numeric columns
        are filled with mean/median; other columns with their mode or the
        literal 'Unknown'.

        Args:
            strategy: 'mean', 'median', 'mode', or 'auto' (median for
                numeric columns, 'Unknown' for categorical ones).
        """
        missing = self.df.isnull().sum()
        for col in missing[missing > 0].index:
            missing_pct = (missing[col] / len(self.df)) * 100
            if missing_pct > 50:
                self.log(f"Dropping column '{col}' ({missing_pct:.1f}% missing)")
                self.df = self.df.drop(columns=[col])
                continue
            # is_numeric_dtype also covers int32/float32 etc., which the
            # previous `dtype in ['int64', 'float64']` check silently skipped.
            if pd.api.types.is_numeric_dtype(self.df[col]):
                if strategy == 'mean':
                    method = 'mean'
                    fill_value = self.df[col].mean()
                else:  # 'median' and 'auto' both use the median
                    method = 'median'
                    fill_value = self.df[col].median()
                self.df[col] = self.df[col].fillna(fill_value)
                # Log the method actually applied, not the raw strategy
                # string (previously printed 'auto' for the default path).
                self.log(f"Filled '{col}' with {method}: {fill_value:.2f}")
            else:
                mode = self.df[col].mode()
                if strategy == 'mode' and not mode.empty:
                    fill_value = mode[0]
                else:  # 'auto', or no mode available (all-null column)
                    fill_value = 'Unknown'
                self.df[col] = self.df[col].fillna(fill_value)
                self.log(f"Filled '{col}' with: {fill_value}")
        return self

    def fix_data_types(self, type_mapping=None):
        """Convert columns to appropriate data types.

        Args:
            type_mapping: Optional {column: dtype} overrides. Columns not
                listed are auto-converted to datetime when their name
                contains 'date' or 'time'.
        """
        if type_mapping is None:
            type_mapping = {}
        for col in self.df.columns:
            if col in type_mapping:
                try:
                    self.df[col] = self.df[col].astype(type_mapping[col])
                    self.log(f"Converted '{col}' to {type_mapping[col]}")
                except (ValueError, TypeError) as e:
                    self.log(f"Failed to convert '{col}': {e}")
            elif 'date' in col.lower() or 'time' in col.lower():
                # Best-effort date detection by column name; unparseable
                # columns are deliberately left untouched.
                try:
                    self.df[col] = pd.to_datetime(self.df[col])
                    self.log(f"Converted '{col}' to datetime")
                except (ValueError, TypeError):
                    pass
        return self

    def remove_outliers(self, columns=None, method='iqr', threshold=1.5):
        """Remove outlier rows using the IQR or z-score method.

        Rows with NaN in a screened column are dropped as well, since NaN
        fails both bound comparisons.

        Args:
            columns: Numeric columns to screen (default: all numeric).
            method: 'iqr' (default); anything else selects z-score.
            threshold: IQR multiplier, or z-score cutoff.
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns
        before = len(self.df)
        for col in columns:
            if method == 'iqr':
                q1 = self.df[col].quantile(0.25)
                q3 = self.df[col].quantile(0.75)
                iqr = q3 - q1
                mask = self.df[col].between(q1 - threshold * iqr, q3 + threshold * iqr)
            else:  # z-score
                std = self.df[col].std()
                # Guard: a constant (or single-row) column has std 0/NaN;
                # dividing by it would drop every row.
                if not std or np.isnan(std):
                    continue
                z_scores = np.abs((self.df[col] - self.df[col].mean()) / std)
                mask = z_scores < threshold
            self.df = self.df[mask]
        self.log(f"Removed {before - len(self.df)} outlier rows using {method} method")
        return self

    def normalize_text(self, columns=None):
        """Strip whitespace and lowercase text columns (NaN-safe via .str)."""
        if columns is None:
            columns = self.df.select_dtypes(include=['object']).columns
        for col in columns:
            self.df[col] = self.df[col].str.strip().str.lower()
            self.log(f"Normalized text in '{col}'")
        return self

    def encode_categorical(self, columns=None, method='label'):
        """Encode categorical variables.

        Args:
            columns: Columns to encode (default: all object columns).
            method: 'label' for integer category codes, 'onehot' for
                dummy indicator columns (original column is dropped).
        """
        if columns is None:
            columns = self.df.select_dtypes(include=['object']).columns
        for col in columns:
            if method == 'label':
                self.df[col] = pd.Categorical(self.df[col]).codes
                self.log(f"Label encoded '{col}'")
            elif method == 'onehot':
                dummies = pd.get_dummies(self.df[col], prefix=col)
                self.df = pd.concat([self.df.drop(columns=[col]), dummies], axis=1)
                self.log(f"One-hot encoded '{col}'")
        return self

    def validate_ranges(self, range_checks):
        """Warn about and drop rows outside expected numeric ranges.

        Args:
            range_checks: {column: (min_val, max_val)} inclusive bounds.
        """
        for col, (min_val, max_val) in range_checks.items():
            out_of_range = ((self.df[col] < min_val) | (self.df[col] > max_val)).sum()
            if out_of_range > 0:
                self.log(f"WARNING: {out_of_range} values in '{col}' outside range [{min_val}, {max_val}]")
                # Drop offending rows. NaNs also fail the comparisons and
                # are removed here, although they are not counted above.
                self.df = self.df[(self.df[col] >= min_val) & (self.df[col] <= max_val)]
        return self

    def generate_report(self):
        """Return a human-readable summary of everything the pipeline did."""
        lines = [
            "",
            "Data Cleaning Report",
            "====================",
            f"Original Shape: {self.original_shape}",
            f"Final Shape: {self.df.shape}",
            f"Rows Removed: {self.original_shape[0] - self.df.shape[0]}",
            f"Columns Removed: {self.original_shape[1] - self.df.shape[1]}",
            "Cleaning Steps:",
        ]
        lines += [f"  - {step}" for step in self.cleaning_log]
        return "\n".join(lines) + "\n"

    def get_cleaned_data(self):
        """Return the cleaned DataFrame."""
        return self.df
# Usage: each step returns the pipeline object, so the calls chain fluently
# and end with get_cleaned_data() to extract the resulting DataFrame.
pipeline = DataCleaningPipeline(df)
cleaned_df = (
    pipeline
    .remove_duplicates()                       # drop exact duplicate rows first
    .handle_missing_values(strategy='auto')    # median for numeric, 'Unknown' for text
    .fix_data_types()                          # auto-detect date/time columns
    .remove_outliers(method='iqr', threshold=1.5)
    .normalize_text()                          # strip + lowercase text columns
    .validate_ranges({'age': (0, 120), 'price': (0, 1000000)})
    .get_cleaned_data()                        # returns the cleaned DataFrame
)
print(pipeline.generate_report())
cleaned_df.to_csv('cleaned_data.csv', index=False)
3. Polars Pipeline (Faster for Large Data)
import polars as pl

# Load data
df = pl.read_csv('data.csv')

# Cleaning pipeline: one eager expression chain, evaluated top to bottom.
cleaned_df = (
    df
    # Remove duplicates (keeps one copy of each fully-identical row)
    .unique()
    # Handle missing values: numeric median fill, text placeholder fill
    .with_columns([
        pl.col('age').fill_null(pl.col('age').median()),
        pl.col('name').fill_null('Unknown'),
    ])
    # Fix data types: parse date strings, cast amount to float
    .with_columns([
        pl.col('date').str.strptime(pl.Date, '%Y-%m-%d'),
        pl.col('amount').cast(pl.Float64),
    ])
    # Remove outliers: keep only plausible ages
    .filter(
        (pl.col('age') >= 0) & (pl.col('age') <= 120)
    )
    # Normalize text: lowercase, trim whitespace
    # NOTE(review): `.str.strip()` was renamed `.str.strip_chars()` in newer
    # polars releases — confirm against the polars version actually pinned.
    .with_columns([
        pl.col('name').str.to_lowercase().str.strip(),
        pl.col('email').str.to_lowercase(),
    ])
)

# Save
cleaned_df.write_csv('cleaned_data.csv')
4. PySpark Pipeline (For Big Data)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, mean, trim, lower
from pyspark.sql.types import IntegerType, DoubleType

spark = SparkSession.builder.appName('DataCleaning').getOrCreate()

# Load data (inferSchema triggers an extra pass over the file)
df = spark.read.csv('data.csv', header=True, inferSchema=True)

# Cleaning pipeline
cleaned_df = (
    df
    # Remove duplicates
    .dropDuplicates()
    # Handle missing values. The mean is computed eagerly via collect()
    # before the chain runs.
    # NOTE(review): if 'age' is entirely null the collected mean is None
    # and fillna will skip that column — confirm this is acceptable.
    .fillna({
        'age': df.select(mean('age')).collect()[0][0],
        'name': 'Unknown',
    })
    # Fix data types
    .withColumn('age', col('age').cast(IntegerType()))
    .withColumn('amount', col('amount').cast(DoubleType()))
    # Remove outliers: keep only plausible ages
    .filter((col('age') >= 0) & (col('age') <= 120))
    # Normalize text: trim whitespace, lowercase
    .withColumn('name', trim(lower(col('name'))))
    .withColumn('email', trim(lower(col('email'))))
)

# Save (writes a directory of part files, overwriting any previous run)
cleaned_df.write.csv('cleaned_data', header=True, mode='overwrite')
5. Data Quality Checks
def data_quality_checks(df):
    """Run a battery of data-quality checks on a pandas DataFrame.

    Prints every problem found (or a success message) and returns True
    when the frame passes all checks.
    """
    issues = []

    # 1. Null values anywhere?
    null_counts = df.isnull().sum()
    if null_counts.sum() > 0:
        issues.append(f"⚠️ Missing values found:\n{null_counts[null_counts > 0]}")

    # 2. Fully duplicated rows
    dup_rows = df.duplicated().sum()
    if dup_rows > 0:
        issues.append(f"⚠️ {dup_rows} duplicate rows found")

    # 3. Columns whose dtype differs from the expected schema
    expected_types = {
        'age': 'int64',
        'amount': 'float64',
        'date': 'datetime64[ns]',
    }
    for name, expected in expected_types.items():
        if name in df.columns and df[name].dtype != expected:
            issues.append(f"⚠️ Column '{name}' has type {df[name].dtype}, expected {expected}")

    # 4. Domain range for age
    if 'age' in df.columns:
        bad_age = ((df['age'] < 0) | (df['age'] > 120)).sum()
        if bad_age > 0:
            issues.append(f"⚠️ {bad_age} invalid age values")

    # 5. Identifier column must be unique
    if 'id' in df.columns and df['id'].duplicated().any():
        issues.append("⚠️ Duplicate IDs found")

    # 6. Emails must at least contain an '@'
    if 'email' in df.columns:
        bad_email = ~df['email'].str.contains('@', na=False)
        if bad_email.sum() > 0:
            issues.append(f"⚠️ {bad_email.sum()} invalid email addresses")

    if issues:
        print("Data Quality Issues:")
        for issue in issues:
            print(issue)
    else:
        print("✅ All quality checks passed!")
    return not issues
# Run checks on the cleaned frame; returns True when no issues were found
data_quality_checks(cleaned_df)
6. Automated Cleaning Function
def auto_clean_dataframe(df, config=None):
    """Automatically clean a DataFrame with sensible defaults.

    Operates on a copy, so the caller's DataFrame is never mutated.

    Args:
        df: Input pandas DataFrame.
        config: Optional dict of step toggles. Missing keys fall back to
            their defaults (all True): 'remove_duplicates', 'handle_missing',
            'remove_outliers', 'fix_types', 'normalize_text'.

    Returns:
        A new, cleaned DataFrame.
    """
    defaults = {
        'remove_duplicates': True,
        'handle_missing': True,
        'remove_outliers': True,
        'fix_types': True,
        'normalize_text': True,
    }
    # Merge user overrides onto the defaults so a partial config dict no
    # longer raises KeyError for the keys it omits.
    config = {**defaults, **(config or {})}

    df = df.copy()  # never mutate the caller's data
    print(f"Original shape: {df.shape}")

    if config['remove_duplicates']:
        df = df.drop_duplicates()
        print(f"After removing duplicates: {df.shape}")

    if config['handle_missing']:
        # Numeric: fill with median (robust to outliers)
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            df[col] = df[col].fillna(df[col].median())
        # Categorical: fill with mode, or 'Unknown' when no mode exists
        categorical_cols = df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            if df[col].mode().empty:
                df[col] = df[col].fillna('Unknown')
            else:
                df[col] = df[col].fillna(df[col].mode()[0])
        print(f"After handling missing values: {df.shape}")

    if config['fix_types']:
        # The 'fix_types' toggle existed in the default config but was
        # never applied. Best-effort: auto-convert columns whose name
        # suggests a date/time, leaving unparseable ones untouched.
        for col in df.columns:
            if 'date' in col.lower() or 'time' in col.lower():
                try:
                    df[col] = pd.to_datetime(df[col])
                except (ValueError, TypeError):
                    pass

    if config['remove_outliers']:
        # IQR fencing at 1.5x on every numeric column
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            df = df[(df[col] >= Q1 - 1.5 * IQR) & (df[col] <= Q3 + 1.5 * IQR)]
        print(f"After removing outliers: {df.shape}")

    if config['normalize_text']:
        # Strip whitespace and lowercase all object (text) columns
        text_cols = df.select_dtypes(include=['object']).columns
        for col in text_cols:
            df[col] = df[col].str.strip().str.lower()

    return df
# Usage: run with every default cleaning step enabled
cleaned_df = auto_clean_dataframe(df)
7. Save Pipeline Configuration
# data_cleaning_config.yaml
# Declarative configuration for the data cleaning pipeline: each top-level
# key under `cleaning_pipeline` configures one cleaning step.
cleaning_pipeline:
  remove_duplicates:
    enabled: true
    subset: ['id', 'email']   # columns used to identify duplicates
    keep: 'first'
  missing_values:
    strategy: auto
    drop_threshold: 50 # Drop columns with >50% missing
    numeric_fill: median
    categorical_fill: mode
  outliers:
    method: iqr               # 'iqr' or z-score
    threshold: 1.5            # IQR multiplier / z-score cutoff
    columns: ['age', 'price', 'quantity']
  data_types:
    # Target dtype per column
    age: int64
    price: float64
    date: datetime64
    email: string
  text_normalization:
    lowercase: true
    strip_whitespace: true
    remove_special_chars: false
  validation:
    ranges:
      # Inclusive [min, max] bounds per column
      age: [0, 120]
      price: [0, 1000000]
    required_columns: ['id', 'name', 'email']
Best Practices
DO:
- Always keep original data
- Log all cleaning steps
- Validate data quality
- Handle missing values appropriately
- Remove duplicates early
- Check for outliers
- Validate data types
- Document assumptions
DON'T:
- Delete original data
- Fill all missing with zeros
- Ignore outliers
- Mix data types
- Skip validation
- Overfit to training data
- Remove too many rows
- Forget to save cleaned data
Checklist
- Loaded and inspected data
- Removed duplicates
- Handled missing values
- Fixed data types
- Removed/handled outliers
- Normalized text fields
- Validated data quality
- Generated cleaning report
- Saved cleaned data
Related Skills
Xlsx
Comprehensive spreadsheet creation, editing, and analysis with support for formulas, formatting, data analysis, and visualization. When Claude needs to work with spreadsheets (.xlsx, .xlsm, .csv, .tsv, etc) for: (1) Creating new spreadsheets with formulas and formatting, (2) Reading or analyzing data, (3) Modifying existing spreadsheets while preserving formulas, (4) Data analysis and visualization in spreadsheets, or (5) Recalculating formulas
Clickhouse Io
ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.
Clickhouse Io
ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.
Analyzing Financial Statements
This skill calculates key financial ratios and metrics from financial statement data for investment analysis
Data Storytelling
Transform data into compelling narratives using visualization, context, and persuasive structure. Use when presenting analytics to stakeholders, creating data reports, or building executive presentations.
Kpi Dashboard Design
Design effective KPI dashboards with metrics selection, visualization best practices, and real-time monitoring patterns. Use when building business dashboards, selecting metrics, or designing data visualization layouts.
Dbt Transformation Patterns
Master dbt (data build tool) for analytics engineering with model organization, testing, documentation, and incremental strategies. Use when building data transformations, creating data models, or implementing analytics engineering best practices.
Sql Optimization Patterns
Master SQL query optimization, indexing strategies, and EXPLAIN analysis to dramatically improve database performance and eliminate slow queries. Use when debugging slow queries, designing database schemas, or optimizing application performance.
Anndata
This skill should be used when working with annotated data matrices in Python, particularly for single-cell genomics analysis, managing experimental measurements with metadata, or handling large-scale biological datasets. Use when tasks involve AnnData objects, h5ad files, single-cell RNA-seq data, or integration with scanpy/scverse tools.
Xlsx
Spreadsheet toolkit (.xlsx/.csv). Create/edit with formulas/formatting, analyze data, visualization, recalculate formulas, for spreadsheet processing and analysis.
