| name | etl-sync-job-builder |
| description | Designs reliable ETL and data synchronization jobs with incremental updates, idempotency guarantees, watermark tracking, error handling, and retry logic. Use for "ETL jobs", "data sync", "incremental sync", or "data pipeline". |
ETL/Sync Job Builder
Build reliable, incremental data synchronization pipelines.
ETL Job Pattern
// jobs/sync-users.ts
import { PrismaClient } from "@prisma/client";

const prisma = new PrismaClient();

// Declarative description of a sync job (source, destination, cron schedule)
interface SyncJob {
  name: string;
  source: "database" | "api" | "file";
  destination: "database" | "warehouse" | "s3";
  schedule: string;
}

export class ETLJob {
  constructor(private name: string, private watermarkKey: string) {}

  async run() {
    console.log(`🔄 Starting ${this.name}...`);
    try {
      // 1. Get last watermark
      const lastSync = await this.getWatermark();
      console.log(`   Last sync: ${lastSync}`);

      // 2. Extract data
      const data = await this.extract(lastSync);
      console.log(`   Extracted ${data.length} records`);

      // 3. Transform data
      const transformed = await this.transform(data);

      // 4. Load data
      await this.load(transformed);

      // 5. Update watermark (uses completion time; the incremental
      //    example below uses the latest record timestamp instead,
      //    which cannot skip records updated mid-run)
      await this.updateWatermark(new Date());
      console.log(`✅ ${this.name} complete`);
    } catch (error) {
      console.error(`❌ ${this.name} failed:`, error);
      throw error;
    }
  }

  // protected (not private) so subclasses like OrdersETLJob below can override
  protected async extract(since: Date): Promise<any[]> {
    // Extract logic
    return [];
  }

  protected async transform(data: any[]): Promise<any[]> {
    // Transform logic
    return data;
  }

  protected async load(data: any[]): Promise<void> {
    // Load logic
  }

  private async getWatermark(): Promise<Date> {
    const watermark = await prisma.syncWatermark.findUnique({
      where: { key: this.watermarkKey },
    });
    // Epoch start on first run => full initial sync
    return watermark?.lastSync ?? new Date(0);
  }

  private async updateWatermark(timestamp: Date) {
    await prisma.syncWatermark.upsert({
      where: { key: this.watermarkKey },
      create: { key: this.watermarkKey, lastSync: timestamp },
      update: { lastSync: timestamp },
    });
  }
}
Watermark Strategy
// Track sync progress
model SyncWatermark {
key String @id
lastSync DateTime
metadata Json?
@@index([lastSync])
}
// Incremental sync using watermark
// (sourceDb / destinationDb: PrismaClient instances for the source
//  and destination databases)
async function syncOrdersIncremental() {
  // Get last sync time
  const watermark = await prisma.syncWatermark.findUnique({
    where: { key: "orders_sync" },
  });
  const lastSync = watermark?.lastSync ?? new Date(0);

  // Fetch only new/updated records
  const newOrders = await sourceDb.order.findMany({
    where: {
      updated_at: { gt: lastSync },
    },
    orderBy: { updated_at: "asc" },
  });
  console.log(`📦 Syncing ${newOrders.length} orders...`);

  // Process in batches of 100
  for (let i = 0; i < newOrders.length; i += 100) {
    const batch = newOrders.slice(i, i + 100);
    await destinationDb.order.createMany({
      data: batch,
      skipDuplicates: true, // Idempotent for re-runs (insert-only; updates need upsert)
    });
  }

  // Update watermark to the latest record's timestamp rather than
  // "now", so records updated mid-run are picked up on the next run
  if (newOrders.length > 0) {
    const latestTimestamp = newOrders[newOrders.length - 1].updated_at;
    await prisma.syncWatermark.upsert({
      where: { key: "orders_sync" },
      create: { key: "orders_sync", lastSync: latestTimestamp },
      update: { lastSync: latestTimestamp },
    });
  }
  console.log(`✅ Sync complete`);
}
Idempotent Upsert Pattern
// Idempotent sync - safe to run multiple times
// (one query per record; fine for small batches)
async function syncUsersIdempotent(users: User[]) {
  for (const user of users) {
    await prisma.user.upsert({
      where: { id: user.id },
      create: user,
      update: {
        email: user.email,
        name: user.name,
        updated_at: user.updated_at,
      },
    });
  }
}

// Batch upsert for better performance (single round trip)
async function syncUsersBatch(users: User[]) {
  // PostgreSQL: UNNEST the column arrays, then upsert via ON CONFLICT.
  // The final WHERE makes the update a no-op for stale source rows.
  await prisma.$executeRaw`
    INSERT INTO users (id, email, name, updated_at)
    SELECT * FROM UNNEST(
      ${users.map((u) => u.id)}::bigint[],
      ${users.map((u) => u.email)}::text[],
      ${users.map((u) => u.name)}::text[],
      ${users.map((u) => u.updated_at)}::timestamp[]
    )
    ON CONFLICT (id) DO UPDATE SET
      email = EXCLUDED.email,
      name = EXCLUDED.name,
      updated_at = EXCLUDED.updated_at
    WHERE users.updated_at < EXCLUDED.updated_at
  `;
}
Retry Logic with Exponential Backoff
async function sleep(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function syncWithRetry<T>(
  operation: () => Promise<T>,
  maxRetries: number = 3,
  baseDelay: number = 1000
): Promise<T> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await operation();
    } catch (error) {
      if (attempt === maxRetries) throw error;
      // 1s, 2s, 4s, ... between attempts
      const delay = baseDelay * Math.pow(2, attempt);
      console.log(`   Retry ${attempt + 1}/${maxRetries} after ${delay}ms`);
      await sleep(delay);
    }
  }
  // Unreachable, but satisfies the return type
  throw new Error("Max retries exceeded");
}
// Usage
await syncWithRetry(
async () => {
return await syncOrders();
},
3,
1000
);
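When many workers retry against the same upstream, fixed exponential delays can fire in lockstep and re-overload it. A common refinement, sketched here as an assumption rather than part of the original pattern, adds "full jitter" by randomizing each delay up to the exponential cap:

// Variant with full jitter: random delay in [0, cap), so concurrent
// workers don't retry in sync. Reuses the sleep() helper above.
async function syncWithJitteredRetry<T>(
  operation: () => Promise<T>,
  maxRetries = 3,
  baseDelay = 1000
): Promise<T> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await operation();
    } catch (error) {
      if (attempt === maxRetries) throw error;
      const cap = baseDelay * Math.pow(2, attempt);
      const delay = Math.floor(Math.random() * cap);
      await sleep(delay);
    }
  }
  throw new Error("Max retries exceeded");
}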
Change Data Capture (CDC)
// Listen to database changes
import { Client } from "pg";
import { PrismaClient } from "@prisma/client";

const prisma = new PrismaClient();

async function setupCDC() {
  // PostgreSQL logical replication: a publication lets an external
  // CDC consumer (e.g. Debezium) stream row changes from the WAL
  await prisma.$executeRaw`
    CREATE PUBLICATION orders_publication FOR TABLE orders;
  `;

  // Lightweight alternative: LISTEN/NOTIFY via the pg library.
  // Assumes a trigger on orders calls pg_notify('orders_changed', ...)
  // (see the trigger sketch below)
  const client = new Client({ connectionString: process.env.DATABASE_URL });
  await client.connect();
  await client.query("LISTEN orders_changed;");
  client.on("notification", async (msg) => {
    if (!msg.payload) return;
    const change = JSON.parse(msg.payload);
    if (change.operation === "INSERT" || change.operation === "UPDATE") {
      await syncOrder(change.data);
    }
  });
}
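LISTEN only receives what something NOTIFYs, so a trigger has to publish each change. A minimal sketch of that trigger, run once as a migration (the function and channel names here are illustrative assumptions; note NOTIFY payloads are capped at about 8 KB):

// Notify listeners on every insert/update of orders.
// Prisma's $executeRaw runs one statement per call, hence two calls.
await prisma.$executeRaw`
  CREATE OR REPLACE FUNCTION notify_orders_changed() RETURNS trigger AS $$
  BEGIN
    PERFORM pg_notify(
      'orders_changed',
      json_build_object('operation', TG_OP, 'data', row_to_json(NEW))::text
    );
    RETURN NEW;
  END;
  $$ LANGUAGE plpgsql;
`;

await prisma.$executeRaw`
  CREATE TRIGGER orders_changed_trigger
  AFTER INSERT OR UPDATE ON orders
  FOR EACH ROW EXECUTE FUNCTION notify_orders_changed();
`;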
Conflict Resolution
interface ConflictResolution {
  strategy: "source-wins" | "dest-wins" | "latest-wins" | "merge";
}

async function syncWithConflictResolution(
  sourceRecord: any,
  destRecord: any,
  strategy: ConflictResolution["strategy"]
) {
  switch (strategy) {
    case "source-wins":
      return sourceRecord;
    case "dest-wins":
      return destRecord;
    case "latest-wins":
      // Newer updated_at timestamp wins
      return sourceRecord.updated_at > destRecord.updated_at
        ? sourceRecord
        : destRecord;
    case "merge":
      // Destination as the base, overlaid with non-null source fields
      return {
        ...destRecord,
        ...Object.fromEntries(
          Object.entries(sourceRecord).filter(([_, v]) => v != null)
        ),
      };
  }
}
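A sketch of how this slots into a sync loop, resolving each incoming record against the destination copy before upserting (the surrounding function is an illustrative assumption; "latest-wins" keeps whichever side changed most recently):

// prisma here is the destination client
async function syncOrdersWithResolution(sourceOrders: any[]) {
  for (const source of sourceOrders) {
    const dest = await prisma.order.findUnique({ where: { id: source.id } });
    const winner = dest
      ? await syncWithConflictResolution(source, dest, "latest-wins")
      : source; // no conflict: record is new at the destination
    await prisma.order.upsert({
      where: { id: source.id },
      create: winner,
      update: winner,
    });
  }
}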
Monitoring & Observability
// Track sync job metrics
interface SyncMetrics {
jobName: string;
startTime: Date;
endTime: Date;
recordsProcessed: number;
recordsInserted: number;
recordsUpdated: number;
recordsSkipped: number;
errors: number;
durationMs: number;
}
async function logSyncMetrics(metrics: SyncMetrics) {
  // Persist for dashboards and alerting (SyncMetric model sketched below)
  await prisma.syncMetric.create({
    data: metrics,
  });
console.log(`
📊 Sync Metrics
Job: ${metrics.jobName}
Records: ${metrics.recordsProcessed}
Inserted: ${metrics.recordsInserted}
Updated: ${metrics.recordsUpdated}
Errors: ${metrics.errors}
Duration: ${metrics.durationMs}ms
`);
}
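logSyncMetrics assumes a SyncMetric model exists in the Prisma schema; a minimal sketch mirroring the SyncMetrics interface (the field names and index are assumptions, not part of the original):

// Prisma schema sketch for the model assumed above
model SyncMetric {
  id               Int      @id @default(autoincrement())
  jobName          String
  startTime        DateTime
  endTime          DateTime
  recordsProcessed Int
  recordsInserted  Int
  recordsUpdated   Int
  recordsSkipped   Int
  errors           Int
  durationMs       Int

  @@index([jobName, startTime])
}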
Full ETL Job Example
// jobs/sync-orders-to-warehouse.ts
import { Prisma } from "@prisma/client";

// warehouseDb: PrismaClient connected to the warehouse database
export class OrdersETLJob extends ETLJob {
  constructor() {
    super("orders-etl", "orders_warehouse_sync");
  }

  protected async extract(since: Date): Promise<Order[]> {
    return prisma.order.findMany({
      where: {
        updated_at: { gt: since },
      },
      include: {
        items: true,
        user: true,
      },
      orderBy: { updated_at: "asc" },
    });
  }

  protected async transform(orders: Order[]): Promise<WarehouseOrder[]> {
    // Denormalize into flat fact-table rows
    return orders.map((order) => ({
      order_id: order.id,
      user_email: order.user.email,
      total_amount: order.total,
      item_count: order.items.length,
      status: order.status,
      order_date: order.created_at,
      synced_at: new Date(),
    }));
  }

  protected async load(data: WarehouseOrder[]): Promise<void> {
    const batchSize = 100;
    for (let i = 0; i < data.length; i += batchSize) {
      const batch = data.slice(i, i + batchSize);
      // Build parameterized row tuples with Prisma.sql/Prisma.join —
      // string-concatenating values into the query breaks
      // parameterization and invites SQL injection
      const rows = Prisma.join(
        batch.map(
          (o) => Prisma.sql`(
            ${o.order_id}, ${o.user_email}, ${o.total_amount},
            ${o.item_count}, ${o.status}, ${o.order_date},
            ${o.synced_at}
          )`
        )
      );
      await warehouseDb.$executeRaw`
        INSERT INTO orders_fact (
          order_id, user_email, total_amount, item_count,
          status, order_date, synced_at
        )
        VALUES ${rows}
        ON CONFLICT (order_id) DO UPDATE SET
          total_amount = EXCLUDED.total_amount,
          status = EXCLUDED.status,
          synced_at = EXCLUDED.synced_at
      `;
    }
  }
}

// Run job
new OrdersETLJob().run().catch(() => process.exit(1));
Scheduling
// Schedule ETL jobs
import cron from "node-cron";

// Run every hour
cron.schedule("0 * * * *", async () => {
  await new OrdersETLJob().run();
});

// Run every 15 minutes
cron.schedule("*/15 * * * *", async () => {
  await syncOrdersIncremental();
});

// Run nightly at 2 AM (fullDataSync: a full-refresh job, not shown)
cron.schedule("0 2 * * *", async () => {
  await fullDataSync();
});
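One operational detail cron alone doesn't handle: a slow run can overlap the next tick and corrupt watermark state. A minimal in-process guard is sketched below; this is an assumption layered on the scheduling above, and multi-worker deployments need a real distributed lock (e.g. a Postgres advisory lock) instead.

// Skip a tick if the previous run of the same job is still going
const running = new Set<string>();

function scheduleExclusive(name: string, expr: string, job: () => Promise<void>) {
  cron.schedule(expr, async () => {
    if (running.has(name)) {
      console.log(`⏭️  ${name} still running, skipping tick`);
      return;
    }
    running.add(name);
    try {
      await job();
    } finally {
      running.delete(name);
    }
  });
}

scheduleExclusive("orders-etl", "0 * * * *", () => new OrdersETLJob().run());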
Error Handling & Recovery
// getCheckpoint / saveCheckpoint / logFailedRecord are sketched below
async function syncWithErrorHandling() {
  const checkpoint = await getCheckpoint();
  let processedRecords = 0;
try {
const records = await fetchRecords(checkpoint);
for (const record of records) {
try {
await processRecord(record);
processedRecords++;
// Save checkpoint every 100 records
if (processedRecords % 100 === 0) {
await saveCheckpoint(record.id);
}
} catch (error) {
// Log error but continue
console.error(`Failed to process record ${record.id}:`, error);
await logFailedRecord(record.id, error);
}
}
await saveCheckpoint("completed");
} catch (error) {
// Critical failure - job will retry from checkpoint
console.error("Job failed:", error);
throw error;
}
}
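The checkpoint helpers above are left abstract; one plausible implementation, sketched here as an assumption that reuses the SyncWatermark model's metadata column, looks like this:

// Sketch: persist the checkpoint in SyncWatermark.metadata.
// The key name and payload shape are illustrative assumptions.
const CHECKPOINT_KEY = "orders_sync_checkpoint";

async function getCheckpoint(): Promise<string | null> {
  const row = await prisma.syncWatermark.findUnique({
    where: { key: CHECKPOINT_KEY },
  });
  return (row?.metadata as { lastId?: string } | null)?.lastId ?? null;
}

async function saveCheckpoint(lastId: string) {
  await prisma.syncWatermark.upsert({
    where: { key: CHECKPOINT_KEY },
    create: { key: CHECKPOINT_KEY, lastSync: new Date(), metadata: { lastId } },
    update: { lastSync: new Date(), metadata: { lastId } },
  });
}

async function logFailedRecord(id: string, error: unknown) {
  // Dead-letter style log; swap in a real table or queue as needed
  console.error(`Failed record ${id}:`, error);
}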
Best Practices
- Incremental sync: use watermarks instead of full table scans
- Idempotent operations: every step safe to retry
- Batch processing: process 100-1000 records per batch
- Checkpointing: resume from the last good position after a failure
- Retry with backoff: absorb transient failures
- Monitor metrics: track job health over time
- Test thoroughly, including failure scenarios (see the sketch below)
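A useful failure-scenario test is the double-run: syncing the same input twice must leave the destination unchanged. A sketch, with the test runner (vitest here) and fixture shape as assumptions:

// Idempotency check: the second run must be a no-op
import { describe, it, expect } from "vitest";

describe("syncUsersIdempotent", () => {
  it("is a no-op on the second run", async () => {
    const users = [
      { id: 1n, email: "a@example.com", name: "A", updated_at: new Date() },
    ];

    await syncUsersIdempotent(users as any);
    const first = await prisma.user.findMany({ orderBy: { id: "asc" } });

    await syncUsersIdempotent(users as any);
    const second = await prisma.user.findMany({ orderBy: { id: "asc" } });

    expect(second).toEqual(first);
  });
});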
Output Checklist
- ETL job class created
- Watermark tracking implemented
- Incremental sync logic
- Idempotent upsert operations
- Retry logic with backoff
- Conflict resolution strategy
- Monitoring and metrics
- Error handling and recovery
- Job scheduling configured
- Testing including failure cases