debug-worker-pipeline

@afrojuju1/county_scraper_2

Debug the RQ pipeline (inv core.rqp) by exercising it against a Docker worker instance, inspecting Redis/RQ state, Docker services, and worker logs, then returning a diagnosis and next steps. Use when pipeline jobs hang, fail silently, or workers aren't processing.

SKILL.md

name: debug-worker-pipeline
description: Debug the RQ pipeline (inv core.rqp) by exercising it against a Docker worker instance, inspecting Redis/RQ state, Docker services, and worker logs, then returning a diagnosis and next steps. Use when pipeline jobs hang, fail silently, or workers aren't processing.

What I do

I help you debug the RQ pipeline invoked via:

  • inv core.rqp ... (Invoke task in tasks/core.py)

I focus on runs that are processed by RQ workers (local or Docker):

  • Verify the environment (Redis, backend, worker) is running
  • Enqueue a small, scoped pipeline job for a county
  • Track the corresponding RQ job status
  • Inspect Docker worker logs (or local worker logs in host mode)
  • Summarize what happened and where it failed (enqueue, import, runtime, Mongo, filesystem, etc.)
  • Recommend next steps (commands or code locations to inspect)

I'm tailored specifically for this repo's layout and tasks.


When to use me

Use this skill when:

  • inv core.rqp appears to hang, never completes, or silently fails
  • Jobs show up in Redis/RQ but workers aren't processing them
  • worker Docker containers are running but the pipeline isn't moving
  • Workers get killed mid-job (OOM, container restart, signal 9/137)
  • You need a quick, end-to-end view of:
    • Docker services → RQ queues → worker logs → job result
  • You want a reproducible debug recipe for a specific county and tax year

Do not use me for:

  • Generic RQ debugging in unrelated projects
  • Frontend or API debugging unrelated to the RQ pipeline

Inputs I expect

When you invoke this skill, I expect (or will ask for) the following:

  • county (required, string)
    • The county slug, e.g. travis, pecos.
  • tax_year (optional, int, default 2025)
  • max_records (optional, int, default 500 or 1000)
    • Keep this small; this is for debugging, not full runs.
  • reset (optional, bool, default false)
    • Maps to the reset argument on core.rqp.
    • I should ask before enabling this because it may delete existing pipeline output (usually MongoDB records for that run).
  • mode (optional, enum: "docker" | "host", default "docker")
    • "docker": Use Docker worker containers and run inv inside the backend container.
    • "host": Use local workers via inv core.rqw and local Redis.

If any of these are missing or ambiguous, I will ask a single clarifying question before proceeding.


Project assumptions

I assume:

  • I am run from the project root (same directory as tasks/, docker-compose.yml, backend/, etl/).
  • The Invoke collections are configured via tasks/__init__.py, so:
    • inv core.rqp ...
    • inv core.rq_status ...
    • inv core.rqw ... (invoke task for local RQ workers)
    • inv core.ws ... (Docker worker scaling)
  • Docker dev environment is described in README.md and tasks/core.py (dev, dev_status, ws, etc.).
  • RQ is configured via etl.config and RQ helpers in etl.tasks.rq_tasks.

If these assumptions are violated (e.g. docker-compose.yml is missing), I'll stop and say so explicitly instead of guessing.


How I work (high level)

  1. Decide mode

    • Prefer mode = "docker" if a docker-compose.yml is present and docker-compose ps works.
    • Fall back to mode = "host" if Docker isn't available or the user explicitly asks.
  2. Verify environment

    • In Docker mode:
      • Use the bash tool to run:
        • docker-compose ps redis
        • docker-compose ps worker
        • docker-compose ps backend
      • If Redis or backend isn't running:
        • Suggest starting dev stack, e.g. inv core.dev or docker-compose up -d.
        • Stop with a clear message.
      • If worker is not running or scaled to 0:
        • Use bash to run: inv core.ws --workers 1
          • This calls docker-compose up -d --scale worker=1 under the hood.
    • In host mode:
      • Assume Redis is available at the configured CONFIG.redis_url.
      • If needed, suggest redis-server or the existing docker-compose up -d redis from project docs.
      • Use inv core.rqw --workers 1 to start at least one local worker.
  3. Enqueue a scoped pipeline job

    • In Docker mode:
      • Use bash to run inside the backend container, from project root:
        docker-compose exec backend \
          inv core.rqp \
            --county "<county>" \
            --tax-year <tax_year> \
            --max-records <max_records> \
            [--reset]
        
      • Capture stdout/stderr.
    • In host mode:
      • Use:
        inv core.rqp \
          --county "<county>" \
          --tax-year <tax_year> \
          --max-records <max_records> \
          [--reset]
        
    • If this step fails immediately (non-zero exit code), I:
      • Report it as enqueue_failed.
      • Show the last ~20 lines of output.
      • Do not proceed to worker/log analysis.
  4. Locate the job in RQ

    • Prefer to extract a job ID from the output if rq_process prints one.
    • If the job ID is not printed:
      • Use bash to run an RQ status command:
        inv core.rq_status
        
        to see queue stats.
      • If possible, run a short Python snippet (inside the backend container, as sketched below) that:
        • Connects via get_redis_connection()
        • Looks at the county_processing queue
        • Filters jobs by county in the job description
      • If I cannot reliably identify the job ID, I'll say so and fall back to:
        • "job appears enqueued but job id is unknown; see RQ dashboard and queue stats."
  5. Monitor job status

    • If I have a job ID:
      • Use bash to run:
        inv core.rq_status --job-id <job_id>
        
      • Interpret the result:
        • queued for too long → likely worker issue or queue mismatch.
        • started → worker picked it up.
        • finished → success; verify the expected records are present and optionally check processing_runs.
        • failed → collect exc_info / error message.
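    If inv core.rq_status doesn't surface enough detail, a single job can be checked directly with RQ (sketch; reuses get_redis_connection and the job ID found in step 4):

    from rq.job import Job

    job = Job.fetch("<job_id>", connection=get_redis_connection())
    print(job.get_status())   # queued / started / finished / failed
    if job.is_failed:
        print(job.exc_info)   # traceback recorded by the worker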
  6. Inspect worker logs

    • In Docker mode:
      • Use bash:
        docker-compose logs worker --tail=200
        
      • Look for:
        • This job_id if present.
        • Or the county name in log messages.
        • OOM/Kill signals: Killed, signal 9, exit code 137, MemoryError
      • Extract only a small, relevant snippet (around the failure) for the summary.
    • In host mode:
      • If workers are running in a visible terminal, prefer RQ dashboard / CLI.
      • If logs are written to files, mention the likely locations (e.g. project logging config) and advise the user where to look.

    OOM Detection Patterns:

    # Check for OOM kills in Docker
    docker inspect parcelum-worker-1 --format='{{.State.OOMKilled}}'
    
    # Check container exit codes (137 = killed by signal 9)
    docker-compose ps -a | grep worker
    
    # Check dmesg for OOM killer (Linux host)
    dmesg | grep -i "killed process" | tail -5
    
  7. Cross-check artifacts

    • Optionally (if needed and cheap):
      • Verify MongoDB records (pipeline syncs directly to MongoDB):
        • Check processing_runs and any other relevant collections for that county/tax_year using the Mongo tools if enabled.
      • Confirm the expected records were written for that run (correct county, tax_year, and record counts).
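    A minimal cross-check sketch (the parcels collection name here is an assumption; adjust to the real schema, and note that MONGODB_URI must include a database name for get_default_database() to work):

    import os
    from pymongo import MongoClient

    db = MongoClient(os.environ["MONGODB_URI"]).get_default_database()

    # Most recent processing run for this county/tax_year
    run = db.processing_runs.find_one(
        {"county": "travis", "tax_year": 2025}, sort=[("_id", -1)]
    )
    print("latest run:", run)

    # How many records that run actually wrote
    count = db.parcels.count_documents({"county": "travis", "tax_year": 2025})
    print("records written:", count)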
  8. Return a structured diagnosis

    • I will summarize, in plain language, at least these sections:

      • ENVIRONMENT
        • redis: up/down, from Docker or host info.
        • backend: up/down.
        • worker: count of workers or containers.
        • memory: container memory limit if OOM suspected.
      • PIPELINE_JOB
        • county, tax_year, max_records, reset.
        • job_id if known.
        • RQ state (queued/started/finished/failed/enqueue_failed/killed).
        • Any timestamps / durations if available.
      • ERROR (if any)
        • Condensed traceback or the most important error lines from:
          • inv core.rqp output, and/or
          • RQ job exc_info, and/or
          • worker logs.
        • OOM indicators: exit code 137, OOMKilled=true, MemoryError.
      • WORKER_LOG_SNIPPET
        • A short block of log lines around the error or final state.
        • Omit noisy, repetitive logs.
      • NEXT STEPS
        • Concrete suggestions, such as:
          • Fixing imports or module paths.
          • Rebuilding worker containers if code changed:
            • docker-compose build worker && docker-compose up -d worker
          • Adjusting environment variables (e.g. MONGODB_URI, REDIS_URL).
          • For OOM: Implement streaming/batching (see below).
          • Rerunning the same debug command once addressed.
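    For illustration, a diagnosis for the "stuck in queued" case (Scenario 1 below) might read like this, with placeholder values rather than real output:

    ENVIRONMENT         redis: up, backend: up, worker: 1 container, memory: n/a
    PIPELINE_JOB        county=travis, tax_year=2025, max_records=500, reset=false,
                        job_id=<unknown>, state=queued (never picked up)
    ERROR               none captured; the job never reached a worker
    WORKER_LOG_SNIPPET  no lines mentioning travis in docker-compose logs worker --tail=200
    NEXT STEPS          verify CONFIG.redis_url, ALL_QUEUES, and the worker command; rerun inv core.rqp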

Safety and scope rules

  • Never enqueue "all counties" in this skill:
    • I always scope to a single county provided by the user.
  • I default to a small max_records (e.g. 500–1000).
  • I will ask before using reset=true, since it may delete existing pipeline output (MongoDB records or related state).
  • If Docker or Redis is clearly not set up, I will stop and explain what is missing instead of trying to invent new infrastructure.

OOM / Memory Kill Handling

When a job is killed due to memory exhaustion:

Detection

Signal                                   Meaning
Exit code 137                            Killed by SIGKILL (OOM killer)
OOMKilled: true                          Docker detected OOM
MemoryError in logs                      Python ran out of memory
Worker restarts mid-job                  Container killed and restarted
Job stuck in started, then disappears    Worker died before completing

Root Cause Analysis

Large counties (e.g., Fort Bend 400k+ records, Harris 1M+) can exhaust memory when:

  1. Loading entire index files into memory - Parser loads all owners/land/improvements into dicts
  2. Building large batch lists - Accumulating updates before bulk write
  3. No streaming - Processing entire file before writing to MongoDB

Solution: Streaming Pattern

Instead of loading sub-files into memory indexes, stream them directly to MongoDB:

# BAD: Loads all records into memory
def _load_indexes(self):
    self.owners_index = {}
    for row in self._read_csv('owners.txt'):
        prop_id = row['PropertyID']
        if prop_id not in self.owners_index:
            self.owners_index[prop_id] = []
        self.owners_index[prop_id].append(row)  # OOM risk!

# GOOD: Stream and update MongoDB directly
def _stream_owners_subfile(self):
    updates = []
    batch_size = 5000
    
    for row in self._read_csv('owners.txt'):
        prop_id = row['PropertyID']
        owner_data = {'name': row['OwnerName'], ...}
        
        # tax_year cast to int for the query (see Key Considerations); assumes the parser stores it on self
        query = {'county_id': prop_id, 'tax_year': int(self.tax_year), 'county': self.COUNTY_NAME}
        update = {'$push': {'owners': owner_data}}
        updates.append((query, update))
        
        if len(updates) >= batch_size:
            self.mongodb.bulk_update_parcels(updates)
            updates = []  # Clear batch, free memory
    
    # Final batch
    if updates:
        self.mongodb.bulk_update_parcels(updates)

Implementation Steps

  1. Override _load_indexes() to return empty dicts (skip memory loading)
  2. Override process_full_dataset() to:
    • Call base class for main file processing
    • Then stream each sub-file separately
  3. Add streaming methods: _stream_owners_subfile(), _stream_land_subfile(), _stream_improvements_subfile()
  4. Use bulk_update_parcels() for batched MongoDB updates
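
A rough skeleton of those overrides (a sketch only; the class and base-class names are placeholders, not the repo's actual classes; see the reference implementation below for the real version):

# Sketch of the override structure described above; StreamingCountyParser and
# BaseCountyParser are placeholder names, not the repo's actual classes.
class StreamingCountyParser(BaseCountyParser):
    def _load_indexes(self):
        # Skip in-memory indexes entirely; sub-files are streamed instead
        self.owners_index = {}
        self.land_index = {}
        self.improvements_index = {}

    def process_full_dataset(self):
        super().process_full_dataset()      # base class handles the main file
        self._stream_owners_subfile()       # then stream each sub-file to MongoDB
        self._stream_land_subfile()
        self._stream_improvements_subfile()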

Reference Implementation

See etl/parsers/fort_bend_etl.py for a complete streaming implementation that handles 400k+ records without OOM.

Key Considerations

Concern                     Solution
Type mismatch in queries    Ensure tax_year = int(tax_year) before querying
Decimal encoding            Wrap _safe_decimal() in float() before MongoDB insert
Matched count = 0           Log matched_count to detect query mismatches early
Memory during streaming     Clear batch list after each bulk write
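
For reference, the batched helper assumed by the streaming example could be as simple as this sketch (the real bulk_update_parcels() lives in the repo's MongoDB layer and may take a different signature):

from pymongo import UpdateOne

def bulk_update_parcels(collection, updates):
    """Apply a batch of (query, update) pairs in a single bulk write."""
    ops = [UpdateOne(query, update, upsert=False) for query, update in updates]
    result = collection.bulk_write(ops, ordered=False)
    # Log matched_count to catch query mismatches early (see table above)
    print(f"matched={result.matched_count} modified={result.modified_count}")
    return result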

Example usage scenarios

Scenario 1 — Docker worker not processing jobs

  • User: "Use debug-worker-pipeline for county travis 2025 via Docker."
  • I:
    • Confirm mode="docker", max_records default, reset=false.
    • Check docker-compose ps redis/backend/worker.
    • Run docker-compose exec backend inv core.rqp --county travis --tax-year 2025 --max-records 500.
    • See job stuck in queued with inv core.rq_status.
    • See no meaningful worker logs.
    • Diagnose: workers not actually connected to the correct Redis or queue; suggest verifying CONFIG.redis_url, ALL_QUEUES, and worker command.

Scenario 2 — Import error inside worker container

  • User: "Debug pecos 2025 (host is using Docker, default settings)."
  • I:
    • Enqueue job via core.rqp.
    • See job failed with ImportError in inv core.rq_status --job-id ....
    • Extract stack trace from docker-compose logs worker.
    • Summarize the import path and point to etl/tasks/rq_tasks.py or etl/config.py as the likely fix location.

Scenario 3 — Worker killed by OOM during large county processing

  • User: "Debug fort_bend 2025, worker keeps dying mid-job."
  • I:
    • Check docker inspect parcelum-worker-1 --format='{{.State.OOMKilled}}' → true
    • Check docker-compose ps -a → worker exit code 137
    • See job was started but never finished, worker restarted
    • Diagnose: Fort Bend has 400k+ records, parser loading all sub-files into memory
    • Recommend: Implement streaming pattern (see OOM section above)
    • Point to fort_bend_etl.py as reference implementation