Data Provenance

by sunnypatneedi

data

|

Skill Details

Repository Files

1 file in this skill directory


name: data-provenance description: | Track data lineage and provenance from source to consumption. Use when auditing data flows, debugging data quality issues, ensuring compliance (GDPR, SOX), or understanding data dependencies. Covers lineage tracking, impact analysis, data catalogs, and metadata management.

Data Provenance & Lineage

Track where data comes from, how it transforms, and where it goes—essential for trust, compliance, and debugging.

When to Use

Use this skill when:

  • Auditing data for compliance (GDPR, HIPAA, SOX, CCPA)
  • Debugging data quality issues ("Where did this bad data come from?")
  • Understanding impact of schema changes ("What breaks if I change this field?")
  • Building data catalogs or governance systems
  • Tracking sensitive data (PII, PHI) through systems
  • Responding to data deletion requests (GDPR "right to be forgotten")

What is Data Provenance?

Provenance: The complete history and lineage of a data element

Question: "Where does the revenue number in this dashboard come from?"

Answer (with provenance):
Dashboard.revenue (computed 2026-01-21 08:00)
  ← warehouse.daily_sales.total (aggregated 2026-01-21 02:00)
    ← etl_pipeline.transform_sales (ran 2026-01-21 01:30)
      ← production_db.orders.amount (order #12345, created 2026-01-20 15:23)
        ← stripe_api.charge (charge_id: ch_abc123, processed 2026-01-20 15:23)
          ← user input (customer: cust_xyz, card ending 4242)

Key questions provenance answers:

  1. Where did this data come from? (source)
  2. When was it created/updated? (timestamp)
  3. How was it transformed? (logic, code version)
  4. Who created/modified it? (user, system, process)
  5. Why does it have this value? (business logic)
  6. What depends on it? (downstream consumers)

Levels of Provenance Tracking

Level 1: Table-Level Lineage

What: Track which tables feed into other tables

┌────────────┐
│ orders     │──┐
└────────────┘  │
                ├──► ┌──────────────┐
┌────────────┐  │    │ daily_sales  │
│ customers  │──┘    └──────────────┘
└────────────┘

Implementation: Metadata table

CREATE TABLE table_lineage (
  downstream_table VARCHAR(255),
  upstream_table VARCHAR(255),
  relationship_type VARCHAR(50),  -- 'direct_copy', 'join', 'aggregate'
  created_at TIMESTAMPTZ DEFAULT NOW(),

  PRIMARY KEY (downstream_table, upstream_table)
);

INSERT INTO table_lineage VALUES
  ('daily_sales', 'orders', 'aggregate'),
  ('daily_sales', 'customers', 'join');

Query: "What tables does daily_sales depend on?"

SELECT upstream_table
FROM table_lineage
WHERE downstream_table = 'daily_sales';
-- Result: orders, customers

Query: "What tables depend on orders?"

SELECT downstream_table
FROM table_lineage
WHERE upstream_table = 'orders';
-- Result: daily_sales, weekly_report, customer_lifetime_value

Level 2: Column-Level Lineage

What: Track which columns feed into which columns

orders.amount ──┐
orders.tax    ──┼──► daily_sales.total_revenue
orders.shipping─┘

Implementation:

CREATE TABLE column_lineage (
  downstream_table VARCHAR(255),
  downstream_column VARCHAR(255),
  upstream_table VARCHAR(255),
  upstream_column VARCHAR(255),
  transformation TEXT,  -- SQL or description
  created_at TIMESTAMPTZ DEFAULT NOW(),

  PRIMARY KEY (downstream_table, downstream_column, upstream_table, upstream_column)
);

INSERT INTO column_lineage VALUES
  ('daily_sales', 'total_revenue', 'orders', 'amount', 'SUM(amount + tax + shipping)'),
  ('daily_sales', 'order_count', 'orders', 'id', 'COUNT(id)'),
  ('daily_sales', 'customer_name', 'customers', 'name', 'LEFT JOIN on customer_id');

Query: "Where does daily_sales.total_revenue come from?"

SELECT
  upstream_table,
  upstream_column,
  transformation
FROM column_lineage
WHERE downstream_table = 'daily_sales'
  AND downstream_column = 'total_revenue';

Level 3: Row-Level Lineage

What: Track individual record transformations

orders.id=12345 (amount=$100) ──► daily_sales.id=67 (date=2026-01-20, total=$100)
orders.id=12346 (amount=$50)  ──┘

Implementation: Lineage table

CREATE TABLE row_lineage (
  id BIGSERIAL PRIMARY KEY,
  downstream_table VARCHAR(255),
  downstream_pk BIGINT,
  upstream_table VARCHAR(255),
  upstream_pk BIGINT,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

-- After ETL run
INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk)
SELECT
  'daily_sales',
  ds.id,
  'orders',
  o.id
FROM daily_sales ds
JOIN orders o ON DATE(o.created_at) = ds.sale_date;

Query: "What orders contributed to daily_sales row 67?"

SELECT o.*
FROM row_lineage rl
JOIN orders o ON rl.upstream_pk = o.id
WHERE rl.downstream_table = 'daily_sales'
  AND rl.downstream_pk = 67;

Level 4: Value-Level Lineage (Finest)

What: Track transformations at the field value level

order.amount = $100
order.tax = $10
order.shipping = $5
  ↓ (SUM transformation)
daily_sales.total_revenue = $115

Implementation: Event log

CREATE TABLE value_lineage (
  id BIGSERIAL PRIMARY KEY,
  entity_type VARCHAR(50),
  entity_id BIGINT,
  field_name VARCHAR(100),
  old_value TEXT,
  new_value TEXT,
  transformation TEXT,
  source_values JSONB,  -- Array of source values
  created_at TIMESTAMPTZ DEFAULT NOW(),
  created_by VARCHAR(255)  -- User or process
);

-- Example: Revenue calculation
INSERT INTO value_lineage VALUES (
  DEFAULT,
  'daily_sales',
  67,
  'total_revenue',
  NULL,
  '115.00',
  'SUM(orders.amount + orders.tax + orders.shipping) WHERE date = 2026-01-20',
  '[{"table": "orders", "id": 12345, "amount": 100, "tax": 10, "shipping": 5}]',
  NOW(),
  'etl_pipeline_v1.2.3'
);

Provenance Capture Methods

Method 1: Code Instrumentation

Manual tracking in ETL code:

def etl_orders_to_daily_sales():
    # Extract
    orders = db.query("SELECT * FROM orders WHERE date = ?", yesterday)

    # Transform
    daily_sales = {}
    for order in orders:
        date = order['created_at'].date()
        if date not in daily_sales:
            daily_sales[date] = {'total': 0, 'count': 0, 'order_ids': []}

        daily_sales[date]['total'] += order['amount']
        daily_sales[date]['count'] += 1
        daily_sales[date]['order_ids'].append(order['id'])

    # Load with lineage tracking
    for date, metrics in daily_sales.items():
        ds_id = db.insert(
            "INSERT INTO daily_sales (date, total, count) VALUES (?, ?, ?)",
            date, metrics['total'], metrics['count']
        )

        # Track lineage
        for order_id in metrics['order_ids']:
            db.insert(
                "INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk) VALUES (?, ?, ?, ?)",
                'daily_sales', ds_id, 'orders', order_id
            )

Method 2: SQL Parsing

Automatically extract lineage from SQL queries:

import sqlparse
from sqllineage.runner import LineageRunner

sql = """
INSERT INTO daily_sales (date, total_revenue, order_count)
SELECT
  DATE(created_at) as date,
  SUM(amount + tax + shipping) as total_revenue,
  COUNT(*) as order_count
FROM orders
LEFT JOIN customers ON orders.customer_id = customers.id
WHERE created_at >= '2026-01-20'
GROUP BY DATE(created_at)
"""

# Parse lineage
runner = LineageRunner(sql)

print("Source tables:", runner.source_tables)
# {'orders', 'customers'}

print("Target tables:", runner.target_tables)
# {'daily_sales'}

# Store in lineage table
for source in runner.source_tables:
    db.insert(
        "INSERT INTO table_lineage (downstream_table, upstream_table) VALUES (?, ?)",
        'daily_sales', source
    )

Method 3: Database Triggers

Capture changes automatically:

-- Audit trail for all changes
CREATE TABLE audit_log (
  id BIGSERIAL PRIMARY KEY,
  table_name VARCHAR(255),
  record_id BIGINT,
  operation VARCHAR(10),  -- INSERT, UPDATE, DELETE
  old_values JSONB,
  new_values JSONB,
  changed_by VARCHAR(255),
  changed_at TIMESTAMPTZ DEFAULT NOW()
);

-- Trigger on orders table
CREATE OR REPLACE FUNCTION audit_orders()
RETURNS TRIGGER AS $$
BEGIN
  INSERT INTO audit_log (table_name, record_id, operation, old_values, new_values, changed_by)
  VALUES (
    'orders',
    COALESCE(NEW.id, OLD.id),
    TG_OP,
    row_to_json(OLD),
    row_to_json(NEW),
    current_user
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER orders_audit
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_orders();

Method 4: CDC (Change Data Capture)

Stream database changes:

# Using Debezium or similar CDC tool
from kafka import KafkaConsumer

consumer = KafkaConsumer('postgres.public.orders')

for message in consumer:
    change_event = json.loads(message.value)

    # Store in lineage system
    db.insert(
        "INSERT INTO change_log (table_name, operation, before, after, timestamp) VALUES (?, ?, ?, ?, ?)",
        change_event['source']['table'],
        change_event['op'],  # 'c' (create), 'u' (update), 'd' (delete)
        change_event.get('before'),
        change_event.get('after'),
        change_event['ts_ms']
    )

Impact Analysis

Downstream Impact

Question: "If I change orders.amount, what breaks?"

-- Find all downstream dependencies
WITH RECURSIVE dependencies AS (
  -- Base: Direct dependencies
  SELECT
    downstream_table,
    downstream_column,
    1 as depth
  FROM column_lineage
  WHERE upstream_table = 'orders'
    AND upstream_column = 'amount'

  UNION ALL

  -- Recursive: Dependencies of dependencies
  SELECT
    cl.downstream_table,
    cl.downstream_column,
    d.depth + 1
  FROM column_lineage cl
  JOIN dependencies d
    ON cl.upstream_table = d.downstream_table
   AND cl.upstream_column = d.downstream_column
  WHERE d.depth < 10  -- Prevent infinite loops
)
SELECT DISTINCT
  downstream_table,
  downstream_column,
  depth
FROM dependencies
ORDER BY depth, downstream_table, downstream_column;

Result:

| downstream_table       | downstream_column  | depth |
|------------------------|--------------------|-------|
| daily_sales            | total_revenue      | 1     |
| monthly_revenue        | total              | 2     |
| executive_dashboard    | ytd_revenue        | 3     |
| investor_report        | arr                | 4     |

Interpretation: Changing orders.amount affects 4 layers of downstream tables!

Upstream Impact

Question: "What source data feeds into this dashboard metric?"

-- Trace backwards to original sources
WITH RECURSIVE sources AS (
  -- Base: Direct sources
  SELECT
    upstream_table,
    upstream_column,
    1 as depth
  FROM column_lineage
  WHERE downstream_table = 'executive_dashboard'
    AND downstream_column = 'ytd_revenue'

  UNION ALL

  -- Recursive: Sources of sources
  SELECT
    cl.upstream_table,
    cl.upstream_column,
    s.depth + 1
  FROM column_lineage cl
  JOIN sources s
    ON cl.downstream_table = s.upstream_table
   AND cl.downstream_column = s.upstream_column
  WHERE s.depth < 10
)
SELECT DISTINCT
  upstream_table,
  upstream_column,
  depth
FROM sources
WHERE upstream_table NOT IN (
  SELECT DISTINCT downstream_table FROM column_lineage
)  -- Only leaf nodes (true sources)
ORDER BY upstream_table, upstream_column;

Result: Original sources for dashboard metric

| upstream_table | upstream_column | depth |
|----------------|-----------------|-------|
| orders         | amount          | 4     |
| orders         | tax             | 4     |
| orders         | shipping        | 4     |
| stripe_events  | charge_amount   | 5     |

Data Catalog

Schema Registry

Track all datasets and their metadata:

CREATE TABLE data_catalog (
  id BIGSERIAL PRIMARY KEY,
  dataset_name VARCHAR(255) UNIQUE NOT NULL,
  dataset_type VARCHAR(50),  -- 'table', 'view', 'api', 'file'
  description TEXT,
  owner VARCHAR(255),
  steward VARCHAR(255),  -- Data steward (responsible for quality)
  sensitivity VARCHAR(50),  -- 'public', 'internal', 'confidential', 'restricted'
  contains_pii BOOLEAN DEFAULT FALSE,
  retention_days INT,  -- How long to keep data
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE data_catalog_columns (
  dataset_id BIGINT REFERENCES data_catalog(id),
  column_name VARCHAR(255),
  data_type VARCHAR(50),
  description TEXT,
  is_nullable BOOLEAN,
  is_pii BOOLEAN DEFAULT FALSE,
  pii_type VARCHAR(50),  -- 'email', 'ssn', 'phone', 'name', etc.
  sample_values TEXT[],

  PRIMARY KEY (dataset_id, column_name)
);

-- Example: Register orders table
INSERT INTO data_catalog VALUES (
  DEFAULT,
  'orders',
  'table',
  'Customer orders from e-commerce platform',
  'engineering@company.com',
  'data-team@company.com',
  'internal',
  TRUE,  -- Contains PII
  2555,  -- 7 years retention
  NOW(),
  NOW()
);

INSERT INTO data_catalog_columns VALUES
  (1, 'id', 'BIGINT', 'Unique order ID', FALSE, FALSE, NULL, NULL),
  (1, 'customer_email', 'VARCHAR(255)', 'Customer email address', FALSE, TRUE, 'email', NULL),
  (1, 'amount', 'DECIMAL(10,2)', 'Order total in USD', FALSE, FALSE, NULL, '{10.99, 25.50, 100.00}');

Searchable Catalog

Find datasets by keyword:

-- Full-text search
CREATE INDEX idx_catalog_search ON data_catalog
USING GIN(to_tsvector('english', dataset_name || ' ' || description));

-- Search for "revenue"
SELECT
  dataset_name,
  dataset_type,
  description,
  owner
FROM data_catalog
WHERE to_tsvector('english', dataset_name || ' ' || description)
   @@ to_tsquery('english', 'revenue')
ORDER BY dataset_name;

Compliance & Data Privacy

GDPR: Right to be Forgotten

Track all PII to enable deletion:

-- Find all PII for a user
SELECT
  dc.dataset_name,
  dcc.column_name,
  dcc.pii_type
FROM data_catalog dc
JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
WHERE dcc.is_pii = TRUE;

-- Result: Tables/columns containing PII
| dataset_name    | column_name    | pii_type |
|-----------------|----------------|----------|
| orders          | customer_email | email    |
| users           | email          | email    |
| users           | name           | name     |
| support_tickets | email          | email    |
| analytics_events| user_id        | user_id  |

-- Generate deletion script
SELECT
  'DELETE FROM ' || dataset_name || ' WHERE ' || column_name || ' = ''' || user_email || ''';'
FROM (
  SELECT DISTINCT
    dc.dataset_name,
    dcc.column_name
  FROM data_catalog dc
  JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
  WHERE dcc.pii_type = 'email'
) subq;

-- Output:
-- DELETE FROM orders WHERE customer_email = 'user@example.com';
-- DELETE FROM users WHERE email = 'user@example.com';
-- DELETE FROM support_tickets WHERE email = 'user@example.com';

PII Tracking in Data Flow

Tag PII as it flows through pipeline:

def track_pii_flow(source_table, dest_table, pii_fields):
    """Track movement of PII between tables"""
    for field in pii_fields:
        db.insert(
            """
            INSERT INTO pii_lineage (source_table, source_column, dest_table, dest_column, tracked_at)
            VALUES (?, ?, ?, ?, NOW())
            """,
            source_table, field, dest_table, field
        )

# Usage
track_pii_flow('users', 'orders', ['email'])
track_pii_flow('orders', 'daily_sales_with_emails', ['email'])

# Query: "Where has this user's email propagated?"
db.query("""
  WITH RECURSIVE pii_flow AS (
    SELECT dest_table, dest_column, 1 as depth
    FROM pii_lineage
    WHERE source_table = 'users' AND source_column = 'email'

    UNION ALL

    SELECT pl.dest_table, pl.dest_column, pf.depth + 1
    FROM pii_lineage pl
    JOIN pii_flow pf ON pl.source_table = pf.dest_table AND pl.source_column = pf.dest_column
    WHERE pf.depth < 10
  )
  SELECT DISTINCT dest_table, dest_column FROM pii_flow;
""")

Visualization & Tools

Lineage Graph

Generate visual lineage:

import graphviz

def visualize_lineage(table_name):
    # Fetch lineage
    lineage = db.query("""
        SELECT upstream_table, downstream_table
        FROM table_lineage
        WHERE upstream_table = ? OR downstream_table = ?
    """, table_name, table_name)

    # Create graph
    dot = graphviz.Digraph()

    for row in lineage:
        dot.edge(row['upstream_table'], row['downstream_table'])

    dot.render('lineage_graph', format='png', view=True)

visualize_lineage('orders')

Output:

stripe_api ──► orders ──┬──► daily_sales ──► monthly_revenue
                        │
customers ──────────────┘

Commercial Tools

Tool Use Case Features
Apache Atlas Open-source data governance Metadata management, lineage, search
Collibra Enterprise data governance Catalog, lineage, policies, workflows
Alation Data catalog Metadata search, collaboration, lineage
Amundsen (Lyft) Open-source data discovery Search, lineage, usage analytics
DataHub (LinkedIn) Open-source metadata platform Lineage, discovery, governance
dbt Analytics engineering SQL lineage, documentation, tests

Implementation Checklist

Minimal (Start Here)

[ ] Table-level lineage tracking
[ ] Audit logs for critical tables
[ ] Data catalog for major datasets
[ ] Documentation of ETL processes

Standard

[ ] Column-level lineage
[ ] Automated lineage extraction from SQL
[ ] PII tagging and tracking
[ ] Impact analysis queries
[ ] Change notifications for downstream consumers

Advanced

[ ] Row-level lineage
[ ] Real-time lineage from CDC
[ ] Searchable data catalog
[ ] Automated GDPR compliance tools
[ ] Data quality metrics tied to lineage
[ ] Machine learning for anomaly detection

Output Format

When helping with data provenance:

## Provenance Strategy

### Lineage Level
- [ ] Table-level
- [ ] Column-level
- [ ] Row-level
- [ ] Value-level

### Capture Method
- [ ] Code instrumentation
- [ ] SQL parsing
- [ ] Database triggers
- [ ] CDC (Change Data Capture)

### Data Catalog Schema

[SQL DDL for catalog tables]

### Impact Analysis Queries

[SQL queries for upstream/downstream impact]

### PII Tracking

Tables with PII:
- [Table 1]: [Columns]
- [Table 2]: [Columns]

Deletion strategy:
[Step-by-step process]

### Visualization

[Lineage graph representation]

### Compliance Requirements
- [ ] GDPR
- [ ] CCPA
- [ ] HIPAA
- [ ] SOX
- [ ] Other: [specify]

### Tooling
- Lineage tracking: [Tool/Custom]
- Data catalog: [Tool/Custom]
- Visualization: [Tool/Custom]

Integration

Works with:

  • scalable-data-schema - Track schema evolution over time
  • data-infrastructure-at-scale - Lineage for pipelines and ETL
  • multi-source-data-conflation - Track source of merged data
  • systems-decompose - Plan lineage as part of feature design

Related Skills

Xlsx

Comprehensive spreadsheet creation, editing, and analysis with support for formulas, formatting, data analysis, and visualization. When Claude needs to work with spreadsheets (.xlsx, .xlsm, .csv, .tsv, etc) for: (1) Creating new spreadsheets with formulas and formatting, (2) Reading or analyzing data, (3) Modify existing spreadsheets while preserving formulas, (4) Data analysis and visualization in spreadsheets, or (5) Recalculating formulas

data

Clickhouse Io

ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.

datacli

Clickhouse Io

ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.

datacli

Analyzing Financial Statements

This skill calculates key financial ratios and metrics from financial statement data for investment analysis

data

Data Storytelling

Transform data into compelling narratives using visualization, context, and persuasive structure. Use when presenting analytics to stakeholders, creating data reports, or building executive presentations.

data

Kpi Dashboard Design

Design effective KPI dashboards with metrics selection, visualization best practices, and real-time monitoring patterns. Use when building business dashboards, selecting metrics, or designing data visualization layouts.

designdata

Dbt Transformation Patterns

Master dbt (data build tool) for analytics engineering with model organization, testing, documentation, and incremental strategies. Use when building data transformations, creating data models, or implementing analytics engineering best practices.

testingdocumenttool

Sql Optimization Patterns

Master SQL query optimization, indexing strategies, and EXPLAIN analysis to dramatically improve database performance and eliminate slow queries. Use when debugging slow queries, designing database schemas, or optimizing application performance.

designdata

Anndata

This skill should be used when working with annotated data matrices in Python, particularly for single-cell genomics analysis, managing experimental measurements with metadata, or handling large-scale biological datasets. Use when tasks involve AnnData objects, h5ad files, single-cell RNA-seq data, or integration with scanpy/scverse tools.

arttooldata

Xlsx

Spreadsheet toolkit (.xlsx/.csv). Create/edit with formulas/formatting, analyze data, visualization, recalculate formulas, for spreadsheet processing and analysis.

tooldata

Skill Information

Category:Data
Last Updated:1/23/2026