---
name: debug-worker-pipeline
description: Debug the RQ pipeline (inv core.rqp) by exercising it against a Docker worker instance, inspecting Redis/RQ state, Docker services, and worker logs, then returning a diagnosis and next steps. Use when pipeline jobs hang, fail silently, or workers aren't processing.
---
## What I do

I help you debug the RQ pipeline invoked via:

`inv core.rqp ...` (Invoke task in `tasks/core.py`)
I focus on runs that are processed by RQ workers (local or Docker):
- Verify the environment (Redis, backend, worker) is running
- Enqueue a small, scoped pipeline job for a county
- Track the corresponding RQ job status
- Inspect Docker worker logs (or local worker logs in host mode)
- Summarize what happened and where it failed (enqueue, import, runtime, Mongo, filesystem, etc.)
- Recommend next steps (commands or code locations to inspect)
I'm tailored specifically for this repo's layout and tasks.
## When to use me
Use this skill when:
- `inv core.rqp` appears to hang, never completes, or silently fails
- Jobs show up in Redis/RQ but workers aren't processing them
- `worker` Docker containers are running but the pipeline isn't moving
- Workers get killed mid-job (OOM, container restart, signal 9/137)
- You need a quick, end-to-end view of:
  - Docker services → RQ queues → worker logs → job result
- You want a reproducible debug recipe for a specific county and tax year
Do not use me for:
- Generic RQ debugging in unrelated projects
- Frontend or API debugging unrelated to the RQ pipeline
## Inputs I expect
When you invoke this skill, I expect (or will ask for) the following:
- `county` (required, string)
  - The county slug, e.g. `travis`, `pecos`.
- `tax_year` (optional, int, default `2025`)
- `max_records` (optional, int, default `500` or `1000`)
  - Keep this small; this is for debugging, not full runs.
- `reset` (optional, bool, default `false`)
  - Maps to the `reset` argument on `core.rqp`.
  - I should ask before enabling this because it may delete existing pipeline output (usually MongoDB records for that run).
- `mode` (optional, enum: `"docker"` | `"host"`, default `"docker"`)
  - `"docker"`: Use Docker `worker` containers and run `inv` inside the backend container.
  - `"host"`: Use local workers via `inv core.rqw` and local Redis.
If any of these are missing or ambiguous, I will ask a single clarifying question before proceeding.
## Project assumptions
I assume:
- I am run from the project root (same directory as `tasks/`, `docker-compose.yml`, `backend/`, `etl/`).
- The Invoke collections are configured via `tasks/__init__.py`, so the following are available:
  - `inv core.rqp ...`
  - `inv core.rq_status ...`
  - `inv core.rqw ...` (Invoke task for local RQ workers)
  - `inv core.ws ...` (Docker worker scaling)
- The Docker dev environment is described in `README.md` and `tasks/core.py` (`dev`, `dev_status`, `ws`, etc.).
- RQ is configured via `etl.config` and the RQ helpers in `etl.tasks.rq_tasks`.
If these assumptions are violated (e.g. docker-compose.yml is missing), I'll stop and say so explicitly instead of guessing.
## How I work (high level)
1. **Decide mode**
   - Prefer `mode = "docker"` if a `docker-compose.yml` is present and `docker-compose ps` works.
   - Fall back to `mode = "host"` if Docker isn't available or the user explicitly asks.
2. **Verify environment**
   - In Docker mode:
     - Use the `bash` tool to run:
       - `docker-compose ps redis`
       - `docker-compose ps worker`
       - `docker-compose ps backend`
     - If Redis or the backend isn't running:
       - Suggest starting the dev stack, e.g. `inv core.dev` or `docker-compose up -d`.
       - Stop with a clear message.
     - If `worker` is not running or is scaled to 0:
       - Use `bash` to run: `inv core.ws --workers 1`
       - This calls `docker-compose up -d --scale worker=1` under the hood.
   - In host mode:
     - Assume Redis is available at the configured `CONFIG.redis_url`.
     - If needed, suggest `redis-server` or the existing `docker-compose up -d redis` from the project docs.
     - Use `inv core.rqw --workers 1` to start at least one local worker.
3. **Enqueue a scoped pipeline job**
   - In Docker mode:
     - Use `bash` to run inside the backend container, from the project root:

       ```bash
       docker-compose exec backend \
         inv core.rqp \
           --county "<county>" \
           --tax-year <tax_year> \
           --max-records <max_records> \
           [--reset]
       ```

     - Capture stdout/stderr.
   - In host mode:
     - Use:

       ```bash
       inv core.rqp \
         --county "<county>" \
         --tax-year <tax_year> \
         --max-records <max_records> \
         [--reset]
       ```

   - If this step fails immediately (non-zero exit code), I:
     - Report it as `enqueue_failed`.
     - Show the last ~20 lines of output.
     - Do not proceed to worker/log analysis.
4. **Locate the job in RQ**
   - Prefer to extract a job ID from the output, if `rq_process` prints one.
   - If the job ID is not printed:
     - Use `bash` to run `inv core.rq_status` to see queue stats.
     - If possible, run a short Python snippet (inside the backend container) that:
       - Connects via `get_redis_connection()`
       - Looks at the `county_processing` queue
       - Filters jobs by county in the job description (a sketch follows below)
   - If I cannot reliably identify the job ID, I'll say so and fall back to:
     - "Job appears enqueued but the job ID is unknown; see the RQ dashboard and queue stats."
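   A minimal sketch of such a snippet, assuming `get_redis_connection()` is importable from the RQ helpers in `etl.tasks.rq_tasks` (the exact import path and queue name should be checked against `etl.config`):

   ```python
   # Hypothetical inspection snippet; adjust imports to this repo's helpers.
   from rq import Queue
   from rq.registry import StartedJobRegistry

   from etl.tasks.rq_tasks import get_redis_connection  # assumed location

   conn = get_redis_connection()
   queue = Queue("county_processing", connection=conn)
   county = "travis"  # the county being debugged

   # Queued jobs whose description mentions the county
   for job in queue.jobs:
       if county in (job.description or ""):
           print(job.id, job.get_status(), job.description)

   # Jobs a worker has already picked up
   for job_id in StartedJobRegistry(queue=queue).get_job_ids():
       print("started:", job_id)
   ```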
5. **Monitor job status**
   - If I have a job ID:
     - Use `bash` to run: `inv core.rq_status --job-id <job_id>`
     - Interpret the result:
       - `queued` for too long → likely a worker issue or queue mismatch.
       - `started` → a worker picked it up.
       - `finished` → success; verify the output is present and optionally check `processing_runs`.
       - `failed` → collect `exc_info` / the error message.
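   If the Invoke task isn't usable in context, the same information can be read with the plain RQ API; a minimal sketch (the connection-helper import is an assumption, as above):

   ```python
   # Hypothetical status check using the RQ API directly.
   from rq.job import Job

   from etl.tasks.rq_tasks import get_redis_connection  # assumed location

   conn = get_redis_connection()
   job = Job.fetch("<job_id>", connection=conn)

   print("status:", job.get_status())   # queued / started / finished / failed
   print("enqueued:", job.enqueued_at)
   print("started:", job.started_at, "ended:", job.ended_at)
   if job.is_failed:
       print(job.exc_info)              # traceback captured by RQ
   ```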
6. **Inspect worker logs**
   - In Docker mode:
     - Use `bash`: `docker-compose logs worker --tail=200`
     - Look for:
       - The `job_id`, if present.
       - Or the county name in log messages.
       - OOM/kill signals: `Killed`, `signal 9`, `exit code 137`, `MemoryError`
     - Extract only a small, relevant snippet (around the failure) for the summary.
   - In host mode:
     - If workers are running in a visible terminal, prefer the RQ dashboard / CLI.
     - If logs are written to files, mention the likely locations (e.g. the project logging config) and advise the user where to look.
   - OOM detection patterns:

     ```bash
     # Check for OOM kills in Docker
     docker inspect parcelum-worker-1 --format='{{.State.OOMKilled}}'

     # Check container exit codes (137 = killed by signal 9)
     docker-compose ps -a | grep worker

     # Check dmesg for OOM killer (Linux host)
     dmesg | grep -i "killed process" | tail -5
     ```
7. **Cross-check artifacts**
   - Optionally (if needed and cheap):
     - Verify MongoDB records (the pipeline syncs directly to MongoDB):
       - Check `processing_runs` and any other relevant collections for that county/tax_year using the Mongo tools, if enabled.
     - Confirm the expected records were written for that run (correct county, tax_year, and record counts).
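   A minimal sketch of such a check with `pymongo`, assuming `MONGODB_URI` is set and that the collection names (`processing_runs`, and `parcels` for the records themselves) match this repo's conventions:

   ```python
   # Hypothetical cross-check; database and collection names are assumptions.
   import os
   from pymongo import MongoClient

   client = MongoClient(os.environ.get("MONGODB_URI", "mongodb://localhost:27017"))
   db = client.get_default_database()  # or client["<db_name>"]

   county, tax_year = "travis", 2025

   # Most recent processing run for this county/year
   run = db.processing_runs.find_one(
       {"county": county, "tax_year": tax_year}, sort=[("_id", -1)]
   )
   print("latest run:", run)

   # How many records were written for this county/year
   print("records:", db.parcels.count_documents({"county": county, "tax_year": tax_year}))
   ```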
8. **Return a structured diagnosis**

   I will summarize, in plain language, at least these sections:

   - ENVIRONMENT
     - `redis`: up/down, from Docker or host info.
     - `backend`: up/down.
     - `worker`: count of workers or containers.
     - `memory`: container memory limit, if OOM is suspected.
   - PIPELINE_JOB
     - `county`, `tax_year`, `max_records`, `reset`.
     - `job_id`, if known.
     - RQ `state` (queued/started/finished/failed/enqueue_failed/killed).
     - Any timestamps / durations, if available.
   - ERROR (if any)
     - Condensed traceback or the most important error lines from:
       - `inv core.rqp` output, and/or
       - the RQ job `exc_info`, and/or
       - worker logs.
     - OOM indicators: exit code 137, `OOMKilled=true`, `MemoryError`.
   - WORKER_LOG_SNIPPET
     - A short block of log lines around the error or final state.
     - Omit noisy, repetitive logs.
   - NEXT STEPS
     - Concrete suggestions, such as:
       - Fixing imports or module paths.
       - Rebuilding worker containers if code changed: `docker-compose build worker && docker-compose up -d worker`
       - Adjusting environment variables (e.g. `MONGODB_URI`, `REDIS_URL`).
       - For OOM: implement streaming/batching (see below).
       - Rerunning the same debug command once addressed.
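   Purely as an illustration of the report's shape (every value below is invented):

   ```
   ENVIRONMENT         redis: up (docker)  backend: up  worker: 1 container  memory: 2g limit
   PIPELINE_JOB        county=travis  tax_year=2025  max_records=500  reset=false
                       job_id=abc123  state=failed
   ERROR               ImportError raised while importing the county parser module
   WORKER_LOG_SNIPPET  <5-10 lines around the traceback>
   NEXT STEPS          fix the import, rebuild the worker image, rerun the same scoped command
   ```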
## Safety and scope rules

- Never enqueue "all counties" in this skill:
  - I always scope to a single county provided by the user.
- I default to a small `max_records` (e.g. 500–1000).
- I will ask before using `reset=true`, since it may delete existing pipeline output (MongoDB records or related state).
- If Docker or Redis is clearly not set up, I will stop and explain what is missing instead of trying to invent new infrastructure.
## OOM / Memory Kill Handling

When a job is killed due to memory exhaustion:

### Detection

| Signal | Meaning |
|---|---|
| Exit code 137 | Killed by SIGKILL (OOM killer) |
| `OOMKilled: true` | Docker detected OOM |
| `MemoryError` in logs | Python ran out of memory |
| Worker restarts mid-job | Container killed and restarted |
| Job stuck in `started`, then disappears | Worker died before completing |
### Root Cause Analysis

Large counties (e.g., Fort Bend 400k+ records, Harris 1M+) can exhaust memory when:

- **Loading entire index files into memory**: the parser loads all owners/land/improvements into dicts
- **Building large batch lists**: accumulating updates before the bulk write
- **No streaming**: processing the entire file before writing to MongoDB
### Solution: Streaming Pattern

Instead of loading sub-files into memory indexes, stream them directly to MongoDB:

```python
# BAD: Loads all records into memory
def _load_indexes(self):
    self.owners_index = {}
    for row in self._read_csv('owners.txt'):
        prop_id = row['PropertyID']
        if prop_id not in self.owners_index:
            self.owners_index[prop_id] = []
        self.owners_index[prop_id].append(row)  # OOM risk!
```

```python
# GOOD: Stream and update MongoDB directly
def _stream_owners_subfile(self):
    updates = []
    batch_size = 5000
    for row in self._read_csv('owners.txt'):
        prop_id = row['PropertyID']
        owner_data = {'name': row['OwnerName'], ...}
        query = {'county_id': prop_id, 'tax_year': tax_year, 'county': self.COUNTY_NAME}
        update = {'$push': {'owners': owner_data}}
        updates.append((query, update))
        if len(updates) >= batch_size:
            self.mongodb.bulk_update_parcels(updates)
            updates = []  # Clear batch, free memory
    # Final batch
    if updates:
        self.mongodb.bulk_update_parcels(updates)
```
### Implementation Steps

1. Override `_load_indexes()` to return empty dicts (skip memory loading)
2. Override `process_full_dataset()` to:
   - Call the base class for main-file processing
   - Then stream each sub-file separately
3. Add streaming methods: `_stream_owners_subfile()`, `_stream_land_subfile()`, `_stream_improvements_subfile()`
4. Use `bulk_update_parcels()` for batched MongoDB updates

A minimal sketch of what these overrides might look like follows.
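The class and base-class names below are placeholders (the real parser base class and method signatures in `etl/parsers/` may differ); this only shows the shape of the overrides:

```python
class LargeCountyETL(BaseCountyETL):  # BaseCountyETL is a placeholder name
    COUNTY_NAME = "fort_bend"

    def _load_indexes(self):
        # Skip the in-memory indexes entirely; sub-files are streamed instead.
        self.owners_index = {}
        self.land_index = {}
        self.improvements_index = {}

    def process_full_dataset(self, *args, **kwargs):
        # Let the base class handle the main appraisal file as usual...
        result = super().process_full_dataset(*args, **kwargs)
        # ...then stream each sub-file straight into MongoDB in small batches.
        self._stream_owners_subfile()
        self._stream_land_subfile()
        self._stream_improvements_subfile()
        return result
```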
### Reference Implementation

See `etl/parsers/fort_bend_etl.py` for a complete streaming implementation that handles 400k+ records without OOM.
### Key Considerations

| Concern | Solution |
|---|---|
| Type mismatch in queries | Ensure `tax_year = int(tax_year)` before querying |
| Decimal encoding | Wrap `_safe_decimal()` in `float()` before MongoDB insert |
| Matched count = 0 | Log `matched_count` to detect query mismatches early |
| Memory during streaming | Clear the batch list after each bulk write |
## Example usage scenarios

### Scenario 1 — Docker worker not processing jobs

- User: "Use `debug-worker-pipeline` for county `travis` 2025 via Docker."
- I:
  - Confirm `mode="docker"`, `max_records` default, `reset=false`.
  - Check `docker-compose ps` for redis/backend/worker.
  - Run `docker-compose exec backend inv core.rqp --county travis --tax-year 2025 --max-records 500`.
  - See the job stuck in `queued` via `inv core.rq_status`.
  - See no meaningful worker logs.
  - Diagnose: workers are not actually connected to the correct Redis or queue; suggest verifying `CONFIG.redis_url`, `ALL_QUEUES`, and the worker command.
### Scenario 2 — Import error inside worker container

- User: "Debug `pecos` 2025 (host is using Docker, default settings)."
- I:
  - Enqueue the job via `core.rqp`.
  - See the job `failed` with `ImportError` in `inv core.rq_status --job-id ...`.
  - Extract the stack trace from `docker-compose logs worker`.
  - Summarize the import path and point to `etl/tasks/rq_tasks.py` or `etl/config.py` as the likely fix location.
### Scenario 3 — Worker killed by OOM during large county processing

- User: "Debug `fort_bend` 2025, worker keeps dying mid-job."
- I:
  - Check `docker inspect parcelum-worker-1 --format='{{.State.OOMKilled}}'` → `true`.
  - Check `docker-compose ps -a` → worker exit code 137.
  - See the job was `started` but never `finished`, and the worker restarted.
  - Diagnose: Fort Bend has 400k+ records and the parser is loading all sub-files into memory.
  - Recommend: implement the streaming pattern (see the OOM section above).
  - Point to `fort_bend_etl.py` as the reference implementation.