Claude Code Plugins

Community-maintained marketplace

Feedback

data-pipeline

@gizix/cc_projects
1
0

Implement Scrapy pipeline patterns for data processing, validation, cleaning, and storage when processing scraped items. Automatically creates pipelines for common data workflows including CSV, JSON, database export, and data transformation.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name data-pipeline
description Implement Scrapy pipeline patterns for data processing, validation, cleaning, and storage when processing scraped items. Automatically creates pipelines for common data workflows including CSV, JSON, database export, and data transformation.
allowed-tools Read, Write, Grep

You are a Scrapy pipeline expert. You create efficient, robust data processing pipelines with proper validation, error handling, and storage patterns following Scrapy best practices.

Pipeline Architecture

Scrapy pipelines process items in a specific order defined by ITEM_PIPELINES priority (0-1000, lower runs first):

# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.ValidationPipeline': 100,      # Validate first
    'myproject.pipelines.CleaningPipeline': 200,        # Clean data
    'myproject.pipelines.DuplicatesPipeline': 300,      # Remove duplicates
    'myproject.pipelines.ImagesPipeline': 400,          # Download images
    'myproject.pipelines.DatabasePipeline': 800,        # Save to database
    'myproject.pipelines.ExportPipeline': 900,          # Export to files
}

Core Pipeline Patterns

1. Validation Pipeline

Purpose: Validate item fields and drop invalid items

from scrapy.exceptions import DropItem
from typing import Any


class ValidationPipeline:
    """
    Validate scraped items before processing.

    Drops items missing required fields or with invalid data.
    """

    # Define required fields
    required_fields = ['title', 'url']

    # Define field validators
    validators = {
        'price': lambda x: isinstance(x, (int, float)) and x > 0,
        'email': lambda x: '@' in str(x) if x else True,
        'url': lambda x: str(x).startswith(('http://', 'https://')) if x else False,
    }

    def process_item(self, item: dict, spider) -> dict:
        """
        Validate item fields.

        Args:
            item: Scraped item dictionary
            spider: Spider instance

        Returns:
            Validated item

        Raises:
            DropItem: If validation fails
        """
        # Check required fields
        missing_fields = [field for field in self.required_fields if not item.get(field)]
        if missing_fields:
            raise DropItem(f"Missing required fields: {missing_fields} in {item}")

        # Validate field values
        for field, validator in self.validators.items():
            if field in item and item[field] is not None:
                if not validator(item[field]):
                    raise DropItem(f"Invalid {field}: {item[field]} in {item}")

        spider.logger.debug(f"Item validated: {item.get('title', 'Unknown')}")
        return item


class TypeValidationPipeline:
    """Advanced type validation with automatic conversion."""

    field_types = {
        'price': float,
        'quantity': int,
        'in_stock': bool,
        'rating': float,
    }

    def process_item(self, item: dict, spider) -> dict:
        """Convert and validate field types."""
        for field, expected_type in self.field_types.items():
            if field in item and item[field] is not None:
                try:
                    # Try to convert to expected type
                    if expected_type == bool:
                        item[field] = self._to_bool(item[field])
                    else:
                        item[field] = expected_type(item[field])
                except (ValueError, TypeError) as e:
                    spider.logger.warning(
                        f"Failed to convert {field}={item[field]} to {expected_type}: {e}"
                    )
                    raise DropItem(f"Type conversion failed for {field}")

        return item

    @staticmethod
    def _to_bool(value: Any) -> bool:
        """Convert various types to boolean."""
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.lower() in ('true', 'yes', '1', 'on')
        return bool(value)

2. Data Cleaning Pipeline

Purpose: Clean and normalize data

import re
from w3lib.html import remove_tags
from typing import Optional


class CleaningPipeline:
    """
    Clean and normalize item data.

    - Strip whitespace
    - Remove HTML tags
    - Normalize text
    - Clean prices and numbers
    """

    def process_item(self, item: dict, spider) -> dict:
        """Clean item fields."""
        # Clean text fields
        text_fields = ['title', 'description', 'brand', 'category']
        for field in text_fields:
            if field in item and item[field]:
                item[field] = self.clean_text(item[field])

        # Clean price
        if 'price' in item and item['price']:
            item['price'] = self.clean_price(item['price'])

        # Clean URL
        if 'url' in item and item['url']:
            item['url'] = self.clean_url(item['url'])

        # Normalize email
        if 'email' in item and item['email']:
            item['email'] = item['email'].lower().strip()

        # Clean phone number
        if 'phone' in item and item['phone']:
            item['phone'] = self.clean_phone(item['phone'])

        return item

    @staticmethod
    def clean_text(text: str) -> str:
        """Clean text field."""
        if not text:
            return text

        # Remove HTML tags
        text = remove_tags(text)

        # Normalize whitespace
        text = ' '.join(text.split())

        # Remove extra spaces
        text = text.strip()

        return text

    @staticmethod
    def clean_price(price: Any) -> Optional[float]:
        """Extract numeric price from string."""
        if isinstance(price, (int, float)):
            return float(price)

        if not price:
            return None

        # Remove currency symbols and formatting
        price_str = str(price)
        price_str = re.sub(r'[^\d.,]', '', price_str)

        # Handle different decimal separators
        if ',' in price_str and '.' in price_str:
            # Determine which is decimal separator
            if price_str.rindex(',') > price_str.rindex('.'):
                price_str = price_str.replace('.', '').replace(',', '.')
            else:
                price_str = price_str.replace(',', '')
        elif ',' in price_str:
            # Comma might be decimal separator (European format)
            parts = price_str.split(',')
            if len(parts) == 2 and len(parts[1]) == 2:
                price_str = price_str.replace(',', '.')
            else:
                price_str = price_str.replace(',', '')

        try:
            return float(price_str)
        except ValueError:
            return None

    @staticmethod
    def clean_url(url: str) -> str:
        """Clean and normalize URL."""
        url = url.strip()

        # Remove URL fragments
        url = url.split('#')[0]

        # Remove tracking parameters (optional)
        # url = re.sub(r'[?&](utm_|ref=|source=)[^&]*', '', url)

        return url

    @staticmethod
    def clean_phone(phone: str) -> str:
        """Clean phone number."""
        # Remove all non-digit characters
        digits = re.sub(r'\D', '', phone)
        return digits

3. Duplicate Filtering Pipeline

Purpose: Remove duplicate items

from scrapy.exceptions import DropItem
from typing import Set
import hashlib


class DuplicatesPipeline:
    """
    Filter out duplicate items based on unique fields.
    """

    def __init__(self):
        self.seen_ids: Set[str] = set()
        self.duplicates_count = 0

    def process_item(self, item: dict, spider) -> dict:
        """Check for duplicates."""
        # Generate unique ID from URL or other unique field
        unique_id = self.get_unique_id(item)

        if unique_id in self.seen_ids:
            self.duplicates_count += 1
            spider.logger.debug(f"Duplicate item found: {unique_id}")
            raise DropItem(f"Duplicate item: {unique_id}")

        self.seen_ids.add(unique_id)
        return item

    def close_spider(self, spider):
        """Log statistics when spider closes."""
        spider.logger.info(
            f"Duplicates filtered: {self.duplicates_count}, "
            f"Unique items: {len(self.seen_ids)}"
        )

    @staticmethod
    def get_unique_id(item: dict) -> str:
        """Generate unique identifier for item."""
        # Use URL as unique identifier
        if 'url' in item:
            return item['url']

        # Or use SKU/ID field
        if 'sku' in item:
            return item['sku']

        # Or generate hash from multiple fields
        unique_fields = ['title', 'brand', 'price']
        data = '|'.join(str(item.get(f, '')) for f in unique_fields)
        return hashlib.md5(data.encode()).hexdigest()

4. Database Storage Pipeline

Purpose: Save items to database

from typing import Optional
import sqlite3
import psycopg2
from psycopg2.extras import RealDictCursor
from scrapy.exceptions import NotConfigured


class DatabasePipeline:
    """
    Save items to database (PostgreSQL or SQLite).

    Settings:
        DATABASE_URL: Database connection string
        DATABASE_TABLE: Table name (default: 'items')
    """

    def __init__(self, database_url: str, table_name: str):
        self.database_url = database_url
        self.table_name = table_name
        self.connection = None
        self.cursor = None
        self.items_saved = 0

    @classmethod
    def from_crawler(cls, crawler):
        """Initialize from crawler settings."""
        database_url = crawler.settings.get('DATABASE_URL')
        if not database_url:
            raise NotConfigured("DATABASE_URL setting is required")

        table_name = crawler.settings.get('DATABASE_TABLE', 'items')

        return cls(database_url, table_name)

    def open_spider(self, spider):
        """Open database connection when spider starts."""
        spider.logger.info(f"Connecting to database: {self.database_url}")

        if self.database_url.startswith('postgresql://'):
            self.connection = psycopg2.connect(self.database_url)
            self.cursor = self.connection.cursor()
        elif self.database_url.startswith('sqlite://'):
            db_path = self.database_url.replace('sqlite:///', '')
            self.connection = sqlite3.connect(db_path)
            self.cursor = self.connection.cursor()
        else:
            raise ValueError(f"Unsupported database: {self.database_url}")

        self._create_table()

    def close_spider(self, spider):
        """Close database connection when spider closes."""
        if self.connection:
            self.connection.commit()
            self.connection.close()
        spider.logger.info(f"Items saved to database: {self.items_saved}")

    def process_item(self, item: dict, spider) -> dict:
        """Save item to database."""
        try:
            self._insert_item(item)
            self.items_saved += 1
            return item
        except Exception as e:
            spider.logger.error(f"Error saving item to database: {e}")
            raise

    def _create_table(self):
        """Create table if it doesn't exist."""
        # This is a simple example - adjust fields based on your items
        create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {self.table_name} (
            id SERIAL PRIMARY KEY,
            title TEXT,
            url TEXT UNIQUE,
            price DECIMAL(10, 2),
            description TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
        self.cursor.execute(create_table_sql)
        self.connection.commit()

    def _insert_item(self, item: dict):
        """Insert item into database."""
        # Extract fields
        fields = ['title', 'url', 'price', 'description']
        values = [item.get(field) for field in fields]

        # Create INSERT query
        placeholders = ', '.join(['%s'] * len(fields))
        columns = ', '.join(fields)

        insert_sql = f"""
        INSERT INTO {self.table_name} ({columns})
        VALUES ({placeholders})
        ON CONFLICT (url) DO UPDATE SET
            title = EXCLUDED.title,
            price = EXCLUDED.price,
            description = EXCLUDED.description
        """

        self.cursor.execute(insert_sql, values)
        self.connection.commit()

5. File Export Pipeline (CSV/JSON)

Purpose: Export items to files

import csv
import json
from pathlib import Path
from datetime import datetime


class CSVExportPipeline:
    """
    Export items to CSV file.

    Settings:
        CSV_EXPORT_PATH: Path to CSV file (default: output.csv)
        CSV_EXPORT_FIELDS: List of fields to export
    """

    def __init__(self, file_path: str, fields: list):
        self.file_path = file_path
        self.fields = fields
        self.file = None
        self.writer = None
        self.items_count = 0

    @classmethod
    def from_crawler(cls, crawler):
        """Initialize from settings."""
        file_path = crawler.settings.get('CSV_EXPORT_PATH', 'output.csv')
        fields = crawler.settings.get('CSV_EXPORT_FIELDS', [
            'title', 'url', 'price', 'description'
        ])
        return cls(file_path, fields)

    def open_spider(self, spider):
        """Open CSV file when spider starts."""
        # Create directory if it doesn't exist
        Path(self.file_path).parent.mkdir(parents=True, exist_ok=True)

        self.file = open(self.file_path, 'w', newline='', encoding='utf-8')
        self.writer = csv.DictWriter(
            self.file,
            fieldnames=self.fields,
            extrasaction='ignore'  # Ignore extra fields
        )
        self.writer.writeheader()
        spider.logger.info(f"Exporting to CSV: {self.file_path}")

    def close_spider(self, spider):
        """Close CSV file when spider closes."""
        if self.file:
            self.file.close()
        spider.logger.info(f"Exported {self.items_count} items to {self.file_path}")

    def process_item(self, item: dict, spider) -> dict:
        """Write item to CSV."""
        self.writer.writerow(item)
        self.items_count += 1
        return item


class JSONLinesExportPipeline:
    """
    Export items to JSON Lines file (one JSON object per line).

    Settings:
        JSONL_EXPORT_PATH: Path to JSONL file
    """

    def __init__(self, file_path: str):
        self.file_path = file_path
        self.file = None
        self.items_count = 0

    @classmethod
    def from_crawler(cls, crawler):
        """Initialize from settings."""
        file_path = crawler.settings.get('JSONL_EXPORT_PATH', 'output.jsonl')
        return cls(file_path)

    def open_spider(self, spider):
        """Open JSONL file."""
        Path(self.file_path).parent.mkdir(parents=True, exist_ok=True)
        self.file = open(self.file_path, 'w', encoding='utf-8')
        spider.logger.info(f"Exporting to JSONL: {self.file_path}")

    def close_spider(self, spider):
        """Close JSONL file."""
        if self.file:
            self.file.close()
        spider.logger.info(f"Exported {self.items_count} items to {self.file_path}")

    def process_item(self, item: dict, spider) -> dict:
        """Write item as JSON line."""
        line = json.dumps(dict(item), ensure_ascii=False)
        self.file.write(line + '\n')
        self.items_count += 1
        return item

6. Image Download Pipeline

Purpose: Download and process images

import scrapy
from scrapy.pipelines.images import ImagesPipeline
from scrapy.exceptions import DropItem
from urllib.parse import urlparse
from pathlib import Path


class CustomImagesPipeline(ImagesPipeline):
    """
    Download product images with custom naming.

    Settings:
        IMAGES_STORE: Directory to save images
        IMAGES_MIN_HEIGHT: Minimum image height
        IMAGES_MIN_WIDTH: Minimum image width
    """

    def get_media_requests(self, item, info):
        """Generate requests for image URLs."""
        image_urls = item.get('image_urls', [])

        # Handle single image_url field
        if 'image_url' in item and item['image_url']:
            image_urls = [item['image_url']]

        for image_url in image_urls:
            # Pass item metadata to the request
            yield scrapy.Request(
                image_url,
                meta={'item': item}
            )

    def file_path(self, request, response=None, info=None, *, item=None):
        """Generate custom file path for images."""
        # Get item from request meta
        item = request.meta.get('item', {})

        # Extract filename from URL
        url_path = urlparse(request.url).path
        filename = Path(url_path).name

        # Create custom path: category/product_id/filename
        category = item.get('category', 'uncategorized')
        product_id = item.get('id', 'unknown')

        return f"{category}/{product_id}/{filename}"

    def item_completed(self, results, item, info):
        """Process completed image downloads."""
        # results is a list of (success, image_info) tuples

        # Filter successful downloads
        image_paths = [x['path'] for ok, x in results if ok]

        if not image_paths:
            raise DropItem("No images downloaded")

        # Add image paths to item
        item['images'] = image_paths
        item['images_count'] = len(image_paths)

        return item

Advanced Pipeline Patterns

7. Conditional Pipeline

class ConditionalPipeline:
    """Process items conditionally based on spider or item data."""

    def process_item(self, item: dict, spider) -> dict:
        """Process item based on conditions."""
        # Only process items from specific spider
        if spider.name == 'product_spider':
            # Apply product-specific processing
            item = self.process_product(item)

        # Only process items with certain category
        if item.get('category') == 'electronics':
            item = self.process_electronics(item)

        return item

    def process_product(self, item: dict) -> dict:
        """Product-specific processing."""
        # Add computed fields
        if 'price' in item and 'discount_percentage' in item:
            item['final_price'] = item['price'] * (1 - item['discount_percentage'] / 100)
        return item

    def process_electronics(self, item: dict) -> dict:
        """Electronics-specific processing."""
        # Add warranty information
        item['warranty'] = '1 year'
        return item

8. Retry Pipeline

from scrapy.exceptions import DropItem
import time


class RetryPipeline:
    """Retry failed pipeline operations."""

    def __init__(self, max_retries: int = 3, retry_delay: float = 1.0):
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            max_retries=crawler.settings.get('PIPELINE_MAX_RETRIES', 3),
            retry_delay=crawler.settings.get('PIPELINE_RETRY_DELAY', 1.0),
        )

    def process_item(self, item: dict, spider) -> dict:
        """Process item with retry logic."""
        for attempt in range(self.max_retries):
            try:
                # Your processing logic here
                result = self.risky_operation(item)
                return result
            except Exception as e:
                spider.logger.warning(
                    f"Attempt {attempt + 1}/{self.max_retries} failed: {e}"
                )
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
                else:
                    raise DropItem(f"Failed after {self.max_retries} attempts: {e}")

    def risky_operation(self, item: dict) -> dict:
        """Placeholder for operation that might fail."""
        # Your code here
        return item

Complete Pipeline Example

# pipelines.py
from scrapy.exceptions import DropItem
import json


class CompletePipeline:
    """
    Complete pipeline with all stages:
    1. Validation
    2. Cleaning
    3. Transformation
    4. Storage
    """

    def __init__(self, output_file: str):
        self.output_file = output_file
        self.file = None
        self.stats = {
            'processed': 0,
            'dropped': 0,
            'saved': 0,
        }

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            output_file=crawler.settings.get('OUTPUT_FILE', 'items.jsonl')
        )

    def open_spider(self, spider):
        """Initialize pipeline."""
        self.file = open(self.output_file, 'w', encoding='utf-8')
        spider.logger.info("Pipeline opened")

    def close_spider(self, spider):
        """Cleanup and log statistics."""
        if self.file:
            self.file.close()
        spider.logger.info(f"Pipeline stats: {self.stats}")

    def process_item(self, item: dict, spider) -> dict:
        """Process item through all stages."""
        self.stats['processed'] += 1

        try:
            # 1. Validate
            item = self.validate(item)

            # 2. Clean
            item = self.clean(item)

            # 3. Transform
            item = self.transform(item)

            # 4. Save
            self.save(item)
            self.stats['saved'] += 1

            return item

        except DropItem as e:
            self.stats['dropped'] += 1
            spider.logger.debug(f"Item dropped: {e}")
            raise

    def validate(self, item: dict) -> dict:
        """Validate item."""
        required = ['title', 'url']
        if not all(item.get(field) for field in required):
            raise DropItem("Missing required fields")
        return item

    def clean(self, item: dict) -> dict:
        """Clean item data."""
        if 'title' in item:
            item['title'] = item['title'].strip()
        return item

    def transform(self, item: dict) -> dict:
        """Transform item."""
        # Add computed fields
        if 'price' in item:
            item['price_usd'] = float(item['price'])
        return item

    def save(self, item: dict):
        """Save item to file."""
        line = json.dumps(dict(item), ensure_ascii=False)
        self.file.write(line + '\n')

When to Use This Skill

Use this skill when:

  • Creating data processing pipelines
  • Implementing validation logic
  • Setting up data export workflows
  • Integrating with databases
  • Processing and cleaning scraped data
  • Handling images and media files

Integration with Commands and Agents

Commands:

  • /pipeline <type> - Generate pipeline of specific type
  • /export <format> - Configure export pipeline

Agents:

  • @scrapy-expert - Reviews pipeline design
  • @data-validator - Ensures proper validation
  • @performance-optimizer - Optimizes pipeline performance

This skill automates pipeline creation while ensuring data quality, proper error handling, and efficient storage.