---
name: background-jobs
description: Async task processing with Celery, ARQ, and Redis for Python backends. Use when offloading long-running tasks, scheduling jobs, or building worker pipelines.
context: fork
agent: data-pipeline-engineer
version: 1.0.0
tags: background-jobs, celery, arq, redis, async, python, 2026
---
# Background Job Patterns

Offload long-running tasks with async job queues.
## When to Use

- Long-running tasks (report generation, data processing)
- Email/notification sending
- Scheduled/periodic tasks
- Webhook processing
- Data export/import pipelines
- Non-LLM async operations (use LangGraph for LLM workflows)
## Tool Selection

| Tool | Language | Best For | Complexity |
|---|---|---|---|
| ARQ | Python (async) | FastAPI, simple jobs | Low |
| Celery | Python | Complex workflows, enterprise | High |
| RQ | Python | Simple Redis queues | Low |
| Dramatiq | Python | Reliable messaging | Medium |
## ARQ (Async Redis Queue)

### Setup

```python
# backend/app/workers/arq_worker.py
import httpx
from arq.connections import RedisSettings


async def startup(ctx: dict):
    """Initialize worker resources."""
    ctx["db"] = await create_db_pool()
    ctx["http"] = httpx.AsyncClient()


async def shutdown(ctx: dict):
    """Clean up worker resources."""
    await ctx["db"].close()
    await ctx["http"].aclose()


class WorkerSettings:
    redis_settings = RedisSettings(host="redis", port=6379)
    functions = [
        send_email,
        generate_report,
        process_webhook,
    ]
    on_startup = startup
    on_shutdown = shutdown
    max_jobs = 10
    job_timeout = 300  # 5 minutes
```
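Start the worker by pointing arq's CLI at the settings class: `arq app.workers.arq_worker.WorkerSettings`.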
### Task Definition

```python
async def send_email(
    ctx: dict,
    to: str,
    subject: str,
    body: str,
) -> dict:
    """Send email task."""
    http = ctx["http"]  # shared httpx.AsyncClient created at worker startup
    response = await http.post(
        "https://api.sendgrid.com/v3/mail/send",
        json={"to": to, "subject": subject, "html": body},
        headers={"Authorization": f"Bearer {SENDGRID_KEY}"},
    )
    return {"status": response.status_code, "to": to}


async def generate_report(
    ctx: dict,
    report_id: str,
    format: str = "pdf",
) -> dict:
    """Generate a report asynchronously."""
    db = ctx["db"]
    data = await db.fetch_report_data(report_id)
    pdf_bytes = await render_pdf(data)
    await db.save_report_file(report_id, pdf_bytes)
    return {"report_id": report_id, "size": len(pdf_bytes)}
```
### Enqueue from FastAPI

```python
from arq import create_pool
from arq.connections import ArqRedis, RedisSettings
from arq.jobs import Job, JobStatus
from fastapi import Depends


# Dependency (simplified: creates a new pool per request)
async def get_arq_pool() -> ArqRedis:
    return await create_pool(RedisSettings(host="redis"))


@router.post("/api/v1/reports")
async def create_report(
    data: ReportRequest,
    arq: ArqRedis = Depends(get_arq_pool),
):
    report = await service.create_report(data)
    # Enqueue background job by function name
    job = await arq.enqueue_job(
        "generate_report",
        report.id,
        format=data.format,
    )
    return {"report_id": report.id, "job_id": job.job_id}


@router.get("/api/v1/jobs/{job_id}")
async def get_job_status(
    job_id: str,
    arq: ArqRedis = Depends(get_arq_pool),
):
    job = Job(job_id, arq)
    status = await job.status()
    result = await job.result() if status == JobStatus.complete else None
    return {"job_id": job_id, "status": status, "result": result}
```
## Celery (Enterprise)

### Setup

```python
# backend/app/workers/celery_app.py
from celery import Celery

celery_app = Celery(
    "skillforge",
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/1",
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    task_track_started=True,
    task_time_limit=600,  # 10 minutes hard limit
    task_soft_time_limit=540,  # 9 minutes soft limit
    worker_prefetch_multiplier=1,  # Fair distribution
    task_acks_late=True,  # Acknowledge after completion
    task_reject_on_worker_lost=True,
)
```
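Workers run as separate processes from the API, e.g. `celery -A app.workers.celery_app worker --loglevel=INFO`.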
### Task Definition

```python
import requests
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def send_email(self, to: str, subject: str, body: str) -> dict:
    """Send email with automatic retry."""
    try:
        response = requests.post(
            "https://api.sendgrid.com/v3/mail/send",
            json={"to": to, "subject": subject, "html": body},
            headers={"Authorization": f"Bearer {SENDGRID_KEY}"},
            timeout=30,
        )
        response.raise_for_status()
        return {"status": "sent", "to": to}
    except Exception as exc:
        logger.error(f"Email failed: {exc}")
        raise self.retry(exc=exc)


@shared_task(bind=True)
def generate_report(self, report_id: str) -> dict:
    """Long-running report generation with progress updates."""
    self.update_state(state="PROGRESS", meta={"step": "fetching"})
    data = fetch_report_data(report_id)

    self.update_state(state="PROGRESS", meta={"step": "rendering"})
    pdf = render_pdf(data)

    self.update_state(state="PROGRESS", meta={"step": "saving"})
    save_report(report_id, pdf)

    return {"report_id": report_id, "size": len(pdf)}
```
### Chains and Groups

```python
from celery import chain, chord, group

# Sequential execution
workflow = chain(
    extract_data.s(source_id),
    transform_data.s(),
    load_data.s(destination_id),
)
result = workflow.apply_async()

# Parallel execution
parallel = group(
    process_chunk.s(chunk) for chunk in chunks
)
result = parallel.apply_async()

# Parallel with callback
chord_workflow = chord(
    [process_chunk.s(chunk) for chunk in chunks],
    aggregate_results.s(),
)
result = chord_workflow.apply_async()
```
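The chord callback receives the list of results produced by the group, in task order. A sketch of what `aggregate_results` could look like (the result keys are assumptions):

```python
@shared_task
def aggregate_results(chunk_results: list[dict]) -> dict:
    # chunk_results holds the return value of every process_chunk task
    return {
        "chunks": len(chunk_results),
        "rows": sum(r.get("rows", 0) for r in chunk_results),
    }
```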
### Periodic Tasks (Celery Beat)

```python
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "cleanup-expired-sessions": {
        "task": "app.workers.tasks.cleanup_sessions",
        "schedule": crontab(minute=0, hour="*/6"),  # Every 6 hours
    },
    "generate-daily-report": {
        "task": "app.workers.tasks.daily_report",
        "schedule": crontab(minute=0, hour=2),  # 2 AM daily
    },
    "sync-external-data": {
        "task": "app.workers.tasks.sync_data",
        "schedule": 300.0,  # Every 5 minutes
    },
}
```
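Beat is a separate scheduler process that only enqueues work; it needs at least one worker running alongside it: `celery -A app.workers.celery_app beat --loglevel=INFO`.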
## FastAPI Integration

```python
from fastapi import BackgroundTasks


@router.post("/api/v1/users")
async def create_user(
    data: UserCreate,
    background_tasks: BackgroundTasks,
):
    user = await service.create_user(data)
    # Simple background task (in-process, no persistence or retries)
    background_tasks.add_task(send_welcome_email, user.email)
    return user


# For distributed tasks, use ARQ/Celery
@router.post("/api/v1/exports")
async def create_export(
    data: ExportRequest,
    arq: ArqRedis = Depends(get_arq_pool),
):
    job = await arq.enqueue_job("export_data", data.model_dump())  # .dict() on Pydantic v1
    return {"job_id": job.job_id}
```
## Job Status Tracking

```python
from enum import Enum

from celery.result import AsyncResult


class JobStatus(Enum):
    PENDING = "pending"
    STARTED = "started"
    PROGRESS = "progress"
    SUCCESS = "success"
    FAILURE = "failure"
    REVOKED = "revoked"


@router.get("/api/v1/jobs/{job_id}")
async def get_job(job_id: str):
    # Celery result backend lookup
    result = AsyncResult(job_id, app=celery_app)
    return {
        "job_id": job_id,
        "status": result.status,
        "result": result.result if result.ready() else None,
        "progress": result.info if result.status == "PROGRESS" else None,
    }
```
## Anti-Patterns (FORBIDDEN)

```python
# NEVER run long tasks synchronously
@router.post("/api/v1/reports")
async def create_report(data: ReportRequest):
    pdf = await generate_pdf(data)  # Blocks for minutes!
    return pdf


# NEVER lose jobs on failure
@shared_task
def risky_task():
    do_work()  # No retry, no error handling


# NEVER store large results in Redis
@shared_task
def process_file(file_id: str) -> bytes:
    return large_file_bytes  # Store in S3/DB instead!


# NEVER use BackgroundTasks for distributed work
background_tasks.add_task(long_running_job)  # Lost if server restarts
```
## Key Decisions

| Decision | Recommendation |
|---|---|
| Simple async | ARQ (native async) |
| Complex workflows | Celery (chains, chords) |
| In-process quick | FastAPI BackgroundTasks |
| LLM workflows | LangGraph (not Celery) |
| Result storage | Redis for status, S3/DB for data |
| Retry strategy | Exponential backoff with jitter (see sketch below) |
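In Celery, the retry recommendation above maps directly onto task options; ARQ reaches a similar effect by raising `Retry` with a growing `defer`, as sketched earlier. A Celery sketch (the task name and retried exceptions are illustrative):

```python
@shared_task(
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,      # exponential delays: 1s, 2s, 4s, ...
    retry_backoff_max=600,   # cap the delay at 10 minutes
    retry_jitter=True,       # randomize delays to avoid thundering herds
    max_retries=5,
)
def sync_external_data(source_id: str) -> None:
    ...
```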
## Related Skills

- `langgraph-checkpoints` - LLM workflow persistence
- `resilience-patterns` - Retry and fallback
- `observability-monitoring` - Job metrics
## Capability Details

### arq-tasks

Keywords: arq, async queue, redis queue, background task

Solves:
- How to run async background tasks in FastAPI?
- Simple Redis job queue
### celery-tasks

Keywords: celery, task queue, distributed tasks, worker

Solves:
- Enterprise task queue
- Complex job workflows
### celery-workflows

Keywords: chain, group, chord, celery workflow

Solves:
- Sequential task execution
- Parallel task processing
### periodic-tasks

Keywords: periodic, scheduled, cron, celery beat

Solves:
- Run tasks on schedule
- Cron-like job scheduling