---
name: qstash-job-processing
description: Expert knowledge on QStash async job processing, job lifecycle (pending → processing → completed/error), retry logic, timeout handling, continuation scheduling, and debugging stuck jobs. Use this skill when the user asks about "qstash", "async job", "background job", "search processing", "stuck job", "job status", or "continuation".
allowed-tools: Read, Grep, Bash
---
# QStash Job Processing Expert

You are an expert in QStash-based asynchronous job processing for this platform. This skill provides knowledge about job lifecycle, retry logic, continuation patterns, and troubleshooting stuck jobs.
## When To Use This Skill
This skill activates when users:
- Work with QStash job scheduling
- Debug stuck or timeout jobs
- Implement continuation logic for long-running searches
- Understand job status transitions
- Need retry and error handling patterns
- Investigate job processing delays
- Optimize job throughput
## Core Knowledge

### Job Lifecycle

**States:**
- `pending` - Job created, waiting to start
- `processing` - Job actively running
- `completed` - Job finished successfully
- `error` - Job failed with an error
- `timeout` - Job exceeded its time limit
**State Transitions:**

```
pending → processing ─→ completed
              ├─→ error
              └─→ timeout
```
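A job should only move forward through these states. A small guard (hypothetical helper, not from the codebase) makes the legal transitions explicit:

```ts
// Illustrative state-machine guard for the job lifecycle above.
type JobStatus = 'pending' | 'processing' | 'completed' | 'error' | 'timeout';

const allowedTransitions: Record<JobStatus, JobStatus[]> = {
  pending: ['processing'],
  processing: ['completed', 'error', 'timeout'],
  // Terminal states: no further transitions allowed
  completed: [],
  error: [],
  timeout: [],
};

function canTransition(from: JobStatus, to: JobStatus): boolean {
  return allowedTransitions[from].includes(to);
}
```

Checking transitions before writing a status update catches bugs like a retry flipping a `completed` job back to `processing`.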
**Job Table:** `/lib/db/schema.ts`

```ts
scraping_jobs {
  id: uuid
  userId: text
  status: 'pending' | 'processing' | 'completed' | 'error' | 'timeout'
  qstashMessageId: text
  processedRuns: integer
  processedResults: integer
  targetResults: integer
  timeoutAt: timestamp
  createdAt: timestamp
  startedAt: timestamp
  completedAt: timestamp
  error: text
}
```
### QStash Integration

**Client:** `/lib/queue/qstash.ts`

```ts
import { Client } from '@upstash/qstash';

export const qstash = new Client({
  token: process.env.QSTASH_TOKEN!
});
```
**Publishing a Job:**

```ts
import { qstash } from '@/lib/queue/qstash';
import { getWebhookUrl } from '@/lib/utils/url-utils';

// Schedule the job
await qstash.publishJSON({
  url: `${getWebhookUrl()}/api/qstash/process-search`,
  body: { jobId: job.id },
  delay: '5s',          // Optional delay
  retries: 3,           // Automatic retries
  notifyOnFailure: true
});
```
**Receiving the Job:** `/app/api/qstash/process-search/route.ts`

```ts
import { Receiver } from '@upstash/qstash';
import { NextResponse } from 'next/server';
import { qstash } from '@/lib/queue/qstash';

const receiver = new Receiver({
  currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!,
  nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!,
});

export async function POST(req: Request) {
  // 1. Verify signature
  const rawBody = await req.text();
  const signature = req.headers.get('Upstash-Signature');
  if (shouldVerifySignature()) {
    const valid = await receiver.verify({
      signature: signature!,
      body: rawBody,
      url: callbackUrl
    });
    if (!valid) {
      return NextResponse.json({ error: 'Invalid signature' }, { status: 401 });
    }
  }

  // 2. Parse body
  const { jobId } = JSON.parse(rawBody);

  // 3. Process the job
  const execution = await runSearchJob(jobId);

  // 4. Schedule a continuation if needed
  if (execution.result.hasMore) {
    await qstash.publishJSON({
      url: callbackUrl,
      body: { jobId },
      delay: '10s'
    });
  }

  return NextResponse.json({ status: execution.result.status });
}
```
### Continuation Pattern

**For Long-Running Jobs:**

Instagram US Reels searches target 1000 results but can only fetch 20 per API call. The continuation pattern lets the job process its work in chunks.
```ts
// In the job processor
export async function runSearchJob(jobId: string) {
  const service = await SearchJobService.load(jobId);
  const snapshot = service.snapshot();

  // Check if the job is already complete
  if (snapshot.processedResults >= snapshot.targetResults) {
    await service.complete('completed', {});
    return { status: 'completed', hasMore: false };
  }

  // Process one batch (e.g., 20 results)
  const results = await fetchNextBatch(snapshot);
  await service.recordResults(results);

  // Re-read state after recording results; the earlier snapshot is stale
  const updated = service.snapshot();
  const hasMore = updated.processedResults < updated.targetResults;
  if (!hasMore) {
    await service.complete('completed', {});
  }
  return { status: hasMore ? 'processing' : 'completed', hasMore };
}
```

```ts
// In the QStash handler
if (execution.result.hasMore) {
  await qstash.publishJSON({
    url: callbackUrl,
    body: { jobId },
    delay: `${config.continuationDelayMs}ms`, // e.g., 10000ms = 10s
    retries: 3
  });
}
```
**Configuration:**

```ts
{
  continuationDelayMs: 10000, // 10 seconds between batches
  maxRuns: 50,                // Stop after 50 continuations
  batchSize: 20,              // Results per batch
  timeout: 300000             // 5 minutes per batch
}
```
### Retry Logic

**QStash Automatic Retries:**
- Configurable via the `retries` parameter
- Exponential backoff between retries
- Retries on 5xx errors only
```ts
await qstash.publishJSON({
  url: callbackUrl,
  body: { jobId },
  retries: 3, // Will retry 3 times on failure
  notifyOnFailure: true
});
```
**Manual Error Handling in the Handler:**

```ts
export async function POST(req: Request) {
  let jobId: string | undefined; // Declared outside try so the catch block can see it
  try {
    ({ jobId } = JSON.parse(await req.text()));
    const execution = await runSearchJob(jobId!);
    return NextResponse.json({ status: execution.result.status });
  } catch (error) {
    logger.error('Job failed', error, { jobId });
    // Mark the job as errored, but don't let a second failure mask the original
    if (jobId) {
      try {
        const service = await SearchJobService.load(jobId);
        await service.complete('error', { error: error.message });
      } catch (completionError) {
        logger.error('Failed to mark job as error', completionError);
      }
    }
    return NextResponse.json({ error: error.message }, { status: 500 });
  }
}
```
### Timeout Handling

**Setting a Timeout:**

```ts
import { db } from '@/lib/db';
import { scrapingJobs } from '@/lib/db/schema';

// When creating the job
const timeoutAt = new Date(Date.now() + 5 * 60 * 1000); // 5 minutes
const [job] = await db.insert(scrapingJobs)
  .values({
    userId,
    status: 'pending',
    timeoutAt,
    targetResults: 1000
  })
  .returning();
```
**Checking for Timeout:**

```ts
export async function runSearchJob(jobId: string) {
  const service = await SearchJobService.load(jobId);
  const snapshot = service.snapshot();

  // Check timeout
  if (snapshot.timeoutAt && new Date() > new Date(snapshot.timeoutAt)) {
    await service.complete('timeout', { error: 'Job exceeded timeout limit' });
    throw new Error('Job timeout');
  }

  // Process job...
}
```
**Timeout Cleanup Job (Scheduled):**

```ts
import { and, eq, lt } from 'drizzle-orm';
import { db } from '@/lib/db';
import { scrapingJobs } from '@/lib/db/schema';

// Run every 5 minutes
export async function cleanupTimeoutJobs() {
  const timeoutJobs = await db.query.scrapingJobs.findMany({
    where: and(
      eq(scrapingJobs.status, 'processing'),
      lt(scrapingJobs.timeoutAt, new Date())
    )
  });

  for (const job of timeoutJobs) {
    await db.update(scrapingJobs)
      .set({ status: 'timeout', error: 'Job timeout', completedAt: new Date() })
      .where(eq(scrapingJobs.id, job.id));
  }
}
```
## Common Patterns

### Pattern 1: Idempotent Job Processing

```ts
// Good: check whether the job was already processed
export async function POST(req: Request) {
  const { jobId } = JSON.parse(await req.text());
  const service = await SearchJobService.load(jobId);
  const snapshot = service.snapshot();

  // Skip if already completed or errored
  if (snapshot.status === 'completed' || snapshot.status === 'error') {
    return NextResponse.json({
      status: snapshot.status,
      message: 'Job already processed'
    });
  }

  // Process job...
}
```

**When to use:** Always, to handle duplicate QStash deliveries.
### Pattern 2: Progress Tracking

```ts
// Good: update progress as the job runs
export async function runSearchJob(jobId: string) {
  const service = await SearchJobService.load(jobId);

  while (service.snapshot().processedResults < service.snapshot().targetResults) {
    const batch = await fetchNextBatch();
    await service.recordResults(batch);

    // Update progress
    const progress = (service.snapshot().processedResults / service.snapshot().targetResults) * 100;
    await db.update(scrapingJobs)
      .set({ progress: progress.toFixed(2) })
      .where(eq(scrapingJobs.id, jobId));

    logger.info('Job progress', {
      jobId,
      progress: `${progress.toFixed(1)}%`
    });
  }
}
```

**When to use:** Long-running jobs where users need visibility.
### Pattern 3: Exponential Backoff Continuation

```ts
// Good: increase the delay for rate-limited APIs
const baseDelay = 10000; // 10 seconds
const run = service.snapshot().processedRuns;
const delay = Math.min(baseDelay * Math.pow(1.5, run), 60000); // Max 60s

await qstash.publishJSON({
  url: callbackUrl,
  body: { jobId },
  delay: `${delay}ms`
});
```

**When to use:** APIs with aggressive rate limits.
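For intuition, here is the delay schedule the formula above produces for the first few runs (a quick sketch, same constants as the pattern):

```ts
// Delay per run: baseDelay * 1.5^run, capped at 60 seconds
const baseDelay = 10000; // 10 seconds
const delays = Array.from({ length: 6 }, (_, run) =>
  Math.min(baseDelay * Math.pow(1.5, run), 60000)
);
// runs 0-5 → 10000, 15000, 22500, 33750, 50625, 60000 (run 5 hits the cap)
```

The cap matters: without it, run 10 would already wait over 9 minutes, stretching a 50-run job far past its timeout.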
## Anti-Patterns (Avoid These)

### Anti-Pattern 1: No Continuation Limit

```ts
// BAD: infinite continuation loop
if (hasMoreResults) {
  await qstash.publishJSON({ url: callbackUrl, body: { jobId } });
}
```

**Why it's bad:** The job never stops, wastes resources, and costs money.

**Do this instead:**

```ts
// GOOD: limit continuations
const MAX_RUNS = 50;
if (hasMoreResults && service.snapshot().processedRuns < MAX_RUNS) {
  await qstash.publishJSON({ url: callbackUrl, body: { jobId } });
} else {
  await service.complete('completed', { reason: 'Max runs reached' });
}
```
### Anti-Pattern 2: Updating Status to Completed on Error

```ts
// BAD: masking errors
try {
  await runSearchJob(jobId);
} catch (error) {
  // Still marks the job as completed!
  await service.complete('completed', {});
}
```

**Why it's bad:** Users think the job succeeded when it failed.

**Do this instead:**

```ts
// GOOD: preserve the error status
try {
  await runSearchJob(jobId);
  await service.complete('completed', {});
} catch (error) {
  await service.complete('error', { error: error.message });
}
```
### Anti-Pattern 3: No Signature Verification

```ts
// BAD: accepting unauthenticated requests
export async function POST(req: Request) {
  const { jobId } = await req.json();
  await runSearchJob(jobId); // Anyone can trigger this!
}
```

**Why it's bad:** Anyone can forge requests and trigger expensive jobs.

**Do this instead:**

```ts
// GOOD: verify the QStash signature
const signature = req.headers.get('Upstash-Signature');
if (!signature) {
  return NextResponse.json({ error: 'Missing signature' }, { status: 401 });
}
const valid = await receiver.verify({ signature, body: rawBody, url: callbackUrl });
if (!valid) {
  return NextResponse.json({ error: 'Invalid signature' }, { status: 401 });
}
```
## Troubleshooting Guide

### Problem: Job Stuck in "processing"

**Symptoms:**
- Job status is "processing" for hours
- No new results added
- No error logged

**Diagnosis:**
- Check the QStash dashboard for failed deliveries
- Look for errors in application logs
- Check whether a continuation was scheduled
- Verify the job hasn't timed out
**Solution:**

```bash
# 1. Inspect job state
node scripts/inspect-user-state.js --email user@example.com

# 2. Check job details manually
curl http://localhost:3000/api/jobs/{jobId} \
  -H "x-dev-auth: dev-bypass"

# 3. Manually complete the job if it is truly stuck
#    (use the debug endpoint or a direct database update)
```
### Problem: Jobs Not Processing

**Symptoms:**
- Jobs stay in "pending" status
- QStash webhook never fires
- No logs from the job processor

**Diagnosis:**
- Verify QStash credentials are set
- Check that the webhook URL is accessible
- Look for signature verification failures
- Check the QStash dashboard for delivery errors
**Solution:**

```bash
# 1. Verify environment variables
echo $QSTASH_TOKEN
echo $QSTASH_CURRENT_SIGNING_KEY
echo $QSTASH_NEXT_SIGNING_KEY

# 2. Test the webhook locally with ngrok
ngrok http 3000
# Update NEXT_PUBLIC_SITE_URL to the ngrok URL

# 3. Manually trigger the job
curl -X POST http://localhost:3000/api/qstash/process-search \
  -H "Content-Type: application/json" \
  -d '{"jobId":"xxx-xxx-xxx"}'
```
### Problem: Continuation Loop

**Symptoms:**
- Job runs 100+ times
- Never completes
- `processedRuns` keeps increasing

**Diagnosis:**
- Check whether the `hasMore` logic is correct
- Verify the target results are achievable
- Look for off-by-one errors
**Solution:**

```ts
// Add a max-runs check
const MAX_RUNS = 50;
const needsContinuation =
  result.status !== 'error' &&
  result.hasMore &&
  snapshot.processedRuns < MAX_RUNS &&
  snapshot.processedResults < snapshot.targetResults;

if (!needsContinuation) {
  await service.complete('completed', {
    reason: snapshot.processedRuns >= MAX_RUNS ? 'Max runs reached' : 'Target met'
  });
}
```
## Related Files

- `/lib/queue/qstash.ts` - QStash client
- `/lib/search-engine/runner.ts` - Search job runner
- `/lib/search-engine/job-service.ts` - Job state management
- `/app/api/qstash/process-search/route.ts` - Job processor endpoint
- `/app/api/qstash/process-results/route.ts` - Results processor
- `/app/api/jobs/[id]/route.ts` - Job status endpoint
- `/scripts/debug/job/route.ts` - Debug script
## Testing QStash Jobs

**Test Locally:**

```bash
# 1. Skip signature verification
export SKIP_QSTASH_SIGNATURE=true

# 2. Trigger the job manually
curl -X POST http://localhost:3000/api/qstash/process-search \
  -H "Content-Type: application/json" \
  -d '{"jobId":"your-job-id"}'

# 3. Check job status
curl http://localhost:3000/api/jobs/your-job-id \
  -H "x-dev-auth: dev-bypass"
```
**Test with ngrok:**

```bash
# 1. Start ngrok
ngrok http 3000

# 2. Update .env.local:
#    NEXT_PUBLIC_SITE_URL=https://your-id.ngrok.io
#    VERIFY_QSTASH_SIGNATURE=true

# 3. Create a job via the API (this auto-schedules QStash)
curl -X POST http://localhost:3000/api/scraping/instagram-us-reels \
  -H "x-dev-auth: dev-bypass" \
  -H "Content-Type: application/json" \
  -d '{"keywords":["fitness"],"targetResults":100}'
```
**Expected Behavior:**
- Job created with status "pending"
- QStash webhook fires within 5-10 seconds
- Status changes to "processing"
- Results accumulate over time
- Continuation scheduled if needed
- Status changes to "completed" when target met
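A small polling helper can verify this sequence end to end. This is a hypothetical sketch: `getStatus` is an assumed callback that would wrap `GET /api/jobs/{id}` in a real test script.

```ts
// Poll a job until it reaches a terminal state, or give up after maxAttempts.
async function waitForJob(
  getStatus: (jobId: string) => Promise<string>, // assumed wrapper around GET /api/jobs/{id}
  jobId: string,
  { intervalMs = 2000, maxAttempts = 30 } = {}
): Promise<string> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const status = await getStatus(jobId);
    // Terminal states: stop polling and report the outcome
    if (status === 'completed' || status === 'error' || status === 'timeout') {
      return status;
    }
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Job ${jobId} not terminal after ${maxAttempts} attempts`);
}
```

Bounding the attempts matters here for the same reason `maxRuns` does in the continuation pattern: a job stuck in "processing" would otherwise keep the test polling forever.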