| name | oban-thinking |
| description | This skill should be used when the user asks to "add a background job", "process async", "schedule a task", "retry failed jobs", "add email sending", "run this later", "add a cron job", "unique jobs", "batch process", or mentions Oban, Oban Pro, workflows, job queues, cascades, grafting, recorded values, job args, or troubleshooting job failures. |
Oban Thinking
Paradigm shifts for Oban job processing. These insights prevent common bugs and guide proper patterns.
Part 1: Oban (Non-Pro)
The Iron Law: JSON Serialization
JOB ARGS ARE JSON. ATOMS BECOME STRINGS.
This single fact causes most Oban debugging headaches.
# Creating - atom keys are fine
MyWorker.new(%{user_id: 123})
# Processing - must use string keys (JSON converted atoms to strings)
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
# ...
end
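The same conversion applies to atom values and nested maps, not just top-level keys. A quick illustration (the worker and args here are made up):
# Enqueued with atoms throughout
ReportWorker.new(%{user_id: 123, options: %{format: :pdf}})
# Arrives in perform/1 with strings throughout:
# %{"user_id" => 123, "options" => %{"format" => "pdf"}}
def perform(%Oban.Job{args: %{"options" => %{"format" => format}}}) do
  # format is the string "pdf", not the atom :pdf
  # ...
end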
Error Handling: Let It Crash
Don't catch errors in Oban jobs. Let them bubble up to Oban for proper handling.
Why?
- Automatic logging: Oban logs the full error with stacktrace
- Automatic retries: Jobs retry with exponential backoff
- Visibility: Failed jobs appear in Oban Web dashboard
- Consistency: Error states are tracked in the database
Anti-Pattern
# Bad: Swallowing errors
def perform(%Oban.Job{} = job) do
case do_work(job.args) do
{:ok, result} -> {:ok, result}
{:error, reason} ->
Logger.error("Failed: #{reason}")
{:ok, :failed} # Silently marks as complete!
end
end
Correct Pattern
# Good: Let errors propagate
def perform(%Oban.Job{} = job) do
result = do_work!(job.args) # Raises on failure
{:ok, result}
end
# Or return error tuple - Oban treats as failure
def perform(%Oban.Job{} = job) do
case do_work(job.args) do
{:ok, result} -> {:ok, result}
{:error, reason} -> {:error, reason} # Oban will retry
end
end
When to Catch Errors
Only catch errors when you need custom retry logic or want to mark a job as permanently failed:
def perform(%Oban.Job{} = job) do
case external_api_call(job.args) do
{:ok, result} -> {:ok, result}
{:error, :not_found} -> {:cancel, :resource_not_found} # Don't retry
{:error, :rate_limited} -> {:snooze, 60} # Retry in 60 seconds
{:error, _} -> {:error, :will_retry} # Normal retry
end
end
Snoozing for Polling
Use {:snooze, seconds} for polling external state instead of manual retry logic:
def perform(%Oban.Job{} = job) do
if external_thing_finished?(job.args) do
{:ok, :done}
else
{:snooze, 5} # Check again in 5 seconds
end
end
Simple Job Chaining
For simple sequential chains (JobA → JobB → JobC), have each job enqueue the next:
def perform(%Oban.Job{} = job) do
result = do_work(job.args)
# Enqueue next job on success
NextWorker.new(%{data: result}) |> Oban.insert()
{:ok, result}
end
Don't reach for Oban Pro Workflows for linear chains.
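If the job's work and the follow-up enqueue should commit together, Oban.insert/3 also composes with Ecto.Multi, so the next job is only enqueued when the transaction succeeds. A sketch, where MyApp.Repo, change_order/1, and the arg names are stand-ins:
def perform(%Oban.Job{args: %{"order_id" => order_id}}) do
  Ecto.Multi.new()
  # change_order/1 stands in for whatever this job actually updates
  |> Ecto.Multi.update(:order, change_order(order_id))
  # Enqueue the next job in the same transaction as the work
  |> Oban.insert(:next_job, fn %{order: order} -> NextWorker.new(%{order_id: order.id}) end)
  |> MyApp.Repo.transaction()
  |> case do
    {:ok, _changes} -> :ok
    {:error, _step, reason, _changes} -> {:error, reason}
  end
end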
Unique Jobs
Prevent duplicate jobs with the unique option:
use Oban.Worker,
queue: :default,
unique: [period: 60] # Only one job with same args per 60 seconds
# Or scope uniqueness to specific fields
unique: [period: 300, keys: [:user_id]]
Gotcha: Uniqueness is checked on insert, not execution. Two identical jobs inserted 61 seconds apart will both run.
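To dedupe for a job's entire lifetime rather than a fixed window, widen the options. A minimal sketch, assuming a hypothetical SyncWorker keyed on user_id:
defmodule MyApp.SyncWorker do
  use Oban.Worker,
    queue: :default,
    # :infinity removes the time window; keys/states narrow what counts as a duplicate
    unique: [
      period: :infinity,
      keys: [:user_id],
      states: [:available, :scheduled, :executing, :retryable]
    ]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
    # MyApp.Sync.run/1 stands in for the real sync work
    MyApp.Sync.run(user_id)
  end
end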
High Throughput: Chunking
For millions of records, chunk work into batches rather than one job per item:
# Bad: one job per contact (millions of jobs = database strain)
Enum.each(contacts, fn contact ->
  ContactWorker.new(%{id: contact.id}) |> Oban.insert()
end)
# Good: chunk into batches
contacts
|> Enum.chunk_every(100)
|> Enum.each(fn chunk ->
  BatchWorker.new(%{contact_ids: Enum.map(chunk, & &1.id)}) |> Oban.insert()
end)
For maximum throughput, skip unique options on these batch workers and insert the jobs in bulk with Oban.insert_all/2 rather than one at a time.
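A sketch of that bulk insert, reusing the hypothetical BatchWorker from the chunking example:
# Build every changeset up front, then insert the whole list at once
jobs =
  contacts
  |> Enum.chunk_every(100)
  |> Enum.map(fn chunk ->
    BatchWorker.new(%{contact_ids: Enum.map(chunk, & &1.id)})
  end)

# One bulk insert instead of one round trip per job
Oban.insert_all(jobs)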
Part 2: Oban Pro
Cascade Context: Erlang Term Serialization
Unlike regular job args, cascade context preserves atoms:
# Creating - atom keys
Workflow.put_context(%{score_run_id: id})
# Processing - atom keys still work!
def my_cascade(%{score_run_id: id}) do
# ...
end
# Dot notation works too
def later_step(context) do
context.score_run_id
context.previous_result
end
Serialization Summary
| | Creating | Processing |
|---|---|---|
| Regular jobs | atoms ok | strings only |
| Cascade context | atoms ok | atoms ok |
When to Use Workflows
Reserve Workflows for:
- Complex dependency graphs (not just linear chains)
- Fan-out/fan-in patterns (see the sketch below)
- When you need recorded values across steps
- Conditional branching based on runtime state
Don't use Workflows for simple A → B → C chains.
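For contrast with a linear chain, here is a minimal fan-in sketch (worker names are placeholders): two independent fetches run concurrently and a merge step waits on both.
def workflow(account_id) do
  Workflow.new()
  |> Workflow.add(:fetch_orders, FetchOrders.new(%{account_id: account_id}))
  |> Workflow.add(:fetch_invoices, FetchInvoices.new(%{account_id: account_id}))
  |> Workflow.add(:merge, MergeReport.new(%{account_id: account_id}),
    deps: [:fetch_orders, :fetch_invoices]
  )
end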
Workflow Composition with Graft
When you need a parent workflow to wait for a sub-workflow to complete before continuing, use add_graft instead of add_workflow.
Key Differences
| Method | Sub-workflow completes before deps run? | Output accessible? |
|---|---|---|
| add_workflow | No - just inserts jobs | No |
| add_graft | Yes - waits for all jobs | Yes, via recorded values |
Pattern: Composing Independent Concerns
Don't couple unrelated concerns (e.g., notifications) to domain-specific workflows (e.g., scoring). Instead, create a higher-level orchestrator:
# Bad: Notification logic buried in AggregateScores
defmodule AggregateScores do
def workflow(score_run_id) do
Workflow.new()
|> Workflow.add(:aggregate, AggregateJob.new(...))
|> Workflow.add(:send_notification, SendEmail.new(...), deps: :aggregate) # Wrong place!
end
end
# Good: Higher-level workflow composes scoring + notification
defmodule FullRunWithNotifications do
def workflow(site_url, opts) do
notification_opts = build_notification_opts(opts)
Workflow.new()
|> Workflow.put_context(%{site_url: site_url, opts: opts, notification_opts: notification_opts})
|> Workflow.add_graft(:scoring, &graft_full_run/1)
|> Workflow.add_cascade(:send_notification, &send_notification/1, deps: :scoring)
end
defp graft_full_run(context) do
# Sub-workflow doesn't know about notifications
FullRun.workflow(context.site_url, context.opts)
|> Workflow.apply_graft()
|> Oban.insert_all()
end
end
Recording Values for Dependent Steps
For a grafted workflow's output to be available to dependent steps, the final job must use recorded: true:
defmodule FinalJob do
use Oban.Pro.Worker, queue: :default, recorded: true
def perform(%Oban.Job{args: %{"score_run_id" => score_run_id}}) do
  # compute_composite_score/1 stands in for the job's real work
  score = compute_composite_score(score_run_id)
  # The return value is recorded and becomes available to dependent steps
  {:ok, %{score_run_id: score_run_id, composite_score: score}}
end
end
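Downstream, the cascade that depends on the graft can then read those values. The sketch below assumes the recorded output of the :scoring graft is surfaced on the cascade context under the step name; the exact shape may differ in your setup, and Notifier is a stand-in:
# Hedged sketch - how a dependent cascade might consume the recorded output
defp send_notification(context) do
  %{score_run_id: score_run_id, composite_score: score} = context.scoring

  Notifier.deliver(context.notification_opts, %{
    score_run_id: score_run_id,
    composite_score: score
  })
end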
Dynamic Workflow Appending
Add jobs to a running workflow with Workflow.append/2:
def perform(%Oban.Job{} = job) do
if needs_extra_step?(job.args) do
job
|> Workflow.append()
|> Workflow.add(:extra, ExtraWorker.new(%{}), deps: [:current_step])
|> Oban.insert_all()
end
{:ok, :done}
end
Caveat: Cannot override context or add dependencies to already-running jobs. For complex dynamic scenarios, check external state in the job itself.
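The simplest version of that alternative: skip dynamic appends and let the job itself enqueue follow-up work based on fresh state. A sketch with an illustrative Order schema and flag:
def perform(%Oban.Job{args: %{"order_id" => order_id}}) do
  # Re-read current state at run time instead of relying on workflow context
  order = MyApp.Repo.get!(MyApp.Order, order_id)

  if order.needs_review do
    ReviewWorker.new(%{order_id: order_id}) |> Oban.insert()
  end

  {:ok, :done}
end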
Fan-Out/Fan-In with Batches
To run a final job after multiple paginated workflows complete, use Batch callbacks:
# Wrap workflows in a shared batch
batch_id = "import-#{import_id}"
pages
|> Enum.each(fn page ->
PageWorkflow.workflow(page)
|> Batch.from_workflow(batch_id: batch_id)
|> Oban.insert_all()
end)
# Add completion callback
Batch.new(batch_id: batch_id)
|> Batch.add_callback(:completed, CompletionWorker)
|> Oban.insert()
Tip: Include pagination workers in the batch to prevent premature completion.
Testing Workflows
Don't use inline testing mode - workflows need database interaction.
# Use run_workflow/1 for integration tests
assert %{completed: 3} =
Workflow.new()
|> Workflow.add(:a, WorkerA.new(%{}))
|> Workflow.add(:b, WorkerB.new(%{}), deps: [:a])
|> Workflow.add(:c, WorkerC.new(%{}), deps: [:b])
|> run_workflow()
For testing recorded values between workers, insert predecessor jobs with pre-filled metadata.
Red Flags - STOP and Reconsider
Non-Pro:
- Pattern matching on atom keys in perform/1
- Catching all errors and returning {:ok, _}
- Wrapping job logic in try/rescue
- Creating one job per item when processing millions of records
Pro:
- Using add_workflow when you need to wait for completion
- Coupling notifications/emails to domain workflows
- Not using recorded: true when you need output from grafted workflows
- Using Workflows for simple linear job chains
- Testing workflows with inline mode
Any of these? Stop and re-read the relevant section above, starting with the serialization rules.