customerio-reference-architecture

@jeremylongshore/claude-code-plugins-plus-skills

SKILL.md

name customerio-reference-architecture
description Implement Customer.io reference architecture. Use when designing integrations, planning architecture, or implementing enterprise patterns. Trigger with phrases like "customer.io architecture", "customer.io design", "customer.io enterprise", "customer.io integration pattern".
allowed-tools Read, Write, Edit, Bash(gh:*), Bash(curl:*)
version 1.0.0
license MIT
author Jeremy Longshore <jeremy@intentsolutions.io>

Customer.io Reference Architecture

Overview

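A reference architecture for integrating Customer.io at enterprise scale. It keeps the Track API, App API, and webhook paths in separate services, routes writes through an event bus for reliable asynchronous processing, and lands all events in a data lake.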

Architecture Diagram

                                    Customer.io
                                        |
                    +-------------------+-------------------+
                    |                   |                   |
              Track API            App API            Webhooks
                    |                   |                   |
                    v                   v                   v
            +-------+-------+   +-------+-------+   +-------+-------+
            |   Event Bus   |   | Transactional |   |Webhook Handler|
            |   (Kafka)     |   |    Service    |   |   (Express)   |
            +-------+-------+   +-------+-------+   +-------+-------+
                    |                   |                   |
                    v                   v                   v
            +-------+-------+   +-------+-------+   +-------+-------+
            | CustomerIO    |   |    Email      |   |   Event       |
            | Worker        |   |   Templates   |   |   Processor   |
            +-------+-------+   +-------+-------+   +-------+-------+
                    |                   |                   |
                    +-------------------+-------------------+
                                        |
                                        v
                                +-------+-------+
                                |   Data Lake   |
                                |  (BigQuery)   |
                                +---------------+

Instructions

Step 1: Core Service Layer

// src/services/customerio/index.ts
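import { TrackClient, APIClient, RegionUS, RegionEU, SendEmailRequest } from 'customerio-node';
import { EventEmitter } from 'events';

// Payload shapes used by the service methods below.
export type UserAttributes = Record<string, unknown>;

export interface EventPayload {
  name: string;
  data?: Record<string, unknown>;
}

export type TransactionalRequest = SendEmailRequest;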

export interface CustomerIOConfig {
  trackSiteId: string;
  trackApiKey: string;
  appApiKey: string;
  region: 'us' | 'eu';
  environment: 'development' | 'staging' | 'production';
}

export class CustomerIOService extends EventEmitter {
  private trackClient: TrackClient;
  private apiClient: APIClient;
  private config: CustomerIOConfig;

  constructor(config: CustomerIOConfig) {
    super();
    this.config = config;

    this.trackClient = new TrackClient(
      config.trackSiteId,
      config.trackApiKey,
      { region: config.region === 'eu' ? RegionEU : RegionUS }
    );

    this.apiClient = new APIClient(config.appApiKey, {
      region: config.region === 'eu' ? RegionEU : RegionUS
    });
  }

  // User management
  async identifyUser(userId: string, attributes: UserAttributes): Promise<void> {
    this.emit('identify:start', { userId, attributes });

    try {
      await this.trackClient.identify(userId, {
        ...attributes,
        _env: this.config.environment,
        _updated_at: Math.floor(Date.now() / 1000)
      });
      this.emit('identify:success', { userId });
    } catch (error) {
      this.emit('identify:error', { userId, error });
      throw error;
    }
  }

  // Event tracking
  async trackEvent(userId: string, event: EventPayload): Promise<void> {
    this.emit('track:start', { userId, event });

    try {
      await this.trackClient.track(userId, {
        name: event.name,
        data: {
          ...event.data,
          _env: this.config.environment
        }
      });
      this.emit('track:success', { userId, event: event.name });
    } catch (error) {
      this.emit('track:error', { userId, event: event.name, error });
      throw error;
    }
  }

  // Transactional messaging
  async sendTransactional(request: TransactionalRequest): Promise<void> {
    // Await and discard the API response so the method matches Promise<void>.
    await this.apiClient.sendEmail(request);
  }
}
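
The service above emits `identify:*` and `track:*` lifecycle events. As a minimal sketch of how a consumer might use them for monitoring, the helper below counts outcomes per operation. It depends only on Node's `EventEmitter`; `attachMetrics` and `Counters` are hypothetical names, and in practice the counters would likely be replaced by calls to your metrics client.

```typescript
import { EventEmitter } from 'events';

// Hypothetical counter shape; swap for your metrics client (StatsD, Prometheus, ...).
export interface Counters {
  started: number;
  succeeded: number;
  failed: number;
}

// Subscribes to the lifecycle events emitted in Step 1 and tallies them
// per operation ('identify' or 'track').
export function attachMetrics(
  source: EventEmitter
): Record<'identify' | 'track', Counters> {
  const metrics = {
    identify: { started: 0, succeeded: 0, failed: 0 },
    track: { started: 0, succeeded: 0, failed: 0 },
  };

  for (const op of ['identify', 'track'] as const) {
    source.on(`${op}:start`, () => { metrics[op].started += 1; });
    source.on(`${op}:success`, () => { metrics[op].succeeded += 1; });
    source.on(`${op}:error`, () => { metrics[op].failed += 1; });
  }

  return metrics;
}
```

Because `CustomerIOService` extends `EventEmitter`, an instance can be passed directly: `const metrics = attachMetrics(service);`.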

Step 2: Event Bus Integration

// src/services/customerio/event-bus.ts
import { Kafka, Producer, Consumer } from 'kafkajs';
import { CustomerIOService } from './index';

interface CustomerIOEvent {
  type: 'identify' | 'track' | 'transactional';
  userId: string;
  payload: any;
  timestamp: number;
  correlationId: string;
}

export class CustomerIOEventBus {
  private producer: Producer;
  private consumer: Consumer;
  private service: CustomerIOService;

  constructor(kafka: Kafka, service: CustomerIOService) {
    this.producer = kafka.producer();
    this.consumer = kafka.consumer({ groupId: 'customerio-worker' });
    this.service = service;
  }

  async start(): Promise<void> {
    await this.producer.connect();
    await this.consumer.connect();

    await this.consumer.subscribe({
      topics: ['customerio.identify', 'customerio.track', 'customerio.transactional']
    });

    await this.consumer.run({
      eachMessage: async ({ topic, message }) => {
        const event: CustomerIOEvent = JSON.parse(message.value!.toString());
        await this.processEvent(topic, event);
      }
    });
  }

  private async processEvent(topic: string, event: CustomerIOEvent): Promise<void> {
    const startTime = Date.now();

    try {
      switch (event.type) {
        case 'identify':
          await this.service.identifyUser(event.userId, event.payload);
          break;
        case 'track':
          await this.service.trackEvent(event.userId, event.payload);
          break;
        case 'transactional':
          await this.service.sendTransactional(event.payload);
          break;
      }

      // Emit success metrics
      await this.producer.send({
        topic: 'customerio.processed',
        messages: [{
          key: event.correlationId,
          value: JSON.stringify({
            ...event,
            status: 'success',
            processingTime: Date.now() - startTime
          })
        }]
      });
    } catch (error) {
      // Dead letter queue for failed events
      await this.producer.send({
        topic: 'customerio.dlq',
        messages: [{
          key: event.correlationId,
          value: JSON.stringify({
            ...event,
            status: 'failed',
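            // `error` is typed `unknown` under strict TypeScript, so narrow it first
            error: error instanceof Error ? error.message : String(error),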
            processingTime: Date.now() - startTime
          })
        }]
      });
    }
  }

  // Publish events to be processed
  async publish(event: CustomerIOEvent): Promise<void> {
    await this.producer.send({
      topic: `customerio.${event.type}`,
      messages: [{
        key: event.userId,
        value: JSON.stringify(event)
      }]
    });
  }
}
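
The event bus above sends an event to the dead letter queue on its first failure. One option, sketched here under the assumption that most Customer.io failures are transient (timeouts, rate limits), is to retry with exponential backoff before giving up. `withRetry` and its parameters are hypothetical and not part of kafkajs or the Customer.io SDK.

```typescript
// Retries an async operation with exponential backoff, rethrowing the last
// error once maxAttempts is exhausted.
export async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 200
): Promise<T> {
  let lastError: unknown;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        // Delay doubles on each attempt: base, 2x base, 4x base, ...
        const delayMs = baseDelayMs * 2 ** (attempt - 1);
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }

  throw lastError;
}
```

Inside `processEvent`, a call such as `await withRetry(() => this.service.trackEvent(event.userId, event.payload))` would then reach the `catch` block, and the DLQ, only after the final attempt fails.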

Step 3: Repository Pattern

// src/repositories/user-messaging.ts
import { CustomerIOService } from '../services/customerio';
import { UserRepository } from './user';

export interface MessagingPreferences {
  email: boolean;
  push: boolean;
  sms: boolean;
  inApp: boolean;
}

export class UserMessagingRepository {
  constructor(
    private cio: CustomerIOService,
    private users: UserRepository
  ) {}

  async syncUser(userId: string): Promise<void> {
    const user = await this.users.findById(userId);
    if (!user) throw new Error(`User ${userId} not found`);

    const preferences = await this.getPreferences(userId);

    await this.cio.identifyUser(userId, {
      email: user.email,
      first_name: user.firstName,
      last_name: user.lastName,
      created_at: Math.floor(user.createdAt.getTime() / 1000),
      plan: user.subscription?.plan || 'free',
      // Preferences
      email_opt_in: preferences.email,
      push_opt_in: preferences.push,
      sms_opt_in: preferences.sms
    });
  }

  async getPreferences(userId: string): Promise<MessagingPreferences> {
    // Load from your preferences store
    return {
      email: true,
      push: false,
      sms: false,
      inApp: true
    };
  }

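  async updatePreferences(
    userId: string,
    preferences: Partial<MessagingPreferences>
  ): Promise<void> {
    // Update local store
    await this.savePreferences(userId, preferences);

    // Sync to Customer.io
    await this.cio.identifyUser(userId, {
      email_opt_in: preferences.email,
      push_opt_in: preferences.push,
      sms_opt_in: preferences.sms
    });
  }

  // Placeholder: persist to your own preferences store (not specified by this skill).
  private async savePreferences(
    userId: string,
    preferences: Partial<MessagingPreferences>
  ): Promise<void> {
    // e.g. write the changed fields to your database here
  }
}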
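
`updatePreferences` accepts a partial object, so some opt-in fields may be `undefined` when they reach `identifyUser`. The sketch below builds the attribute object from only the keys that were actually provided, which makes the outgoing payload explicit instead of relying on how `undefined` is serialized. `toOptInAttributes` is a hypothetical helper; the attribute names mirror those used in `syncUser`.

```typescript
interface MessagingPreferences {
  email: boolean;
  push: boolean;
  sms: boolean;
  inApp: boolean;
}

// Maps provided preference keys to Customer.io attribute names and skips
// keys that are absent from the partial update.
export function toOptInAttributes(
  preferences: Partial<MessagingPreferences>
): Record<string, boolean> {
  const mapping: Array<[keyof MessagingPreferences, string]> = [
    ['email', 'email_opt_in'],
    ['push', 'push_opt_in'],
    ['sms', 'sms_opt_in'],
  ];

  const attributes: Record<string, boolean> = {};
  for (const [key, attribute] of mapping) {
    const value = preferences[key];
    if (value !== undefined) {
      attributes[attribute] = value;
    }
  }
  return attributes;
}
```

With this in place, the sync call could become `await this.cio.identifyUser(userId, toOptInAttributes(preferences));`.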

Step 4: Webhook Handler

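// src/webhooks/customerio.ts
import { createHmac, timingSafeEqual } from 'crypto';
import { Request, Router } from 'express';
import { EventEmitter } from 'events';

export class CustomerIOWebhooks extends EventEmitter {
  private router: Router;
  private signingSecret: string;

  constructor(signingSecret: string) {
    super();
    this.signingSecret = signingSecret;
    this.router = Router();
    this.setupRoutes();
  }

  private setupRoutes(): void {
    this.router.post('/', async (req, res) => {
      // Verify signature
      if (!this.verifySignature(req)) {
        return res.status(401).send('Invalid signature');
      }

      // Process events
      const events = req.body.events || [];

      for (const event of events) {
        this.emit(event.metric, event);
        this.emit('*', event);
      }

      res.status(200).json({ received: events.length });
    });
  }

  // Assumption: Customer.io signs `v0:<timestamp>:<raw body>` with HMAC-SHA256 and
  // sends the hex digest in X-CIO-Signature alongside X-CIO-Timestamp. Confirm this
  // against the current webhook documentation. `rawBody` must be captured by your
  // body parser, e.g. express.json({ verify: (req, _res, buf) => { (req as any).rawBody = buf; } }).
  private verifySignature(req: Request): boolean {
    const signature = req.get('X-CIO-Signature');
    const timestamp = req.get('X-CIO-Timestamp');
    const rawBody = (req as Request & { rawBody?: Buffer }).rawBody;
    if (!signature || !timestamp || !rawBody) return false;

    const expected = createHmac('sha256', this.signingSecret)
      .update(`v0:${timestamp}:`)
      .update(rawBody)
      .digest();
    const received = Buffer.from(signature, 'hex');

    // Constant-time comparison; lengths must match before timingSafeEqual.
    return received.length === expected.length && timingSafeEqual(received, expected);
  }

  getRouter(): Router {
    return this.router;
  }
}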

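// Usage
// Assumes `app` (an Express app with a JSON body parser), `cio` (a Track API
// client exposing `suppress`), and `streamToDataWarehouse` are defined elsewhere.
const webhooks = new CustomerIOWebhooks(process.env.WEBHOOK_SECRET!);

webhooks.on('email_delivered', (event) => {
  // Update delivery status
});

webhooks.on('email_bounced', async (event) => {
  // Handle bounce - suppress user
  await cio.suppress(event.data.customer_id);
});

// The handler must be async because it awaits the warehouse write
webhooks.on('*', async (event) => {
  // Stream all events to data warehouse
  await streamToDataWarehouse(event);
});

app.use('/webhooks/customerio', webhooks.getRouter());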
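
Signature checks are commonly paired with a freshness check so that a captured request cannot be replayed later. The sketch below is one way to do that, assuming the webhook carries a Unix timestamp in seconds (for example in an `X-CIO-Timestamp` header; verify the header name against the Customer.io docs). `isFreshTimestamp` and the 300-second tolerance are hypothetical choices.

```typescript
// Returns true when the timestamp header parses as Unix seconds and is
// within toleranceSeconds of the current time (in either direction).
export function isFreshTimestamp(
  timestampHeader: string | undefined,
  nowSeconds: number,
  toleranceSeconds = 300
): boolean {
  if (!timestampHeader || !/^\d+$/.test(timestampHeader)) {
    return false;
  }
  const timestamp = Number(timestampHeader);
  return Math.abs(nowSeconds - timestamp) <= toleranceSeconds;
}
```

In the route handler this could run before signature verification, for example `isFreshTimestamp(req.get('X-CIO-Timestamp'), Math.floor(Date.now() / 1000))`, returning a 401 when it fails.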

Step 5: Infrastructure as Code

# terraform/customerio.tf
resource "google_secret_manager_secret" "customerio_site_id" {
  secret_id = "customerio-site-id"

  replication {
    auto {}
  }
}

resource "google_secret_manager_secret" "customerio_api_key" {
  secret_id = "customerio-api-key"

  replication {
    auto {}
  }
}

resource "google_cloud_run_service" "customerio_worker" {
  name     = "customerio-worker"
  location = var.region

  template {
    spec {
      containers {
        image = "gcr.io/${var.project}/customerio-worker:latest"

        env {
          name = "CUSTOMERIO_SITE_ID"
          value_from {
            secret_key_ref {
              name = google_secret_manager_secret.customerio_site_id.secret_id
              key  = "latest"
            }
          }
        }

        env {
          name = "CUSTOMERIO_API_KEY"
          value_from {
            secret_key_ref {
              name = google_secret_manager_secret.customerio_api_key.secret_id
              key  = "latest"
            }
          }
        }
      }
    }
  }
}

resource "google_pubsub_topic" "customerio_events" {
  name = "customerio-events"
}

resource "google_bigquery_dataset" "customerio" {
  dataset_id = "customerio_events"
  location   = var.region
}

resource "google_bigquery_table" "delivery_events" {
  dataset_id = google_bigquery_dataset.customerio.dataset_id
  table_id   = "delivery_events"

  schema = file("${path.module}/schemas/delivery_events.json")

  time_partitioning {
    type  = "DAY"
    field = "timestamp"
  }
}
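
The Terraform above injects `CUSTOMERIO_SITE_ID` and `CUSTOMERIO_API_KEY` into the worker. As a minimal sketch of the application side, the loader below turns those variables into the `CustomerIOConfig` shape from Step 1 and fails fast when one is missing. `CUSTOMERIO_APP_API_KEY` and `CUSTOMERIO_REGION` are assumed variable names that the Terraform does not define, and `loadConfig` is a hypothetical helper.

```typescript
export interface CustomerIOConfig {
  trackSiteId: string;
  trackApiKey: string;
  appApiKey: string;
  region: 'us' | 'eu';
  environment: 'development' | 'staging' | 'production';
}

// Builds the service config from environment variables, throwing on any
// missing secret so a misconfigured worker fails at startup.
export function loadConfig(env: Record<string, string | undefined>): CustomerIOConfig {
  const required = (name: string): string => {
    const value = env[name];
    if (!value) {
      throw new Error(`Missing required environment variable: ${name}`);
    }
    return value;
  };

  const environments = ['development', 'staging', 'production'] as const;

  return {
    trackSiteId: required('CUSTOMERIO_SITE_ID'),
    trackApiKey: required('CUSTOMERIO_API_KEY'),
    appApiKey: required('CUSTOMERIO_APP_API_KEY'), // assumed name
    region: env.CUSTOMERIO_REGION === 'eu' ? 'eu' : 'us', // assumed name; defaults to US
    environment: environments.find((e) => e === env.NODE_ENV) ?? 'development',
  };
}
```

At startup this would typically be called as `new CustomerIOService(loadConfig(process.env))`.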

Architecture Principles

  1. Separation of Concerns: Track API, App API, and Webhooks are handled by separate services
  2. Event-Driven: Use message queues for reliable async processing
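  3. Idempotency: Design operations so they can be safely retried, deduplicating by correlationId where a retry would otherwise repeat a side effect (for example, a transactional send)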
  4. Observability: Events are emitted for monitoring and debugging
  5. Infrastructure as Code: All resources defined in Terraform
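
The idempotency principle can be sketched as a small wrapper that skips events whose `correlationId` has already been handled. This is an in-memory illustration only: `dedupeByCorrelationId` is a hypothetical helper, and a multi-instance worker would need a shared store (such as a database or cache) in place of the `Set`.

```typescript
// Wraps a handler so each correlationId is processed at most once per process.
// Returns true when the handler ran and false when the event was a duplicate.
export function dedupeByCorrelationId<E extends { correlationId: string }>(
  handler: (event: E) => void,
  maxRemembered = 10000
): (event: E) => boolean {
  const seen = new Set<string>();

  return (event: E): boolean => {
    if (seen.has(event.correlationId)) {
      return false;
    }

    // Run first and record afterwards, so a handler that throws can be retried.
    handler(event);
    seen.add(event.correlationId);

    if (seen.size > maxRemembered) {
      // Sets iterate in insertion order, so the first value is the oldest.
      const oldest = seen.values().next().value;
      if (oldest !== undefined) {
        seen.delete(oldest);
      }
    }
    return true;
  };
}
```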

Output

  • Core Customer.io service layer
  • Event bus integration (Kafka)
  • Repository pattern for user messaging
  • Webhook handler with signature verification
  • Terraform infrastructure code

Next Steps

After implementing the architecture, proceed to customerio-multi-env-setup for multi-environment configuration.