Data Cleaning Pipeline Generator

by Dexploarer

data

Generates data cleaning pipelines for pandas/polars with handling for missing values, duplicates, outliers, type conversions, and data validation. Use when user asks to "clean data", "generate data pipeline", "handle missing values", or "remove duplicates from dataset".

Skill Details

Repository Files

1 file in this skill directory


name: data-cleaning-pipeline-generator description: Generates data cleaning pipelines for pandas/polars with handling for missing values, duplicates, outliers, type conversions, and data validation. Use when user asks to "clean data", "generate data pipeline", "handle missing values", or "remove duplicates from dataset". allowed-tools: [Write, Read, Bash]

Data Cleaning Pipeline Generator

Generates comprehensive data cleaning and preprocessing pipelines using pandas, polars, or PySpark with best practices for handling messy data.

When to Use

  • "Clean my dataset"
  • "Generate data cleaning pipeline"
  • "Handle missing values"
  • "Remove duplicates"
  • "Fix data types"
  • "Detect and remove outliers"

Instructions

1. Analyze Dataset

import pandas as pd

# Load data
df = pd.read_csv('data.csv')

# Basic info
print(df.info())
print(df.describe())
print(df.head())

# Check for issues
print("\nMissing values:")
print(df.isnull().sum())

print("\nDuplicates:")
print(f"Total duplicates: {df.duplicated().sum()}")

print("\nData types:")
print(df.dtypes)

2. Generate Pandas Cleaning Pipeline

Complete Pipeline:

import pandas as pd
import numpy as np
from datetime import datetime

class DataCleaningPipeline:
    """Data cleaning pipeline for pandas DataFrames."""

    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.original_shape = df.shape
        self.cleaning_log = []

    def log(self, message: str):
        """Log cleaning steps."""
        self.cleaning_log.append(f"[{datetime.now()}] {message}")
        print(message)

    def remove_duplicates(self, subset=None, keep='first'):
        """Remove duplicate rows."""
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset, keep=keep)
        removed = before - len(self.df)
        self.log(f"Removed {removed} duplicate rows")
        return self

    def handle_missing_values(self, strategy='auto'):
        """Handle missing values based on strategy."""
        missing = self.df.isnull().sum()
        columns_with_missing = missing[missing > 0]

        for col in columns_with_missing.index:
            missing_pct = (missing[col] / len(self.df)) * 100

            if missing_pct > 50:
                self.log(f"Dropping column '{col}' ({missing_pct:.1f}% missing)")
                self.df = self.df.drop(columns=[col])
                continue

            if self.df[col].dtype in ['int64', 'float64']:
                # Numeric columns
                if strategy == 'mean':
                    fill_value = self.df[col].mean()
                elif strategy == 'median':
                    fill_value = self.df[col].median()
                else:  # auto
                    fill_value = self.df[col].median()
                self.df[col] = self.df[col].fillna(fill_value)
                self.log(f"Filled '{col}' with {strategy}: {fill_value:.2f}")
            else:
                # Categorical columns
                if strategy == 'mode':
                    fill_value = self.df[col].mode()[0]
                else:  # auto
                    fill_value = 'Unknown'
                self.df[col] = self.df[col].fillna(fill_value)
                self.log(f"Filled '{col}' with: {fill_value}")

        return self

    def fix_data_types(self, type_mapping=None):
        """Convert columns to appropriate data types."""
        if type_mapping is None:
            type_mapping = {}

        for col in self.df.columns:
            if col in type_mapping:
                try:
                    self.df[col] = self.df[col].astype(type_mapping[col])
                    self.log(f"Converted '{col}' to {type_mapping[col]}")
                except Exception as e:
                    self.log(f"Failed to convert '{col}': {e}")
            else:
                # Auto-detect dates
                if 'date' in col.lower() or 'time' in col.lower():
                    try:
                        self.df[col] = pd.to_datetime(self.df[col])
                        self.log(f"Converted '{col}' to datetime")
                    except:
                        pass

        return self

    def remove_outliers(self, columns=None, method='iqr', threshold=1.5):
        """Remove outliers using IQR or Z-score method."""
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns

        before = len(self.df)

        for col in columns:
            if method == 'iqr':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower = Q1 - threshold * IQR
                upper = Q3 + threshold * IQR
                mask = (self.df[col] >= lower) & (self.df[col] <= upper)
            else:  # z-score
                z_scores = np.abs((self.df[col] - self.df[col].mean()) / self.df[col].std())
                mask = z_scores < threshold

            self.df = self.df[mask]

        removed = before - len(self.df)
        self.log(f"Removed {removed} outlier rows using {method} method")
        return self

    def normalize_text(self, columns=None):
        """Normalize text columns (lowercase, strip whitespace)."""
        if columns is None:
            columns = self.df.select_dtypes(include=['object']).columns

        for col in columns:
            self.df[col] = self.df[col].str.strip().str.lower()
            self.log(f"Normalized text in '{col}'")

        return self

    def encode_categorical(self, columns=None, method='label'):
        """Encode categorical variables."""
        if columns is None:
            columns = self.df.select_dtypes(include=['object']).columns

        for col in columns:
            if method == 'label':
                self.df[col] = pd.Categorical(self.df[col]).codes
                self.log(f"Label encoded '{col}'")
            elif method == 'onehot':
                dummies = pd.get_dummies(self.df[col], prefix=col)
                self.df = pd.concat([self.df.drop(columns=[col]), dummies], axis=1)
                self.log(f"One-hot encoded '{col}'")

        return self

    def validate_ranges(self, range_checks):
        """Validate numeric columns are within expected ranges."""
        for col, (min_val, max_val) in range_checks.items():
            invalid = ((self.df[col] < min_val) | (self.df[col] > max_val)).sum()
            if invalid > 0:
                self.log(f"WARNING: {invalid} values in '{col}' outside range [{min_val}, {max_val}]")
                # Remove invalid rows
                self.df = self.df[(self.df[col] >= min_val) & (self.df[col] <= max_val)]

        return self

    def generate_report(self):
        """Generate cleaning report."""
        report = f"""
Data Cleaning Report
====================

Original Shape: {self.original_shape}
Final Shape: {self.df.shape}
Rows Removed: {self.original_shape[0] - self.df.shape[0]}
Columns Removed: {self.original_shape[1] - self.df.shape[1]}

Cleaning Steps:
"""
        for step in self.cleaning_log:
            report += f"  - {step}\n"

        return report

    def get_cleaned_data(self):
        """Return cleaned DataFrame."""
        return self.df


# Usage
pipeline = DataCleaningPipeline(df)
cleaned_df = (
    pipeline
    .remove_duplicates()
    .handle_missing_values(strategy='auto')
    .fix_data_types()
    .remove_outliers(method='iqr', threshold=1.5)
    .normalize_text()
    .validate_ranges({'age': (0, 120), 'price': (0, 1000000)})
    .get_cleaned_data()
)

print(pipeline.generate_report())
cleaned_df.to_csv('cleaned_data.csv', index=False)

3. Polars Pipeline (Faster for Large Data)

import polars as pl

# Load data
df = pl.read_csv('data.csv')

# Cleaning pipeline
cleaned_df = (
    df
    # Remove duplicates
    .unique()
    # Handle missing values
    .with_columns([
        pl.col('age').fill_null(pl.col('age').median()),
        pl.col('name').fill_null('Unknown'),
    ])
    # Fix data types
    .with_columns([
        pl.col('date').str.strptime(pl.Date, '%Y-%m-%d'),
        pl.col('amount').cast(pl.Float64),
    ])
    # Remove outliers
    .filter(
        (pl.col('age') >= 0) & (pl.col('age') <= 120)
    )
    # Normalize text
    .with_columns([
        pl.col('name').str.to_lowercase().str.strip(),
        pl.col('email').str.to_lowercase(),
    ])
)

# Save
cleaned_df.write_csv('cleaned_data.csv')

4. PySpark Pipeline (For Big Data)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, mean, trim, lower
from pyspark.sql.types import IntegerType, DoubleType

spark = SparkSession.builder.appName('DataCleaning').getOrCreate()

# Load data
df = spark.read.csv('data.csv', header=True, inferSchema=True)

# Cleaning pipeline
cleaned_df = (
    df
    # Remove duplicates
    .dropDuplicates()
    # Handle missing values
    .fillna({
        'age': df.select(mean('age')).collect()[0][0],
        'name': 'Unknown',
    })
    # Fix data types
    .withColumn('age', col('age').cast(IntegerType()))
    .withColumn('amount', col('amount').cast(DoubleType()))
    # Remove outliers
    .filter((col('age') >= 0) & (col('age') <= 120))
    # Normalize text
    .withColumn('name', trim(lower(col('name'))))
    .withColumn('email', trim(lower(col('email'))))
)

# Save
cleaned_df.write.csv('cleaned_data', header=True, mode='overwrite')

5. Data Quality Checks

def data_quality_checks(df):
    """Run comprehensive data quality checks."""
    report = []

    # Check 1: Missing values
    missing = df.isnull().sum()
    if missing.sum() > 0:
        report.append(f"⚠️  Missing values found:\n{missing[missing > 0]}")

    # Check 2: Duplicates
    duplicates = df.duplicated().sum()
    if duplicates > 0:
        report.append(f"⚠️  {duplicates} duplicate rows found")

    # Check 3: Data types
    expected_types = {
        'age': 'int64',
        'amount': 'float64',
        'date': 'datetime64[ns]',
    }
    for col, expected in expected_types.items():
        if col in df.columns and df[col].dtype != expected:
            report.append(f"⚠️  Column '{col}' has type {df[col].dtype}, expected {expected}")

    # Check 4: Value ranges
    if 'age' in df.columns:
        invalid_age = ((df['age'] < 0) | (df['age'] > 120)).sum()
        if invalid_age > 0:
            report.append(f"⚠️  {invalid_age} invalid age values")

    # Check 5: Unique identifiers
    if 'id' in df.columns:
        if df['id'].duplicated().any():
            report.append(f"⚠️  Duplicate IDs found")

    # Check 6: Consistency
    if 'email' in df.columns:
        invalid_email = ~df['email'].str.contains('@', na=False)
        if invalid_email.sum() > 0:
            report.append(f"⚠️  {invalid_email.sum()} invalid email addresses")

    if report:
        print("Data Quality Issues:")
        for issue in report:
            print(issue)
    else:
        print("✅ All quality checks passed!")

    return len(report) == 0

# Run checks
data_quality_checks(cleaned_df)

6. Automated Cleaning Function

def auto_clean_dataframe(df, config=None):
    """Automatically clean DataFrame with sensible defaults."""
    if config is None:
        config = {
            'remove_duplicates': True,
            'handle_missing': True,
            'remove_outliers': True,
            'fix_types': True,
            'normalize_text': True,
        }

    print(f"Original shape: {df.shape}")

    if config['remove_duplicates']:
        df = df.drop_duplicates()
        print(f"After removing duplicates: {df.shape}")

    if config['handle_missing']:
        # Numeric: fill with median
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            df[col] = df[col].fillna(df[col].median())

        # Categorical: fill with mode or 'Unknown'
        categorical_cols = df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            if df[col].mode().empty:
                df[col] = df[col].fillna('Unknown')
            else:
                df[col] = df[col].fillna(df[col].mode()[0])
        print(f"After handling missing values: {df.shape}")

    if config['remove_outliers']:
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            df = df[(df[col] >= Q1 - 1.5 * IQR) & (df[col] <= Q3 + 1.5 * IQR)]
        print(f"After removing outliers: {df.shape}")

    if config['normalize_text']:
        text_cols = df.select_dtypes(include=['object']).columns
        for col in text_cols:
            df[col] = df[col].str.strip().str.lower()

    return df

# Usage
cleaned_df = auto_clean_dataframe(df)

7. Save Pipeline Configuration

# data_cleaning_config.yaml
cleaning_pipeline:
  remove_duplicates:
    enabled: true
    subset: ['id', 'email']
    keep: 'first'

  missing_values:
    strategy: auto
    drop_threshold: 50  # Drop columns with >50% missing
    numeric_fill: median
    categorical_fill: mode

  outliers:
    method: iqr
    threshold: 1.5
    columns: ['age', 'price', 'quantity']

  data_types:
    age: int64
    price: float64
    date: datetime64
    email: string

  text_normalization:
    lowercase: true
    strip_whitespace: true
    remove_special_chars: false

  validation:
    ranges:
      age: [0, 120]
      price: [0, 1000000]
    required_columns: ['id', 'name', 'email']

Best Practices

DO:

  • Always keep original data
  • Log all cleaning steps
  • Validate data quality
  • Handle missing values appropriately
  • Remove duplicates early
  • Check for outliers
  • Validate data types
  • Document assumptions

DON'T:

  • Delete original data
  • Fill all missing with zeros
  • Ignore outliers
  • Mix data types
  • Skip validation
  • Overfit to training data
  • Remove too many rows
  • Forget to save cleaned data

Checklist

  • Loaded and inspected data
  • Removed duplicates
  • Handled missing values
  • Fixed data types
  • Removed/handled outliers
  • Normalized text fields
  • Validated data quality
  • Generated cleaning report
  • Saved cleaned data

Related Skills

Xlsx

Comprehensive spreadsheet creation, editing, and analysis with support for formulas, formatting, data analysis, and visualization. When Claude needs to work with spreadsheets (.xlsx, .xlsm, .csv, .tsv, etc) for: (1) Creating new spreadsheets with formulas and formatting, (2) Reading or analyzing data, (3) Modify existing spreadsheets while preserving formulas, (4) Data analysis and visualization in spreadsheets, or (5) Recalculating formulas

data

Clickhouse Io

ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.

datacli

Clickhouse Io

ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.

datacli

Analyzing Financial Statements

This skill calculates key financial ratios and metrics from financial statement data for investment analysis

data

Data Storytelling

Transform data into compelling narratives using visualization, context, and persuasive structure. Use when presenting analytics to stakeholders, creating data reports, or building executive presentations.

data

Kpi Dashboard Design

Design effective KPI dashboards with metrics selection, visualization best practices, and real-time monitoring patterns. Use when building business dashboards, selecting metrics, or designing data visualization layouts.

designdata

Dbt Transformation Patterns

Master dbt (data build tool) for analytics engineering with model organization, testing, documentation, and incremental strategies. Use when building data transformations, creating data models, or implementing analytics engineering best practices.

testingdocumenttool

Sql Optimization Patterns

Master SQL query optimization, indexing strategies, and EXPLAIN analysis to dramatically improve database performance and eliminate slow queries. Use when debugging slow queries, designing database schemas, or optimizing application performance.

designdata

Anndata

This skill should be used when working with annotated data matrices in Python, particularly for single-cell genomics analysis, managing experimental measurements with metadata, or handling large-scale biological datasets. Use when tasks involve AnnData objects, h5ad files, single-cell RNA-seq data, or integration with scanpy/scverse tools.

arttooldata

Xlsx

Spreadsheet toolkit (.xlsx/.csv). Create/edit with formulas/formatting, analyze data, visualization, recalculate formulas, for spreadsheet processing and analysis.

tooldata

Skill Information

Category:Data
Allowed Tools:[Write, Read, Bash]
Last Updated:11/1/2025