
2025-10-26

Building CRM Systems with Event-Driven Architecture

A practical guide to implementing customer relationship management using event sourcing, CQRS, and event-driven patterns for marketing automation and consent management

Abstract

Traditional CRM systems store customer state directly: a record per customer, mutated in place with each interaction. That model breaks when the product needs real-time personalization, GDPR-compliant audit trails, and multi-channel orchestration at the same time, because the current-state row has no memory of how it got there. Event-driven CRM inverts the model: every interaction is captured as an immutable event, and any view (profile, consent ledger, channel history) is a projection over that event stream. This post covers the architecture of an event-driven CRM on AWS, the projections for personalization and consent, the cross-channel orchestration layer, and the trade-offs (storage cost, eventual-consistency budgets, replay windows) that the pattern introduces.

The Event-Driven CRM Landscape

Most CRM systems start simple: a database with customers, contacts, and interactions. This works until you need to answer questions like “What marketing emails has this customer received?” or “When did they consent to SMS?” or “Why did we send them this notification?”

I’ve worked with teams migrating from traditional CRM architectures to event-driven systems, and the shift requires rethinking how you model customer data. Instead of updating a customer record when preferences change, you emit a CustomerPreferencesUpdated event. Instead of deleting consent records for GDPR, you emit a ConsentRevoked event.

The fundamental difference: your database becomes a projection of events, not the source of truth.

Why Event-Driven Architecture for CRM?

The CRM domain has specific characteristics that make event-driven architecture particularly valuable:

  1. Audit Requirements: GDPR requires you to be able to demonstrate consent (Article 7), which in practice means recording when it was granted and for what purpose
  2. Multi-Channel Complexity: Customers interact across email, SMS, push, in-app, and each channel has different rules
  3. Real-Time Personalization: Marketing automation needs to react immediately to customer behavior
  4. Data Privacy: The “right to be forgotten” is easier when you can replay events with redaction
  5. Eventual Consistency: Marketing campaigns can tolerate slight delays if it means better scalability

Here’s a realistic scenario: A customer browses your product page, abandons their cart, opts into SMS notifications, then completes purchase via email link. In a traditional CRM, you’d update the customer record multiple times, losing the sequence of events. In an event-driven system, you have the complete story.
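To make the scenario concrete, here is a minimal sketch of that journey as an append-only log. The event shapes, the `via` field, and the timestamps are assumptions made up for illustration, not a schema the rest of this post depends on.

```typescript
// Illustrative only: shapes and timestamps are assumptions.
type JourneyEvent =
  | { type: 'ProductViewed'; at: string; productId: string }
  | { type: 'CartAbandoned'; at: string; cartId: string }
  | { type: 'ConsentGranted'; at: string; channel: 'email' | 'sms' | 'push' }
  | { type: 'OrderPlaced'; at: string; orderId: string; via: 'email-link' | 'web' };

const journey: JourneyEvent[] = [
  { type: 'ProductViewed', at: '2025-01-10T09:00:00.000Z', productId: 'p-123' },
  { type: 'CartAbandoned', at: '2025-01-10T10:05:00.000Z', cartId: 'c-1' },
  { type: 'ConsentGranted', at: '2025-01-10T10:07:00.000Z', channel: 'sms' },
  { type: 'OrderPlaced', at: '2025-01-10T18:30:00.000Z', orderId: 'o-9', via: 'email-link' },
];

// A current-state row can only say "has SMS consent, has ordered".
// The log can also answer when, and in what order.
function smsConsentGrantedAt(events: JourneyEvent[]): string | undefined {
  for (const e of events) {
    if (e.type === 'ConsentGranted' && e.channel === 'sms') return e.at;
  }
  return undefined;
}

function orderedAfterAbandonment(events: JourneyEvent[]): boolean {
  const abandoned = events.findIndex(e => e.type === 'CartAbandoned');
  const ordered = events.findIndex(e => e.type === 'OrderPlaced');
  return abandoned !== -1 && ordered > abandoned;
}
```

Both questions are answered by reading the log, with no extra columns added to a customer table in advance.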

System Architecture Overview

Let me show you how the core components fit together:

[Architecture diagram. Components by layer:]

  • Customer Touchpoints: Web App, Mobile App, Email Client, Support System
  • Event Infrastructure: API Gateway, Event Bus (EventBridge/Kafka), Event Store (DynamoDB/EventStoreDB)
  • Command Side - CQRS Write: Command Handlers, Business Rules, Event Emitters
  • Query Side - CQRS Read: Read Models, Customer 360, Consent DB, Preferences DB
  • Domain Services: Event Processors, Consent Service, Campaign Service, Channel Orchestrator (which checks consent and preferences before delivery)
  • Delivery Channels: Email Provider (SendGrid/SES), SMS Provider (Twilio), Push Service (FCM/APNs)

This architecture separates concerns effectively:

  • Write path: Commands validate business rules and emit events
  • Read path: Projections materialize views optimized for queries
  • Services: React to events and orchestrate workflows
  • Channels: Handle delivery with retry logic and failure tracking

Practical Implementation Guide

Before diving deep into components, let me show you how to get started with a real implementation.

Step-by-Step Getting Started

Step 1: Define Your Core Events

Start simple. Don’t try to model everything at once:

// Start with just customer creation and consent
const coreEvents = [
  'CustomerCreated',
  'ConsentGranted',
  'ConsentRevoked',
  'EmailSent'
];

Step 2: Set Up Event Store

Use what you have. DynamoDB works well for AWS shops, EventStoreDB for event sourcing purists:

// Simple DynamoDB event store
const eventStoreConfig = {
  tableName: 'customer-events',
  partitionKey: 'pk',  // CUSTOMER#{customerId}
  sortKey: 'sk',       // EVENT#{timestamp}#{eventId}
  ttl: 7 * 365 * 86400  // retention window in seconds (7 years); the stored TTL attribute is now + this value
};
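The key layout in the comments above relies on ISO-8601 UTC timestamps sorting in time order when compared as strings. A minimal sketch of building those keys follows; `eventKey` and `ttlEpochSeconds` are hypothetical helper names, not part of any SDK.

```typescript
// Key layout taken from the comments above; helper names are hypothetical.
interface EventKey { pk: string; sk: string }

function eventKey(customerId: string, timestamp: Date, eventId: string): EventKey {
  return {
    pk: `CUSTOMER#${customerId}`,
    // toISOString() always yields fixed-width UTC (YYYY-MM-DDTHH:mm:ss.sssZ),
    // so string order matches time order for four-digit years.
    sk: `EVENT#${timestamp.toISOString()}#${eventId}`,
  };
}

// DynamoDB TTL expects an absolute epoch time in seconds, not a duration.
function ttlEpochSeconds(now: Date, retentionSeconds: number): number {
  return Math.floor(now.getTime() / 1000) + retentionSeconds;
}

const march = eventKey('42', new Date('2025-03-01T10:00:00Z'), 'e1');
const november = eventKey('42', new Date('2025-11-01T10:00:00Z'), 'e2');

// Sorting by sort key as plain strings puts the earlier event first.
const sortedKeys = [november, march].sort((x, y) =>
  x.sk < y.sk ? -1 : x.sk > y.sk ? 1 : 0
);
```

Appending the event ID after the timestamp keeps keys unique when two events for the same customer share a millisecond.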

Step 3: Create Command Handlers

Business logic lives here:

// One handler per aggregate
class CustomerCommandHandler {
  async execute(command: Command): Promise<void> {
    // 1. Load events
    // 2. Rebuild state
    // 3. Validate business rules
    // 4. Emit new events
  }
}
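The four steps in that skeleton can be sketched with an in-memory log. This is a simplified illustration under assumed event shapes; the `grantConsent` function and its `channel`-only consent model are hypothetical and much smaller than the handler developed later in the post.

```typescript
// In-memory sketch; event shapes and the command are assumptions.
type Ev =
  | { eventType: 'CustomerCreated'; customerId: string }
  | { eventType: 'ConsentGranted'; customerId: string; channel: string }
  | { eventType: 'ConsentRevoked'; customerId: string; channel: string };

interface State { exists: boolean; activeChannels: Set<string> }

// Step 2: rebuild state by folding over the history.
function rebuild(history: Ev[]): State {
  const state: State = { exists: false, activeChannels: new Set<string>() };
  for (const e of history) {
    if (e.eventType === 'CustomerCreated') state.exists = true;
    else if (e.eventType === 'ConsentGranted') state.activeChannels.add(e.channel);
    else state.activeChannels.delete(e.channel);
  }
  return state;
}

// Steps 1-4 for one command. Returns the new events instead of writing them,
// which keeps the decision logic pure and easy to test.
function grantConsent(log: Ev[], customerId: string, channel: string): Ev[] {
  const history = log.filter(e => e.customerId === customerId); // 1. load
  const state = rebuild(history);                                // 2. rebuild
  if (!state.exists) throw new Error('Unknown customer');        // 3. validate
  if (state.activeChannels.has(channel)) return [];              //    already active: nothing to emit
  return [{ eventType: 'ConsentGranted', customerId, channel }]; // 4. emit
}
```

Returning events rather than appending inside the function is a design choice for this sketch; it lets the caller decide how and where to persist them.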

Step 4: Build Projections

Start with one read model - the customer view:

// Single projection for customer queries
class CustomerProjection {
  async handleEvent(event: CustomerEvent): Promise<void> {
    switch (event.eventType) {
      case 'CustomerCreated':
        await this.createCustomer(event);
        break;
      case 'ConsentGranted':
        await this.updateConsent(event);
        break;
    }
  }
}

Step 5: Add Campaign Triggers

Start with one simple campaign - welcome email:

const welcomeCampaign = {
  trigger: 'CustomerCreated',
  actions: [
    { type: 'send-email', template: 'welcome' }
  ]
};
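A declarative campaign like this needs something that matches incoming events against the `trigger` field. One possible sketch is below; the type names and `actionsFor` are assumptions for illustration.

```typescript
// Sketch of declarative trigger matching; type and function names are assumptions.
interface Action { type: 'send-email'; template: string }
interface Campaign { trigger: string; actions: Action[] }
interface IncomingEvent { eventType: string; customerId: string }

const campaigns: Campaign[] = [
  { trigger: 'CustomerCreated', actions: [{ type: 'send-email', template: 'welcome' }] },
];

// Returns the actions to run for an event. Executing them, and checking
// consent first, is left to the caller.
function actionsFor(event: IncomingEvent, all: Campaign[]): Action[] {
  const result: Action[] = [];
  for (const campaign of all) {
    if (campaign.trigger !== event.eventType) continue;
    for (const action of campaign.actions) result.push(action);
  }
  return result;
}
```

Keeping campaigns as data means a new campaign is a configuration change, not a new event handler.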

Step 6: Integrate Channels

Use existing providers. Don’t build email infrastructure:

// Wrap your existing email provider
class EmailChannel {
  constructor(private sendGrid: SendGridClient) {}

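  // Takes a request object so call sites can pass template data
  async send(request: {
    customerId: string;
    templateId: string;
    data?: Record<string, unknown>;
  }): Promise<void> {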
    // Get customer data from projection
    // Send via provider
    // Emit EmailSent event
  }
}

Complete End-to-End Example

Here’s a full customer journey from signup to purchase confirmation:

[Sequence diagram. Participants: Customer, API, CommandHandler, EventStore, EventBus, Projections, CampaignEngine, EmailChannel]

  • Customer Registration: POST /register → RegisterCustomerCommand → CustomerCreated event stored and published → projection creates the customer record → campaign engine sends the welcome email → EmailSent event → welcome email delivered
  • Product Browsing: GET /products/123 → ProductViewed event published → projection updates customer activity
  • Cart Abandonment: POST /cart/add → ItemAddedToCart event published → customer leaves site → inactivity detected after a 1 hour wait → CartAbandoned event → cart reminder email
  • Purchase Flow: POST /orders → PlaceOrderCommand → OrderPlaced event → PaymentInitiated event → payment processed → PaymentSucceeded event → OrderConfirmed event → order confirmation email → order status updated → customer becomes buyer

Complete code for this flow:

// 1. Customer Registration
async function handleRegistration(request: RegistrationRequest): Promise<string> {
  const customerId = crypto.randomUUID();

  // Emit CustomerCreated
  await eventStore.appendEvent({
    eventId: crypto.randomUUID(),
    customerId,
    timestamp: new Date().toISOString(),
    eventType: 'CustomerCreated',
    email: request.email,
    firstName: request.firstName,
    source: 'web'
  });

  // If consented, emit ConsentGranted
  if (request.marketingConsent) {
    await eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId,
      timestamp: new Date().toISOString(),
      eventType: 'ConsentGranted',
      purpose: 'marketing',
      channel: 'email'
    });
  }

  return customerId;
}

// 2. Projection Updates Customer Record
async function handleCustomerCreated(event: CustomerCreated): Promise<void> {
  await customerDB.putItem({
    customerId: event.customerId,
    email: event.email,
    firstName: event.firstName,
    status: 'active',
    createdAt: event.timestamp
  });
}

// 3. Welcome Campaign Triggers
async function handleCustomerCreatedCampaign(event: CustomerCreated): Promise<void> {
  // Check consent
  const hasConsent = await consentService.hasActiveConsent(
    event.customerId,
    'marketing',
    'email'
  );

  if (hasConsent) {
    await emailChannel.send({
      customerId: event.customerId,
      templateId: 'welcome-email',
      data: { firstName: event.firstName }
    });
  }
}

// 4. Product Browsing Tracked
async function handleProductView(customerId: string, productId: string): Promise<void> {
  await eventStore.appendEvent({
    eventId: crypto.randomUUID(),
    customerId,
    timestamp: new Date().toISOString(),
    eventType: 'ProductViewed',
    productId,
    sessionId: getCurrentSessionId()
  });
}

// 5. Cart Abandonment Detection (runs periodically)
async function detectAbandonedCarts(): Promise<void> {
  const abandonedCarts = await findCartsWithNoActivity(60); // 60 minutes

  for (const cart of abandonedCarts) {
    await eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId: cart.customerId,
      timestamp: new Date().toISOString(),
      eventType: 'CartAbandoned',
      cartId: cart.cartId,
      items: cart.items,
      totalValue: cart.total
    });
  }
}

// 6. Cart Abandonment Campaign
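async function handleCartAbandoned(event: CartAbandoned): Promise<void> {
  // Wait a further hour after the abandonment event before sending
  await scheduleAction({
    executeAt: Date.now() + 3600000,
    action: async () => {
      // Cart reminders are marketing email, so check consent at send time:
      // it may have been revoked while the action was waiting
      const hasConsent = await consentService.hasActiveConsent(
        event.customerId,
        'marketing',
        'email'
      );
      if (!hasConsent) return;

      await emailChannel.send({
        customerId: event.customerId,
        templateId: 'cart-reminder',
        data: {
          cartItems: event.items,
          cartTotal: event.totalValue
        }
      });
    }
  });
}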

// 7. Order Placement
async function handleOrderPlacement(request: PlaceOrderRequest): Promise<string> {
  const orderId = crypto.randomUUID();

  // Emit OrderPlaced
  await eventStore.appendEvent({
    eventId: crypto.randomUUID(),
    customerId: request.customerId,
    timestamp: new Date().toISOString(),
    eventType: 'OrderPlaced',
    orderId,
    items: request.items,
    total: request.total
  });

  // Process payment
  const paymentResult = await paymentProvider.charge({
    amount: request.total,
    customerId: request.customerId
  });

  if (paymentResult.success) {
    await eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId: request.customerId,
      timestamp: new Date().toISOString(),
      eventType: 'PaymentSucceeded',
      orderId,
      amount: request.total,
      transactionId: paymentResult.transactionId
    });
  }

  return orderId;
}

// 8. Order Confirmation Campaign
async function handlePaymentSucceeded(event: PaymentSucceeded): Promise<void> {
  // Emit OrderConfirmed
  await eventStore.appendEvent({
    eventId: crypto.randomUUID(),
    customerId: event.customerId,
    timestamp: new Date().toISOString(),
    eventType: 'OrderConfirmed',
    orderId: event.orderId,
    confirmationNumber: generateConfirmationNumber()
  });

  // Send confirmation email
  await emailChannel.send({
    customerId: event.customerId,
    templateId: 'order-confirmation',
    data: {
      orderId: event.orderId,
      amount: event.amount
    }
  });
}

// 9. Projection Updates - Customer Now a Buyer
async function handleFirstPurchase(event: PaymentSucceeded): Promise<void> {
  const purchases = await getPurchaseCount(event.customerId);

  if (purchases === 1) {
    // First purchase - update customer segment
    await eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId: event.customerId,
      timestamp: new Date().toISOString(),
      eventType: 'CustomerSegmentAdded',
      segmentId: 'buyers',
      segmentName: 'Customers Who Purchased'
    });
  }
}

This example covers the main events, projection updates, and campaign triggers in the journey (ItemAddedToCart and PaymentInitiated from the flow above are left out of the code). Start here, then expand to more complex workflows.
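The code above calls `findCartsWithNoActivity` without showing it. One way to derive it from the event log is sketched here, under the assumption that cart events carry a `cartId` (the post does not define that field on `ItemAddedToCart` or `OrderPlaced`). The name `findAbandonedCarts` is hypothetical.

```typescript
// Assumed event shapes: cartId on these events is not defined in the post.
type CartEvent =
  | { eventType: 'ItemAddedToCart'; cartId: string; customerId: string; timestamp: string }
  | { eventType: 'OrderPlaced'; cartId: string; customerId: string; timestamp: string }
  | { eventType: 'CartAbandoned'; cartId: string; customerId: string; timestamp: string };

function findAbandonedCarts(events: CartEvent[], now: Date, idleMinutes: number): string[] {
  const lastActivity = new Map<string, number>(); // cartId -> latest add time (ms)
  const closed = new Set<string>();               // ordered, or already flagged as abandoned

  for (const e of events) {
    if (e.eventType === 'ItemAddedToCart') {
      const t = Date.parse(e.timestamp);
      lastActivity.set(e.cartId, Math.max(t, lastActivity.get(e.cartId) ?? 0));
    } else {
      closed.add(e.cartId);
    }
  }

  const cutoff = now.getTime() - idleMinutes * 60_000;
  const result: string[] = [];
  lastActivity.forEach((t, cartId) => {
    if (!closed.has(cartId) && t <= cutoff) result.push(cartId);
  });
  return result;
}
```

Treating an existing `CartAbandoned` event as closing the cart matters for a periodic job: without it, every run would emit another abandonment event for the same cart. This sketch does not handle a cart that becomes active again after being flagged.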

Customer Lifecycle Event Flow

Here’s the complete picture of customer events over time:

[Lifecycle flow diagram. Events by stage:]

  • Acquisition: CustomerCreated, EmailVerificationRequested, EmailVerified, ConsentGranted
  • Activation: ProductViewed, ItemAddedToCart, then a "Conversion?" decision: Yes leads to OrderPlaced; No leads to CartAbandoned, then ReminderSent
  • Purchase: PaymentInitiated, then a "Payment?" decision: Success leads to PaymentSucceeded; Failed leads to PaymentFailed, then RetryScheduled
  • Fulfillment: OrderConfirmed, OrderShipped, OrderDelivered
  • Retention: ReviewRequested, ReplenishmentCampaign, ReviewSubmitted, ReorderPlaced
  • Lifecycle Management: PreferencesUpdated, SegmentationRecalculated, ConsentRevoked, CampaignsPaused, CustomerDeactivated, ConsentRevoked

Component Deep Dive

Event Sourcing for Customer Data

The core pattern: instead of storing current state, you store the sequence of events that led to that state. Here’s a practical implementation:

// Event definitions - the source of truth
interface CustomerEvent {
  eventId: string;
  customerId: string;
  timestamp: string;
  eventType: string;
}

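interface CustomerCreated extends CustomerEvent {
  eventType: 'CustomerCreated';
  email: string;
  // Name and phone are read by the projections, so they belong on the event
  firstName: string;
  lastName?: string;
  phone?: string;
  source: 'web' | 'mobile' | 'api' | 'import';
}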

interface ConsentGranted extends CustomerEvent {
  eventType: 'ConsentGranted';
  purpose: 'marketing' | 'analytics' | 'essential';
  channel: 'email' | 'sms' | 'push';
  ipAddress: string;
  userAgent: string;
}

interface ConsentRevoked extends CustomerEvent {
  eventType: 'ConsentRevoked';
  purpose: 'marketing' | 'analytics' | 'essential';
  channel: 'email' | 'sms' | 'push';
  reason?: string;
}

interface PreferencesUpdated extends CustomerEvent {
  eventType: 'PreferencesUpdated';
  preferences: {
    emailFrequency?: 'daily' | 'weekly' | 'never';
    categories?: string[];
    timezone?: string;
  };
}

The event store becomes your single source of truth:

class EventStore {
  constructor(
    private dynamoDB: DynamoDB,  // aggregated SDK v3 client; the bare DynamoDBClient only exposes send()
    private eventBus: EventBridge
  ) {}

  async appendEvent(event: CustomerEvent): Promise<void> {
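    // Conditional write: fails if an item with this pk + sk already exists,
    // which makes retries of the same event idempotent. This is a duplicate
    // guard, not per-stream optimistic locking.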
    await this.dynamoDB.putItem({
      TableName: 'customer-events',
      Item: {
        pk: { S: `CUSTOMER#${event.customerId}` },
        sk: { S: `EVENT#${event.timestamp}#${event.eventId}` },
        eventType: { S: event.eventType },
        payload: { S: JSON.stringify(event) },
        version: { N: '1' },
        ttl: { N: String(Math.floor(Date.now() / 1000) + 7 * 365 * 86400) }
      },
      ConditionExpression: 'attribute_not_exists(pk)'
    });

    // Publish to event bus for consumers
    await this.eventBus.putEvents({
      Entries: [{
        Source: 'crm.customer',
        DetailType: event.eventType,
        Detail: JSON.stringify(event),
        EventBusName: 'customer-events'
      }]
    });
  }

  async getCustomerEvents(
    customerId: string,
    fromTimestamp?: string
  ): Promise<CustomerEvent[]> {
    const params = {
      TableName: 'customer-events',
      KeyConditionExpression: 'pk = :pk AND sk >= :sk',
      ExpressionAttributeValues: {
        ':pk': { S: `CUSTOMER#${customerId}` },
        ':sk': { S: fromTimestamp ? `EVENT#${fromTimestamp}` : 'EVENT#' }
      }
    };

    const result = await this.dynamoDB.query(params);
    return result.Items?.map(item =>
      JSON.parse(item.payload.S!)
    ) ?? [];
  }
}
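The conditional write above guards against duplicate events but does not detect two writers racing on the same customer stream. For comparison, here is an in-memory sketch of expected-version optimistic concurrency. `InMemoryEventStore` and `ConcurrencyError` are hypothetical names, and this is not how the DynamoDB class above behaves.

```typescript
// In-memory sketch of optimistic concurrency; not the DynamoDB implementation above.
interface StoredEvent { streamId: string; version: number; eventType: string }

class ConcurrencyError extends Error {}

class InMemoryEventStore {
  private streams = new Map<string, StoredEvent[]>();

  currentVersion(streamId: string): number {
    return this.streams.get(streamId)?.length ?? 0;
  }

  // The caller passes the version it read. If another writer appended in the
  // meantime, the versions differ and the append is rejected.
  append(streamId: string, expectedVersion: number, eventType: string): StoredEvent {
    const stream = this.streams.get(streamId) ?? [];
    if (stream.length !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, found ${stream.length}`
      );
    }
    const stored: StoredEvent = { streamId, version: expectedVersion + 1, eventType };
    stream.push(stored);
    this.streams.set(streamId, stream);
    return stored;
  }
}
```

On DynamoDB, a comparable effect can likely be had by using the stream version as the sort key, so that the `attribute_not_exists` condition fails when another writer has already taken that version. That variant is an assumption here, not something the code above does.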

Key gotcha: Event versioning becomes critical. When your event schema evolves, you need upcasters:

interface EventUpcaster {
  fromVersion: number;
  toVersion: number;
  upcast(event: any): any;
}

// Example: Adding GDPR context to consent events
const consentEventUpcaster: EventUpcaster = {
  fromVersion: 1,
  toVersion: 2,
  upcast(event: any) {
    if (event.eventType === 'ConsentGranted' && !event.gdprContext) {
      return {
        ...event,
        version: 2,
        gdprContext: {
          legalBasis: 'consent',
          retentionPeriod: '2years',
          dataController: 'company-name'
        }
      };
    }
    return event;
  }
};
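Once more than one schema change has shipped, upcasters need to be applied in sequence when events are read. The following is a sketch of one way to chain them; `VersionedEvent`, `upcastToLatest`, and the second (v2 to v3) upcaster are made up for illustration.

```typescript
// Sketch: applies upcasters in sequence until no further upcaster matches.
// VersionedEvent and upcastToLatest are hypothetical names.
interface VersionedEvent { eventType: string; version?: number; [key: string]: unknown }

interface Upcaster {
  eventType: string;
  fromVersion: number;
  upcast(event: VersionedEvent): VersionedEvent;
}

function upcastToLatest(event: VersionedEvent, upcasters: Upcaster[]): VersionedEvent {
  let current = event;
  for (;;) {
    // Events written before versioning was introduced are treated as version 1.
    const version = current.version ?? 1;
    const eventType = current.eventType;
    const next = upcasters.find(u => u.eventType === eventType && u.fromVersion === version);
    if (!next) return current;
    const upgraded = next.upcast(current);
    if ((upgraded.version ?? 1) <= version) {
      throw new Error(`Upcaster for ${eventType} v${version} did not advance the version`);
    }
    current = upgraded;
  }
}

const upcasterChain: Upcaster[] = [
  {
    eventType: 'ConsentGranted',
    fromVersion: 1,
    upcast: e => ({ ...e, version: 2, gdprContext: { legalBasis: 'consent' } }),
  },
  {
    // Hypothetical second migration, only here to show chaining.
    eventType: 'ConsentGranted',
    fromVersion: 2,
    upcast: e => ({ ...e, version: 3, consentMethod: e['consentMethod'] ?? 'unknown' }),
  },
];
```

The version check inside the loop guards against an upcaster that forgets to bump the version, which would otherwise loop forever.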

CQRS: Separating Reads and Writes

CQRS (Command Query Responsibility Segregation) means your write model and read model are completely different. In a CRM context, this is powerful because marketing queries need different data structures than consent validation.

Write Model - Optimized for business rules:

class CustomerCommandHandler {
  constructor(
    private eventStore: EventStore,
    private validator: BusinessRuleValidator
  ) {}

  async grantConsent(command: GrantConsentCommand): Promise<void> {
    // Load event history to validate
    const events = await this.eventStore.getCustomerEvents(command.customerId);
    const customer = this.rehydrateCustomer(events);

    // Business rule: Can't grant consent if customer is deleted
    if (customer.isDeleted) {
      throw new Error('Cannot grant consent for deleted customer');
    }

    // Business rule: Can't grant same consent twice without revocation
    const existingConsent = customer.consents.find(
      c => c.purpose === command.purpose &&
           c.channel === command.channel &&
           c.status === 'active'
    );

    if (existingConsent) {
      throw new Error('Consent already exists');
    }

    // Emit new event
    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId: command.customerId,
      timestamp: new Date().toISOString(),
      eventType: 'ConsentGranted',
      purpose: command.purpose,
      channel: command.channel,
      ipAddress: command.ipAddress,
      userAgent: command.userAgent
    });
  }

  private rehydrateCustomer(events: CustomerEvent[]): Customer {
    // Rebuild state from events - this is event sourcing
    return events.reduce((customer, event) => {
      switch (event.eventType) {
        case 'CustomerCreated': {
          // eventType is a plain string on the base type, so narrow explicitly
          const created = event as CustomerCreated;
          return { ...customer, email: created.email };
        }
        case 'ConsentGranted': {
          const granted = event as ConsentGranted;
          return {
            ...customer,
            consents: [...customer.consents, {
              purpose: granted.purpose,
              channel: granted.channel,
              grantedAt: granted.timestamp,
              status: 'active'
            }]
          };
        }
        case 'ConsentRevoked': {
          const revoked = event as ConsentRevoked;
          return {
            ...customer,
            consents: customer.consents.map(c =>
              // Only flip the active grant; earlier revocations keep their own revokedAt
              c.purpose === revoked.purpose &&
              c.channel === revoked.channel &&
              c.status === 'active'
                ? { ...c, status: 'revoked', revokedAt: revoked.timestamp }
                : c
            )
          };
        }
        default:
          return customer;
      }
    }, { consents: [] } as Customer);
  }
}

Read Model - Optimized for queries:

// Projection builder - runs async from event bus
class ConsentProjectionBuilder {
  // DynamoDB is the aggregated SDK v3 client, which exposes putItem()/updateItem()
  constructor(private readDB: DynamoDB) {}

  async handleConsentGranted(event: ConsentGranted): Promise<void> {
    // Materialized view optimized for "can we contact this customer?"
    await this.readDB.putItem({
      TableName: 'customer-consents',
      Item: {
        pk: { S: `CUSTOMER#${event.customerId}` },
        sk: { S: `CONSENT#${event.purpose}#${event.channel}` },
        status: { S: 'active' },
        grantedAt: { S: event.timestamp },
        expiresAt: { S: this.calculateExpiry(event.timestamp) },
        ipAddress: { S: event.ipAddress },
        // GSI for querying by purpose
        gsi1pk: { S: `PURPOSE#${event.purpose}` },
        gsi1sk: { S: event.customerId }
      }
    });
  }

  async handleConsentRevoked(event: ConsentRevoked): Promise<void> {
    await this.readDB.updateItem({
      TableName: 'customer-consents',
      Key: {
        pk: { S: `CUSTOMER#${event.customerId}` },
        sk: { S: `CONSENT#${event.purpose}#${event.channel}` }
      },
      UpdateExpression: 'SET #status = :revoked, revokedAt = :timestamp',
      ExpressionAttributeNames: { '#status': 'status' },
      ExpressionAttributeValues: {
        ':revoked': { S: 'revoked' },
        ':timestamp': { S: event.timestamp }
      }
    });
  }

  private calculateExpiry(grantedAt: string): string {
    // GDPR sets no fixed lifetime for consent; refreshing after 2 years is a policy choice
    const granted = new Date(grantedAt);
    granted.setFullYear(granted.getFullYear() + 2);
    return granted.toISOString();
  }
}

The trade-off is eventual consistency: when a customer revokes consent, there’s a delay before the read model updates. For CRM this is usually acceptable - if a customer unsubscribes, a delay of a few seconds before campaigns stop is reasonable.
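Because the read model can lag, the consent check itself should be conservative. Below is a sketch of a read-side check over one projected record. The field names mirror the projection above, but `hasActiveConsent` as written here and its fail-closed policy are assumptions, not the post's `consentService` implementation.

```typescript
// Sketch of a read-side consent check; the fail-closed policy is an assumption.
interface ConsentRecord {
  status: 'active' | 'revoked';
  grantedAt: string;
  expiresAt: string;
}

function hasActiveConsent(record: ConsentRecord | undefined, now: Date): boolean {
  if (!record) return false;                    // no record: never assume consent
  if (record.status !== 'active') return false; // revoked
  const expires = Date.parse(record.expiresAt);
  if (isNaN(expires)) return false;             // unparseable expiry: fail closed
  return now.getTime() < expires;
}
```

Running this check at send time, not when a campaign is scheduled, narrows the window in which a revocation can be missed to the projection lag itself.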

Complete CRUD Operations

Understanding how basic operations translate to events is fundamental. Let me walk through the complete lifecycle of customer data management.

Customer Creation Flow

When a new customer signs up, you’re not just inserting a row - you’re starting an event stream:

interface CustomerRegistrationCommand {
  email: string;
  firstName: string;
  lastName: string;
  phone?: string;
  source: 'web' | 'mobile' | 'api' | 'import';
  marketingConsent: boolean;
  termsAccepted: boolean;
  ipAddress: string;
  userAgent: string;
}

class CustomerRegistrationHandler {
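  constructor(
    private eventStore: EventStore,
    private validator: EmailValidator,
    // Read-side lookup used by validateRegistration below
    private customerQuery: { findByEmail(email: string): Promise<unknown | null> }
  ) {}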

  async registerCustomer(
    command: CustomerRegistrationCommand
  ): Promise<string> {
    // Step 1: Validate before creating any events
    await this.validateRegistration(command);

    const customerId = crypto.randomUUID();
    const timestamp = new Date().toISOString();

    // Step 2: Emit CustomerCreated event
    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId,
      timestamp,
      eventType: 'CustomerCreated',
      email: command.email,
      firstName: command.firstName,
      lastName: command.lastName,
      phone: command.phone,
      source: command.source,
      ipAddress: command.ipAddress,
      userAgent: command.userAgent
    });

    // Step 3: If they consented to marketing, emit ConsentGranted
    if (command.marketingConsent) {
      await this.eventStore.appendEvent({
        eventId: crypto.randomUUID(),
        customerId,
        timestamp,
        eventType: 'ConsentGranted',
        purpose: 'marketing',
        channel: 'email',
        ipAddress: command.ipAddress,
        userAgent: command.userAgent,
        consentMethod: 'registration-checkbox'
      });
    }

    // Step 4: Emit EmailVerificationRequested
    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId,
      timestamp,
      eventType: 'EmailVerificationRequested',
      email: command.email,
      verificationToken: crypto.randomUUID()
    });

    return customerId;
  }

  private async validateRegistration(
    command: CustomerRegistrationCommand
  ): Promise<void> {
    // Email format validation
    if (!this.validator.isValid(command.email)) {
      throw new Error('Invalid email format');
    }

    // Check if customer already exists
    const existing = await this.customerQuery.findByEmail(command.email);
    if (existing) {
      throw new Error('Customer already exists');
    }

    // Terms acceptance is required
    if (!command.termsAccepted) {
      throw new Error('Terms must be accepted');
    }
  }
}

Projection building from these events:

class CustomerProjectionBuilder {
  async handleCustomerCreated(event: CustomerCreated): Promise<void> {
    // Create initial customer record in read database
    await this.readDB.putItem({
      TableName: 'customers',
      Item: {
        customerId: { S: event.customerId },
        email: { S: event.email },
        firstName: { S: event.firstName },
        lastName: { S: event.lastName },
        phone: { S: event.phone || '' },
        source: { S: event.source },
        status: { S: 'pending-verification' },
        createdAt: { S: event.timestamp },
        updatedAt: { S: event.timestamp },
        // GSI for email lookups
        emailLowercase: { S: event.email.toLowerCase() }
      }
    });
  }

  async handleEmailVerificationRequested(
    event: EmailVerificationRequested
  ): Promise<void> {
    // Trigger welcome email with verification link
    await this.campaignService.triggerCampaign({
      campaignId: 'welcome-verification',
      customerId: event.customerId,
      data: {
        verificationToken: event.verificationToken,
        email: event.email
      }
    });
  }
}

Key gotcha: The registration flow needs to handle failures gracefully. If the consent event fails to write after customer creation has succeeded, you are left with an inconsistent state. Use event batching or saga patterns for atomic multi-event operations.
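As an illustration of the batching option, here is an in-memory sketch of an all-or-nothing append. `BatchEventStore` is a hypothetical name and the validation rules are placeholders for whatever checks a real store would run.

```typescript
// In-memory sketch of all-or-nothing appends. BatchEventStore is a hypothetical name.
interface DomainEvent { eventId: string; customerId: string; eventType: string }

class BatchEventStore {
  private events: DomainEvent[] = [];
  private seen = new Set<string>();

  // Validates every event first, then commits them together.
  // If any event is rejected, nothing is written.
  appendBatch(batch: DomainEvent[]): void {
    const idsInBatch = new Set<string>();
    for (const e of batch) {
      if (!e.customerId) throw new Error(`Event ${e.eventId} has no customerId`);
      if (this.seen.has(e.eventId) || idsInBatch.has(e.eventId)) {
        throw new Error(`Duplicate eventId ${e.eventId}`);
      }
      idsInBatch.add(e.eventId);
    }
    for (const e of batch) {
      this.events.push(e);
      this.seen.add(e.eventId);
    }
  }

  count(): number {
    return this.events.length;
  }
}
```

With DynamoDB, the closest equivalent is probably a single `TransactWriteItems` call with one Put per event, subject to its per-transaction item limit. Publishing to the event bus remains a separate step, so a transactional write alone does not make store-plus-publish atomic.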

Customer Update Operations

Updates are where event sourcing shines - you have a complete history of what changed and when:

interface UpdateCustomerEmailCommand {
  customerId: string;
  newEmail: string;
  ipAddress: string;
  userAgent: string;
}

interface UpdateCustomerProfileCommand {
  customerId: string;
  updates: {
    firstName?: string;
    lastName?: string;
    phone?: string;
    dateOfBirth?: string;
    address?: Address;
  };
}

class CustomerUpdateHandler {
  async updateEmail(command: UpdateCustomerEmailCommand): Promise<void> {
    // Load current state from events
    const events = await this.eventStore.getCustomerEvents(command.customerId);
    const customer = this.rehydrateCustomer(events);

    // Business rule: Can't update email for deleted customer
    if (customer.status === 'deleted') {
      throw new Error('Cannot update deleted customer');
    }

    // Business rule: Email must be different
    if (customer.email === command.newEmail) {
      throw new Error('Email unchanged');
    }

    // Emit email change event
    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId: command.customerId,
      timestamp: new Date().toISOString(),
      eventType: 'CustomerEmailUpdated',
      oldEmail: customer.email,
      newEmail: command.newEmail,
      ipAddress: command.ipAddress,
      userAgent: command.userAgent,
      requiresVerification: true
    });

    // Trigger verification for new email
    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId: command.customerId,
      timestamp: new Date().toISOString(),
      eventType: 'EmailVerificationRequested',
      email: command.newEmail,
      verificationToken: crypto.randomUUID()
    });
  }

  async updateProfile(command: UpdateCustomerProfileCommand): Promise<void> {
    const events = await this.eventStore.getCustomerEvents(command.customerId);
    const customer = this.rehydrateCustomer(events);

    if (customer.status === 'deleted') {
      throw new Error('Cannot update deleted customer');
    }

    // Emit specific events for each type of update
    const timestamp = new Date().toISOString();

    if (command.updates.firstName || command.updates.lastName) {
      await this.eventStore.appendEvent({
        eventId: crypto.randomUUID(),
        customerId: command.customerId,
        timestamp,
        eventType: 'CustomerNameUpdated',
        oldFirstName: customer.firstName,
        oldLastName: customer.lastName,
        newFirstName: command.updates.firstName || customer.firstName,
        newLastName: command.updates.lastName || customer.lastName
      });
    }

    if (command.updates.phone) {
      await this.eventStore.appendEvent({
        eventId: crypto.randomUUID(),
        customerId: command.customerId,
        timestamp,
        eventType: 'CustomerPhoneUpdated',
        oldPhone: customer.phone,
        newPhone: command.updates.phone
      });
    }

    if (command.updates.address) {
      await this.eventStore.appendEvent({
        eventId: crypto.randomUUID(),
        customerId: command.customerId,
        timestamp,
        eventType: 'CustomerAddressUpdated',
        oldAddress: customer.address,
        newAddress: command.updates.address
      });
    }
  }
}

Projection updates handle incremental changes:

class CustomerProjectionBuilder {
  async handleCustomerEmailUpdated(
    event: CustomerEmailUpdated
  ): Promise<void> {
    await this.readDB.updateItem({
      TableName: 'customers',
      Key: { customerId: { S: event.customerId } },
      UpdateExpression:
        'SET email = :newEmail, emailLowercase = :emailLower, ' +
        'emailVerified = :verified, updatedAt = :timestamp',
      ExpressionAttributeValues: {
        ':newEmail': { S: event.newEmail },
        ':emailLower': { S: event.newEmail.toLowerCase() },
        ':verified': { BOOL: false },
        ':timestamp': { S: event.timestamp }
      }
    });

    // Audit trail projection for compliance
    await this.auditDB.putItem({
      TableName: 'customer-audit-trail',
      Item: {
        customerId: { S: event.customerId },
        timestamp: { S: event.timestamp },
        eventType: { S: 'EmailUpdated' },
        oldValue: { S: event.oldEmail },
        newValue: { S: event.newEmail },
        ipAddress: { S: event.ipAddress },
        userAgent: { S: event.userAgent }
      }
    });
  }

  async handleCustomerAddressUpdated(
    event: CustomerAddressUpdated
  ): Promise<void> {
    await this.readDB.updateItem({
      TableName: 'customers',
      Key: { customerId: { S: event.customerId } },
      UpdateExpression: 'SET address = :address, updatedAt = :timestamp',
      ExpressionAttributeValues: {
        ':address': { S: JSON.stringify(event.newAddress) },
        ':timestamp': { S: event.timestamp }
      }
    });
  }
}
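Because old and new values are both kept on the events, point-in-time questions can be answered by replaying up to a timestamp. A small sketch follows, with `emailAt` as a hypothetical helper and event shapes reduced to the fields it needs.

```typescript
// Sketch of a point-in-time query; emailAt is a hypothetical helper.
type EmailEvent =
  | { eventType: 'CustomerCreated'; timestamp: string; email: string }
  | { eventType: 'CustomerEmailUpdated'; timestamp: string; oldEmail: string; newEmail: string };

// Returns the email on record at `asOf`, or undefined if the customer did not
// exist yet. Assumes `events` is in chronological order and that all
// timestamps are ISO-8601 UTC strings in the same format, so that string
// comparison matches time order.
function emailAt(events: EmailEvent[], asOf: string): string | undefined {
  let email: string | undefined;
  for (const e of events) {
    if (e.timestamp > asOf) break;
    email = e.eventType === 'CustomerCreated' ? e.email : e.newEmail;
  }
  return email;
}
```

A support agent asking "which address did we send the March invoice to?" is the kind of query this answers without a separate history table.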

Customer Deletion and Deactivation

This is where event sourcing differs significantly from traditional systems:

interface DeactivateCustomerCommand {
  customerId: string;
  reason: 'customer-request' | 'fraud' | 'terms-violation' | 'other';
  notes?: string;
}

interface DeleteCustomerDataCommand {
  customerId: string;
  reason: 'gdpr-request' | 'data-retention-policy';
  deletionType: 'soft' | 'hard' | 'anonymize';
}

class CustomerDeletionHandler {
  // Soft delete - customer account is deactivated but data retained
  async deactivateCustomer(
    command: DeactivateCustomerCommand
  ): Promise<void> {
    const events = await this.eventStore.getCustomerEvents(command.customerId);
    const customer = this.rehydrateCustomer(events);

    if (customer.status === 'deleted') {
      throw new Error('Customer already deleted');
    }

    // Emit deactivation event
    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId: command.customerId,
      timestamp: new Date().toISOString(),
      eventType: 'CustomerDeactivated',
      reason: command.reason,
      notes: command.notes,
      previousStatus: customer.status
    });

    // Automatically revoke all active marketing consents
    const activeConsents = customer.consents.filter(c => c.status === 'active');

    for (const consent of activeConsents) {
      await this.eventStore.appendEvent({
        eventId: crypto.randomUUID(),
        customerId: command.customerId,
        timestamp: new Date().toISOString(),
        eventType: 'ConsentRevoked',
        purpose: consent.purpose,
        channel: consent.channel,
        reason: 'account-deactivated'
      });
    }
  }

  // GDPR deletion - different from deactivation
  async deleteCustomerData(
    command: DeleteCustomerDataCommand
  ): Promise<void> {
    const timestamp = new Date().toISOString();

    // Emit deletion request event
    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId: command.customerId,
      timestamp,
      eventType: 'CustomerDataDeletionRequested',
      reason: command.reason,
      deletionType: command.deletionType
    });

    if (command.deletionType === 'anonymize') {
      // Anonymize PII in all events
      await this.gdprService.anonymizeCustomerEvents(command.customerId);
    } else if (command.deletionType === 'hard') {
      // Actually delete events (rare, only for specific legal requirements)
      await this.gdprService.hardDeleteCustomerEvents(command.customerId);
    }

    // Mark as deleted in projections
    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId: command.customerId,
      timestamp,
      eventType: 'CustomerDataDeleted',
      deletionType: command.deletionType,
      completedAt: timestamp
    });
  }
}

Impact on active campaigns:

class CampaignService {
  async handleCustomerDeactivated(
    event: CustomerDeactivated
  ): Promise<void> {
    // Cancel all scheduled campaigns for this customer
    const scheduledCampaigns = await this.getScheduledCampaigns(
      event.customerId
    );

    for (const campaign of scheduledCampaigns) {
      await this.cancelCampaign(campaign.id, 'customer-deactivated');
    }

    // Remove from all segments
    await this.segmentService.removeFromAllSegments(event.customerId);
  }

  async handleCustomerDataDeleted(
    event: CustomerDataDeleted
  ): Promise<void> {
    // Purge customer from all systems
    await this.purgeFromCampaignQueues(event.customerId);
    await this.purgeFromSegments(event.customerId);
    await this.purgeFromRecommendations(event.customerId);

    // Record compliance completion
    await this.complianceLog.recordDeletion({
      customerId: event.customerId,
      deletionType: event.deletionType,
      completedAt: event.timestamp
    });
  }
}

Key difference: Deactivation is reversible and retains data for analytics. GDPR deletion is permanent and requires careful handling of related data across all systems.
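
To make the reversible-versus-terminal distinction concrete, here is a minimal sketch of a status projection. The trimmed-down `LifecycleEvent` shape and the `CustomerReactivated` event are assumptions for illustration; the post does not define a reactivation event.

```typescript
// Hypothetical, trimmed-down event shapes for illustration only.
type LifecycleEvent =
  | { eventType: 'CustomerDeactivated'; timestamp: string }
  | { eventType: 'CustomerReactivated'; timestamp: string } // assumed, not defined in the post
  | { eventType: 'CustomerDataDeleted'; timestamp: string };

type CustomerStatus = 'active' | 'inactive' | 'deleted';

// Fold lifecycle events (oldest first) into the current status.
function projectStatus(events: LifecycleEvent[]): CustomerStatus {
  let status: CustomerStatus = 'active';
  for (const event of events) {
    if (status === 'deleted') break; // deletion is terminal, later events are ignored
    switch (event.eventType) {
      case 'CustomerDeactivated':
        status = 'inactive';
        break;
      case 'CustomerReactivated':
        status = 'active';
        break;
      case 'CustomerDataDeleted':
        status = 'deleted';
        break;
    }
  }
  return status;
}
```

Deactivation can be undone by a later event, while nothing that follows a deletion changes the outcome.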

Marketing Automation with Event Triggers

Marketing automation becomes a series of event processors watching for trigger conditions:

interface CampaignTrigger {
  triggerId: string;
  campaignId: string;
  eventPattern: {
    eventType: string;
    conditions?: Record<string, any>;
  };
  actions: CampaignAction[];
}

interface CampaignAction {
  type: 'send-email' | 'send-sms' | 'send-push' | 'add-to-segment' | 'wait';
  config: any;
}

class CampaignOrchestrator {
  constructor(
    private triggers: CampaignTrigger[],
    private consentService: ConsentService,
    private channelOrchestrator: ChannelOrchestrator
  ) {}

  async handleEvent(event: CustomerEvent): Promise<void> {
    // Find matching triggers
    const matchingTriggers = this.triggers.filter(trigger =>
      this.eventMatches(event, trigger.eventPattern)
    );

    for (const trigger of matchingTriggers) {
      await this.executeCampaign(event.customerId, trigger);
    }
  }

  private async executeCampaign(
    customerId: string,
    trigger: CampaignTrigger
  ): Promise<void> {
    // Check consent before any outbound communication
    const hasConsent = await this.consentService.hasActiveConsent(
      customerId,
      'marketing',
      'email' // Would derive from action type
    );

    if (!hasConsent) {
      console.log(`Skipping campaign ${trigger.campaignId} - no consent`);
      return;
    }

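    // Execute actions in sequence. A 'wait' schedules the rest of the campaign
    // for later, so stop here instead of running the following actions now
    for (const action of trigger.actions) {
      await this.executeAction(customerId, action, trigger.campaignId);
      if (action.type === 'wait') break;
    }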
  }

  private async executeAction(
    customerId: string,
    action: CampaignAction,
    campaignId: string
  ): Promise<void> {
    switch (action.type) {
      case 'send-email':
        await this.channelOrchestrator.sendEmail({
          customerId,
          campaignId,
          templateId: action.config.templateId,
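          // Idempotency key to prevent duplicate sends. It must be deterministic:
          // a timestamp would differ on every retry and defeat the check.
          // In production, also include the triggering event's ID.
          idempotencyKey: `${campaignId}-${customerId}-${action.config.templateId}`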
        });
        break;

      case 'wait':
        // Implement as scheduled event, not blocking wait
        await this.scheduleDelayedAction(
          customerId,
          campaignId,
          action.config.duration
        );
        break;

      case 'add-to-segment':
        await this.eventStore.appendEvent({
          eventId: crypto.randomUUID(),
          customerId,
          timestamp: new Date().toISOString(),
          eventType: 'CustomerSegmentAdded',
          segmentId: action.config.segmentId,
          source: `campaign:${campaignId}`
        });
        break;
    }
  }

  private eventMatches(
    event: CustomerEvent,
    pattern: CampaignTrigger['eventPattern']
  ): boolean {
    if (event.eventType !== pattern.eventType) return false;

    if (!pattern.conditions) return true;

    // Simple condition matching - production would use JSONPath or similar
    return Object.entries(pattern.conditions).every(([key, value]) =>
      (event as any)[key] === value
    );
  }
}
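
One way to honor 'wait' without blocking is to treat the action list as a resumable sequence: run actions until a wait is reached, then hand back the index to resume from. This is a minimal sketch, not the orchestrator's actual implementation; `runUntilWait`, `StepResult`, and the `perform` callback are hypothetical names.

```typescript
type Action =
  | { type: 'send-email'; templateId: string }
  | { type: 'wait'; duration: string };

type StepResult =
  | { done: true }
  | { done: false; resumeAt: number; waitFor: string };

// Run actions from `startIndex` until the list ends or a 'wait' is hit.
// On 'wait', report where to resume so a scheduler can re-invoke later.
function runUntilWait(
  actions: Action[],
  startIndex: number,
  perform: (action: Action) => void
): StepResult {
  for (let i = startIndex; i < actions.length; i++) {
    const action = actions[i];
    if (action.type === 'wait') {
      return { done: false, resumeAt: i + 1, waitFor: action.duration };
    }
    perform(action);
  }
  return { done: true };
}
```

The scheduler would persist `resumeAt` alongside the campaign and customer IDs. Consent is worth re-checking at each resume, since it may have been revoked during the wait.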

Real-world example: Abandoned cart campaign

// Trigger configuration
const abandonedCartTrigger: CampaignTrigger = {
  triggerId: 'abandoned-cart-v2',
  campaignId: 'abandoned-cart-email',
  eventPattern: {
    eventType: 'CartAbandoned',
    conditions: {
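      // Only for carts of $50 or more. CartAbandoned carries totalValue, and
      // the $gte operator needs an operator-aware matcher; the strict-equality
      // check in eventMatches above would never match this condition.
      totalValue: { $gte: 50 }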
    }
  },
  actions: [
    {
      type: 'wait',
      config: { duration: '1hour' }
    },
    {
      type: 'send-email',
      config: {
        templateId: 'abandoned-cart-reminder',
        // Dynamic content would be injected
        personalization: ['cartItems', 'discountCode']
      }
    },
    {
      type: 'wait',
      config: { duration: '24hours' }
    },
    {
      type: 'send-email',
      config: {
        templateId: 'abandoned-cart-final-offer',
        personalization: ['cartItems', 'largerDiscountCode']
      }
    }
  ]
};
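
The `$gte` condition in that trigger needs a matcher that understands operators. Here is a minimal sketch of one, assuming only `$gte` and `$lte` are supported; `matchesConditions` and the `Condition` type are hypothetical and not part of the orchestrator above.

```typescript
type Condition = string | number | boolean | { $gte?: number; $lte?: number };

// Returns true when every condition holds for the event.
// Plain values use strict equality; objects are treated as operator maps.
function matchesConditions(
  event: Record<string, unknown>,
  conditions: Record<string, Condition>
): boolean {
  return Object.entries(conditions).every(([field, expected]) => {
    const actual = event[field];
    if (typeof expected !== 'object') return actual === expected;
    // Range operators only make sense for numeric fields.
    if (typeof actual !== 'number') return false;
    if (expected.$gte !== undefined && actual < expected.$gte) return false;
    if (expected.$lte !== undefined && actual > expected.$lte) return false;
    return true;
  });
}
```

A missing or non-numeric field fails a range condition instead of passing silently, which keeps a misnamed field from firing a campaign for every event.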

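Critical gotcha: Idempotency. Events might be processed multiple times due to retries, so every action needs an idempotency key derived deterministically from the triggering event and action. A retry must produce the same key as the first attempt: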

class EmailChannelHandler {
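  async sendEmail(request: SendEmailRequest): Promise<void> {
    // Check if already sent using idempotency key.
    // Note: exists-then-send is not atomic, so two concurrent retries can both
    // pass this check. A conditional write in the message store closes that gap.
    const exists = await this.messageStore.exists(request.idempotencyKey);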

    if (exists) {
      console.log(`Email already sent: ${request.idempotencyKey}`);
      return;
    }

    // Send via provider
    const result = await this.emailProvider.send({
      to: request.recipientEmail,
      template: request.templateId,
      data: request.personalization
    });

    // Record send event
    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId: request.customerId,
      timestamp: new Date().toISOString(),
      eventType: 'EmailSent',
      campaignId: request.campaignId,
      templateId: request.templateId,
      messageId: result.messageId,
      idempotencyKey: request.idempotencyKey
    });
  }
}
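
A common way to close the gap between the existence check and the send is to reserve the key first and release it if the send fails. This is a synchronous, in-memory sketch of that idea; `IdempotencyStore`, `idempotencyKey`, and `sendOnce` are hypothetical names, and a real store would use a conditional write so the reservation is atomic across processes.

```typescript
// In-memory stand-in for a durable store with "insert if absent" semantics.
class IdempotencyStore {
  private keys = new Set<string>();

  // Returns true only for the first caller to reserve this key.
  reserve(key: string): boolean {
    if (this.keys.has(key)) return false;
    this.keys.add(key);
    return true;
  }

  // Release a reservation so a failed send can be retried.
  release(key: string): void {
    this.keys.delete(key);
  }
}

// Deterministic key: the same triggering event and action always map to the same key.
function idempotencyKey(eventId: string, campaignId: string, actionIndex: number): string {
  return `${campaignId}:${eventId}:${actionIndex}`;
}

function sendOnce(
  store: IdempotencyStore,
  key: string,
  send: () => void
): 'sent' | 'duplicate' {
  if (!store.reserve(key)) return 'duplicate';
  try {
    send();
    return 'sent';
  } catch (err) {
    store.release(key); // allow a later retry
    throw err;
  }
}
```

Reserving before sending trades one failure mode for another: if the process crashes after the reservation but before the send, that message is never delivered. Whether a lost message or a duplicate is worse depends on the campaign.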

Channel Orchestration and Preference Management

Different customers want different channels at different times. Event-driven architecture makes preference management straightforward:

class ChannelOrchestrator {
  constructor(
    private preferenceStore: PreferenceProjection,
    private channels: Map<string, ChannelHandler>
  ) {}

  async determineChannel(
    customerId: string,
    messageType: string
  ): Promise<string[]> {
    // Get customer preferences from read model
    // (assumed to return null/undefined when nothing is stored)
    const prefs = await this.preferenceStore.getPreferences(customerId);

    // Fallback strategy only when the customer has no stored preferences at all
    if (!prefs) {
      return this.getDefaultChannels(messageType);
    }

    // Business logic: Choose channels based on preferences and message type
    const availableChannels: string[] = [];

    if (prefs.emailEnabled && this.shouldUseEmail(messageType, prefs)) {
      availableChannels.push('email');
    }

    if (prefs.smsEnabled && this.shouldUseSMS(messageType, prefs)) {
      availableChannels.push('sms');
    }

    if (prefs.pushEnabled && this.shouldUsePush(messageType, prefs)) {
      availableChannels.push('push');
    }

    // An empty list means the customer opted out or a frequency cap applies.
    // Do not fall back to defaults here, or those choices would be overridden.
    return availableChannels;
  }

  private shouldUseEmail(
    messageType: string,
    prefs: CustomerPreferences
  ): boolean {
    // Transactional emails always go through
    if (messageType === 'transactional') return true;

    // Marketing emails respect frequency preference
    if (messageType === 'marketing') {
      const lastEmail = prefs.lastEmailSent;
      const frequency = prefs.emailFrequency || 'weekly';

      // Check the opt-out first so 'never' also applies to customers
      // who have not received any email yet
      if (frequency === 'never') return false;
      if (!lastEmail) return true;

      const hoursSinceLastEmail =
        (Date.now() - new Date(lastEmail).getTime()) / (1000 * 60 * 60);

      switch (frequency) {
        case 'daily': return hoursSinceLastEmail >= 24;
        case 'weekly': return hoursSinceLastEmail >= 168;
        default: return true;
      }
    }

    return true;
  }

  private shouldUseSMS(
    messageType: string,
    prefs: CustomerPreferences
  ): boolean {
    // SMS is expensive - use sparingly
    // Only for high-value transactional or urgent marketing
    return messageType === 'transactional' ||
           (messageType === 'urgent-marketing' && prefs.smsForPromotions);
  }

  private shouldUsePush(
    messageType: string,
    prefs: CustomerPreferences
  ): boolean {
    // Push has low cost but can be ignored
    // Good for time-sensitive content
    const lastPush = prefs.lastPushSent;

    // Don't spam - at most one push per hour
    if (lastPush) {
      const hoursSinceLastPush =
        (Date.now() - new Date(lastPush).getTime()) / (1000 * 60 * 60);

      if (hoursSinceLastPush < 1) return false;
    }

    return prefs.pushCategories?.includes(messageType) ?? false;
  }
}

Here’s the event flow for preference updates:

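[Sequence diagram. Participants: Customer, API, CommandHandler, EventBus, PreferenceProjector, ReadDB, CampaignEngine]

  1. Customer → API: Update Email Frequency to Weekly
  2. API → CommandHandler: UpdatePreferencesCommand
  3. CommandHandler → EventBus: PreferencesUpdated Event
  4. EventBus → PreferenceProjector: Event Notification
  5. PreferenceProjector → ReadDB: Update Materialized View
  6. EventBus → CampaignEngine: Event Notification
  7. CampaignEngine: Recalculate Active Campaigns (may pause or reschedule pending messages)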
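
The projector step in that flow can be sketched as a small fold over preference events. The `PreferencesUpdated` payload shape, the `PreferenceView` fields, and the opt-in defaults are assumptions for illustration, and the staleness check relies on all timestamps being ISO 8601 UTC strings in the same format so they compare correctly as strings.

```typescript
type EmailFrequency = 'daily' | 'weekly' | 'never';

interface PreferencesUpdated {
  eventType: 'PreferencesUpdated';
  customerId: string;
  timestamp: string; // ISO 8601, UTC
  changes: { emailEnabled?: boolean; emailFrequency?: EmailFrequency };
}

interface PreferenceView {
  emailEnabled: boolean;
  emailFrequency: EmailFrequency;
  updatedAt: string;
}

class PreferenceProjector {
  private views = new Map<string, PreferenceView>();

  // Apply one event to the materialized view. Stale or replayed events are ignored.
  handle(event: PreferencesUpdated): void {
    const current = this.views.get(event.customerId);
    if (current && event.timestamp <= current.updatedAt) return;

    this.views.set(event.customerId, {
      // Assumed defaults: email off until the customer opts in, weekly cadence.
      emailEnabled: event.changes.emailEnabled ?? current?.emailEnabled ?? false,
      emailFrequency: event.changes.emailFrequency ?? current?.emailFrequency ?? 'weekly',
      updatedAt: event.timestamp
    });
  }

  get(customerId: string): PreferenceView | undefined {
    return this.views.get(customerId);
  }
}
```

Comparing timestamps is a simplification. A per-customer sequence number is a more reliable ordering key, because two distinct events can share a timestamp.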

GDPR Compliance Through Events

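The “right to be forgotten” is workable with event sourcing, and the deletion itself leaves an audit trail: record the request as an event, anonymize PII in the stored events, rebuild projections, then record completion: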

class GDPRComplianceService {
  constructor(
    private eventStore: EventStore,
    private projectionRebuilder: ProjectionRebuilder
  ) {}

  async handleDataDeletionRequest(customerId: string): Promise<void> {
    // Step 1: Emit deletion event
    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId,
      timestamp: new Date().toISOString(),
      eventType: 'CustomerDataDeletionRequested',
      reason: 'gdpr-right-to-be-forgotten'
    });

    // Step 2: Anonymize PII in existing events
    // Keep events for analytics but remove identifying data
    const events = await this.eventStore.getCustomerEvents(customerId);

    for (const event of events) {
      await this.anonymizeEvent(event);
    }

    // Step 3: Rebuild projections with anonymized data
    await this.projectionRebuilder.rebuild(customerId);

    // Step 4: Emit completion event for audit trail
    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId,
      timestamp: new Date().toISOString(),
      eventType: 'CustomerDataDeleted',
      eventsAnonymized: events.length
    });
  }

  private async anonymizeEvent(event: CustomerEvent): Promise<void> {
    // Replace PII with anonymized values, touching only the fields the event
    // actually carries. Non-PII such as eventType and timestamp is kept
    // for analytics via the spread.
    const anonymized = {
      ...event,
      ...(event.email ? { email: this.hashPII(event.email) } : {}),
      ...(event.ipAddress ? { ipAddress: this.maskIP(event.ipAddress) } : {}),
      ...(event.userAgent ? { userAgent: '[REDACTED]' } : {})
    };

    await this.eventStore.replaceEvent(event.eventId, anonymized);
  }

  private hashPII(value: string): string {
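    // One-way hash. Note: an unsalted hash of a guessable value such as an
    // email address can be matched by hashing candidate values, so this is
    // pseudonymization. Use a keyed hash with a secret you later destroy,
    // or drop the field, if the data must be truly anonymous.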
    return crypto.createHash('sha256').update(value).digest('hex');
  }

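  private maskIP(ip: string): string {
    // Keep general location, remove specific identifier.
    // Only dotted IPv4 is handled; anything else (IPv6, empty) is fully redacted.
    const parts = ip.split('.');
    if (parts.length !== 4) return '[REDACTED]';
    return `${parts[0]}.${parts[1]}.0.0`;
  }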
}
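
IP masking is worth a closer look, because event streams usually contain both IPv4 and IPv6 addresses. Here is a standalone sketch that handles both; the function name `maskIp` and the choice of keeping a /16 for IPv4 and a /48 for IPv6 are assumptions, so check the prefix lengths against your own compliance guidance.

```typescript
// Mask an IP address so it no longer identifies a single host.
// IPv4: keep the first two octets. IPv6: keep the first three groups (a /48).
// Anything that does not parse is fully redacted instead of passed through.
function maskIp(ip: string): string {
  const octets = ip.split('.');
  const isIpv4 =
    octets.length === 4 &&
    octets.every(o => /^\d{1,3}$/.test(o) && Number(o) <= 255);
  if (isIpv4) return `${octets[0]}.${octets[1]}.0.0`;

  if (ip.includes(':')) {
    // Groups before a '::' are the leading groups; compressed groups are zeros.
    const head = ip.split('::')[0] ?? '';
    const groups = head === '' ? [] : head.split(':');
    const prefix = groups.slice(0, 3);
    if (prefix.every(g => /^[0-9a-fA-F]{1,4}$/.test(g))) {
      while (prefix.length < 3) prefix.push('0');
      return `${prefix.join(':')}::`;
    }
  }

  return '[REDACTED]';
}
```

Redacting unparseable input is the conservative default: a malformed value that still contains an address fragment should not survive anonymization.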

Important consideration: Decide early whether you need true deletion or anonymization. For analytics and business intelligence, anonymized events are valuable. For compliance, document your approach clearly.

Purchase Flow & E-commerce Events

E-commerce integration is where event-driven CRM shows its real power. Every step from browsing to delivery generates events that drive marketing automation.

Order Event Chain

A complete purchase generates a rich event stream:

// Complete order lifecycle events
interface CartCreated extends CustomerEvent {
  eventType: 'CartCreated';
  cartId: string;
  sessionId: string;
  source: 'web' | 'mobile' | 'api';
}

interface ItemAddedToCart extends CustomerEvent {
  eventType: 'ItemAddedToCart';
  cartId: string;
  productId: string;
  productName: string;
  quantity: number;
  price: number;
  currency: string;
}

interface CartAbandoned extends CustomerEvent {
  eventType: 'CartAbandoned';
  cartId: string;
  items: CartItem[];
  totalValue: number;
  currency: string;
  abandonedAt: string;
  timeInCart: number; // seconds
}

interface OrderPlaced extends CustomerEvent {
  eventType: 'OrderPlaced';
  orderId: string;
  cartId: string;
  items: OrderItem[];
  subtotal: number;
  tax: number;
  shipping: number;
  total: number;
  currency: string;
  shippingAddress: Address;
  billingAddress: Address;
}

interface PaymentInitiated extends CustomerEvent {
  eventType: 'PaymentInitiated';
  orderId: string;
  paymentMethod: 'credit-card' | 'paypal' | 'bank-transfer';
  amount: number;
  currency: string;
  paymentProvider: string;
}

interface PaymentSucceeded extends CustomerEvent {
  eventType: 'PaymentSucceeded';
  orderId: string;
  paymentId: string;
  amount: number;
  currency: string;
  transactionId: string;
}

interface PaymentFailed extends CustomerEvent {
  eventType: 'PaymentFailed';
  orderId: string;
  paymentId: string;
  amount: number;
  errorCode: string;
  errorMessage: string;
  retryable: boolean;
}

interface OrderConfirmed extends CustomerEvent {
  eventType: 'OrderConfirmed';
  orderId: string;
  confirmationNumber: string;
  estimatedDelivery: string;
}

interface OrderShipped extends CustomerEvent {
  eventType: 'OrderShipped';
  orderId: string;
  trackingNumber: string;
  carrier: string;
  shippedAt: string;
  estimatedDelivery: string;
}

interface OrderDelivered extends CustomerEvent {
  eventType: 'OrderDelivered';
  orderId: string;
  deliveredAt: string;
  signedBy?: string;
}

Order aggregate reconstruction from events:

class OrderAggregate {
  orderId: string;
  customerId: string;
  status: OrderStatus;
  items: OrderItem[] = [];
  totalValue: number = 0;
  paymentStatus: PaymentStatus;
  paymentTransactionId?: string; // Set on PaymentSucceeded; needed to issue refunds
  shippingStatus: ShippingStatus;
  timeline: OrderEvent[] = [];

  static fromEvents(events: CustomerEvent[]): OrderAggregate {
    const order = new OrderAggregate();

    for (const event of events) {
      order.apply(event);
    }

    return order;
  }

  private apply(event: CustomerEvent): void {
    this.timeline.push(event);

    switch (event.eventType) {
      case 'OrderPlaced':
        this.orderId = event.orderId;
        this.customerId = event.customerId;
        this.items = event.items;
        this.totalValue = event.total;
        this.status = 'pending-payment';
        break;

      case 'PaymentSucceeded':
        this.paymentStatus = 'paid';
        this.paymentTransactionId = event.transactionId;
        this.status = 'confirmed';
        break;

      case 'PaymentFailed':
        this.paymentStatus = 'failed';
        this.status = 'payment-failed';
        break;

      case 'OrderShipped':
        this.shippingStatus = 'shipped';
        this.status = 'in-transit';
        break;

      case 'OrderDelivered':
        this.shippingStatus = 'delivered';
        this.status = 'completed';
        break;
    }
  }

  // Business logic based on event history
  canBeCancelled(): boolean {
    return this.status === 'pending-payment' || this.status === 'confirmed';
  }

  canBeRefunded(): boolean {
    return this.paymentStatus === 'paid' &&
           this.shippingStatus !== 'delivered';
  }

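  // Milliseconds elapsed since the order first entered `status`.
  // Note: this measures up to now, even if the order has since moved on.
  getTimeInStatus(status: OrderStatus): number {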
    const statusEvents = this.timeline.filter(e =>
      this.eventResultsInStatus(e, status)
    );

    if (statusEvents.length === 0) return 0;

    const startTime = new Date(statusEvents[0].timestamp).getTime();
    const endTime = Date.now();
    return endTime - startTime;
  }
}
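
Because the aggregate is just a fold over events, the same approach answers "what did this order look like at time T?", which support and audit teams often need. Here is a minimal sketch reduced to the status field; `statusAsOf`, `STATUS_AFTER`, and the trimmed event shapes are hypothetical.

```typescript
type OrderEvent =
  | { eventType: 'OrderPlaced'; timestamp: string }
  | { eventType: 'PaymentSucceeded'; timestamp: string }
  | { eventType: 'OrderShipped'; timestamp: string }
  | { eventType: 'OrderDelivered'; timestamp: string };

type OrderStatus =
  | 'unknown'
  | 'pending-payment'
  | 'confirmed'
  | 'in-transit'
  | 'completed';

// Same transitions as the aggregate above, keyed by event type.
const STATUS_AFTER: Record<OrderEvent['eventType'], OrderStatus> = {
  OrderPlaced: 'pending-payment',
  PaymentSucceeded: 'confirmed',
  OrderShipped: 'in-transit',
  OrderDelivered: 'completed'
};

// Replay only the events at or before `asOf` to get the status at that moment.
function statusAsOf(events: OrderEvent[], asOf: string): OrderStatus {
  const cutoff = new Date(asOf).getTime();
  return events
    .filter(e => new Date(e.timestamp).getTime() <= cutoff)
    .sort((a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime())
    .reduce<OrderStatus>((_, e) => STATUS_AFTER[e.eventType], 'unknown');
}
```

A mutable orders table cannot answer this question without a separate history table. With events, it is the same replay with an earlier cutoff.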

Payment Event Handling

Payment failures need special attention in event-driven systems:

class PaymentEventHandler {
  async handlePaymentFailed(event: PaymentFailed): Promise<void> {
    // Record failure for analytics
    await this.analyticsService.trackPaymentFailure({
      orderId: event.orderId,
      errorCode: event.errorCode,
      amount: event.amount
    });

    if (event.retryable) {
      // Schedule automatic retry for transient failures
      await this.schedulePaymentRetry(event.orderId, {
        attempt: 1,
        maxAttempts: 3,
        backoffSeconds: 300 // 5 minutes
      });
    } else {
      // Non-retryable failure - trigger recovery campaign
      await this.campaignService.triggerCampaign({
        campaignId: 'payment-failed-recovery',
        customerId: event.customerId,
        data: {
          orderId: event.orderId,
          errorMessage: this.getFriendlyErrorMessage(event.errorCode),
          amount: event.amount,
          currency: event.currency
        }
      });
    }

    // Update order projection
    await this.orderProjection.updatePaymentStatus(
      event.orderId,
      'failed',
      event.errorCode
    );
  }

  async handlePaymentSucceeded(event: PaymentSucceeded): Promise<void> {
    // Trigger order confirmation workflow
    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId: event.customerId,
      timestamp: new Date().toISOString(),
      eventType: 'OrderConfirmed',
      orderId: event.orderId,
      confirmationNumber: this.generateConfirmationNumber(),
      estimatedDelivery: this.calculateDeliveryDate()
    });
  }
}
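
The retry policy above fixes a starting delay and a maximum attempt count but not how the delay grows. Here is a sketch that assumes the delay doubles on each attempt; the doubling and the `nextRetryDelaySeconds` helper are assumptions, not something the handler above specifies.

```typescript
interface RetryPolicy {
  maxAttempts: number;
  backoffSeconds: number; // delay before the first retry
}

// Delay before retry number `attempt` (1-based) using exponential backoff,
// or null once the attempts are exhausted.
function nextRetryDelaySeconds(attempt: number, policy: RetryPolicy): number | null {
  if (attempt < 1 || attempt > policy.maxAttempts) return null;
  return policy.backoffSeconds * 2 ** (attempt - 1);
}
```

With the values from the handler (`maxAttempts: 3`, `backoffSeconds: 300`), retries would fire after 5, 10, and 20 minutes. A `null` result is the signal to stop retrying and trigger the recovery campaign.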

Refund handling:

interface RefundInitiated extends CustomerEvent {
  eventType: 'RefundInitiated';
  orderId: string;
  refundId: string;
  amount: number;
  reason: 'customer-request' | 'quality-issue' | 'delivery-failed' | 'other';
  refundType: 'full' | 'partial';
  items?: string[]; // For partial refunds
}

interface RefundCompleted extends CustomerEvent {
  eventType: 'RefundCompleted';
  orderId: string;
  refundId: string;
  amount: number;
  completedAt: string;
  transactionId: string;
}

class RefundHandler {
  async initiateRefund(command: InitiateRefundCommand): Promise<void> {
    const orderEvents = await this.eventStore.getOrderEvents(command.orderId);
    const order = OrderAggregate.fromEvents(orderEvents);

    // Business rules
    if (!order.canBeRefunded()) {
      throw new Error('Order cannot be refunded');
    }

    const refundId = crypto.randomUUID();

    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId: order.customerId,
      timestamp: new Date().toISOString(),
      eventType: 'RefundInitiated',
      orderId: command.orderId,
      refundId,
      amount: command.amount,
      reason: command.reason,
      refundType: command.refundType
    });

    // Process with payment provider
    await this.paymentProvider.processRefund({
      transactionId: order.paymentTransactionId,
      amount: command.amount
    });
  }
}
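
The handler above checks whether an order can be refunded but not how much is left to refund. Here is a sketch of that check over the order's refund history, assuming amounts are tracked in minor units (cents); `RefundRecord`, `refundableAmount`, and `validateRefund` are hypothetical names.

```typescript
interface RefundRecord {
  amountCents: number;
  status: 'initiated' | 'completed' | 'failed';
}

// Amount still refundable. Initiated refunds count as committed so that
// two concurrent requests cannot together exceed what was paid.
function refundableAmount(paidCents: number, refunds: RefundRecord[]): number {
  const committed = refunds
    .filter(r => r.status !== 'failed')
    .reduce((sum, r) => sum + r.amountCents, 0);
  return Math.max(0, paidCents - committed);
}

function validateRefund(
  requestedCents: number,
  paidCents: number,
  refunds: RefundRecord[]
): void {
  if (!Number.isInteger(requestedCents) || requestedCents <= 0) {
    throw new Error('Refund amount must be a positive whole number of cents');
  }
  const available = refundableAmount(paidCents, refunds);
  if (requestedCents > available) {
    throw new Error(`Refund of ${requestedCents} exceeds refundable amount ${available}`);
  }
}
```

The refund history itself would come from folding `RefundInitiated` and `RefundCompleted` events for the order, in the same way the aggregate folds payment events.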

Post-Purchase Marketing Automation

The order lifecycle drives sophisticated marketing campaigns:

class PostPurchaseAutomation {
  private campaigns: CampaignTrigger[] = [
    {
      triggerId: 'order-confirmation',
      campaignId: 'order-confirmation-email',
      eventPattern: {
        eventType: 'OrderConfirmed'
      },
      actions: [
        {
          type: 'send-email',
          config: {
            templateId: 'order-confirmation',
            personalization: ['orderDetails', 'estimatedDelivery']
          }
        }
      ]
    },
    {
      triggerId: 'shipping-notification',
      campaignId: 'shipping-update',
      eventPattern: {
        eventType: 'OrderShipped'
      },
      actions: [
        {
          type: 'send-email',
          config: {
            templateId: 'order-shipped',
            personalization: ['trackingNumber', 'carrier']
          }
        },
        {
          type: 'send-push',
          config: {
            title: 'Your order has shipped!',
            body: 'Track your delivery'
          }
        }
      ]
    },
    {
      triggerId: 'delivery-review-request',
      campaignId: 'post-delivery-review',
      eventPattern: {
        eventType: 'OrderDelivered'
      },
      actions: [
        {
          type: 'wait',
          config: { duration: '3days' }
        },
        {
          type: 'send-email',
          config: {
            templateId: 'review-request',
            personalization: ['products', 'reviewLinks']
          }
        }
      ]
    },
    {
      triggerId: 'replenishment-campaign',
      campaignId: 'reorder-reminder',
      eventPattern: {
        eventType: 'OrderDelivered',
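        conditions: {
          // Only for consumable products. Assumes OrderDelivered is enriched
          // with productCategory; the event as defined above does not carry it.
          productCategory: 'consumables'
        }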
      },
      actions: [
        {
          type: 'wait',
          config: { duration: '30days' }
        },
        {
          type: 'send-email',
          config: {
            templateId: 'reorder-reminder',
            personalization: ['products', 'subscriptionOption']
          }
        }
      ]
    }
  ];

  async handleOrderEvent(event: CustomerEvent): Promise<void> {
    const matchingCampaigns = this.campaigns.filter(campaign =>
      this.eventMatches(event, campaign.eventPattern)
    );

    for (const campaign of matchingCampaigns) {
      await this.executeCampaign(event.customerId, campaign, event);
    }
  }
}
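
The `wait` actions above use duration strings such as '3days' and '30days', which the scheduler has to turn into a delay. Here is a minimal sketch of a parser for that format; `parseDuration` and the supported units are assumptions based on the values that appear in these trigger configs.

```typescript
const UNIT_MS: Record<string, number> = {
  minute: 60 * 1000,
  hour: 60 * 60 * 1000,
  day: 24 * 60 * 60 * 1000
};

// Parse strings like '1hour', '24hours', or '3days' into milliseconds.
// Unknown formats throw, so a typo in a trigger config fails loudly.
function parseDuration(duration: string): number {
  const match = /^(\d+)\s*(minute|hour|day)s?$/.exec(duration.trim());
  if (!match) {
    throw new Error(`Unsupported duration: ${duration}`);
  }
  const [, amount, unit] = match;
  return Number(amount) * UNIT_MS[unit];
}
```

Validating every trigger's durations at startup, not at send time, surfaces a malformed config before any customer enters the campaign.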

Purchase-Based Customer Segmentation

Events enable sophisticated customer segmentation:

class CustomerSegmentationEngine {
  async handleOrderDelivered(event: OrderDelivered): Promise<void> {
    // Calculate customer lifetime value from event history
    const purchaseEvents = await this.eventStore.getCustomerPurchases(
      event.customerId
    );

    const ltv = this.calculateLTV(purchaseEvents);
    const orderFrequency = this.calculateFrequency(purchaseEvents);
    const avgOrderValue = this.calculateAOV(purchaseEvents);

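    // High-value customer identification.
    // This fires on every delivery once the thresholds are met, so consumers
    // should treat a repeated CustomerSegmentAdded as a no-op.
    if (ltv > 1000 && orderFrequency > 5) {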
      await this.eventStore.appendEvent({
        eventId: crypto.randomUUID(),
        customerId: event.customerId,
        timestamp: new Date().toISOString(),
        eventType: 'CustomerSegmentAdded',
        segmentId: 'high-value-customers',
        segmentName: 'High Value Customers',
        criteria: { ltv, orderFrequency }
      });
    }

    // Product affinity tracking
    const productPreferences = this.analyzeProductAffinity(purchaseEvents);
    for (const [category, affinity] of Object.entries(productPreferences)) {
      if (affinity > 0.7) {
        await this.eventStore.appendEvent({
          eventId: crypto.randomUUID(),
          customerId: event.customerId,
          timestamp: new Date().toISOString(),
          eventType: 'ProductAffinityDetected',
          category,
          affinityScore: affinity,
          recommendedProducts: this.getRecommendations(category)
        });
      }
    }
  }

  // RFM (Recency, Frequency, Monetary) segmentation
  async calculateRFMSegments(customerId: string): Promise<RFMSegment> {
    const events = await this.eventStore.getCustomerPurchases(customerId);
    const now = Date.now();

    // Recency: Days since last purchase
    const lastPurchase = events.filter(e => e.eventType === 'OrderDelivered')
      .sort((a, b) => b.timestamp.localeCompare(a.timestamp))[0];

    const daysSinceLastPurchase = lastPurchase
      ? (now - new Date(lastPurchase.timestamp).getTime()) / (1000 * 60 * 60 * 24)
      : 999;

    // Frequency: Number of purchases
    const frequency = events.filter(e => e.eventType === 'OrderDelivered').length;

    // Monetary: Total spend
    const monetary = events
      .filter(e => e.eventType === 'PaymentSucceeded')
      .reduce((sum, e) => sum + e.amount, 0);

    // Score and segment
    const rfmScore = {
      recency: this.scoreRecency(daysSinceLastPurchase),
      frequency: this.scoreFrequency(frequency),
      monetary: this.scoreMonetary(monetary)
    };

    const segment = this.determineRFMSegment(rfmScore);

    return {
      customerId,
      recency: daysSinceLastPurchase,
      frequency,
      monetary,
      score: rfmScore,
      segment,
      calculatedAt: new Date().toISOString()
    };
  }

  private determineRFMSegment(score: RFMScore): string {
    // Champions: High value, frequent, recent
    if (score.recency >= 4 && score.frequency >= 4 && score.monetary >= 4) {
      return 'champions';
    }

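    // At Risk: Used to be good, but declining.
    // Checked before Loyal Customers so frequent buyers who have gone quiet
    // are not masked by the frequency-only rule below
    if (score.recency <= 2 && score.frequency >= 3 && score.monetary >= 3) {
      return 'at-risk';
    }

    // Loyal Customers: Frequent buyers
    if (score.frequency >= 4) {
      return 'loyal-customers';
    }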

    // New Customers: Recent but low frequency
    if (score.recency >= 4 && score.frequency <= 2) {
      return 'new-customers';
    }

    // Need Attention: Below average
    return 'needs-attention';
  }
}
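
The scoring helpers (`scoreRecency`, `scoreFrequency`, `scoreMonetary`) are not shown above. One simple way to implement them is fixed thresholds mapped to a 1 to 5 score, sketched below. Every threshold here is a made-up placeholder; real systems usually derive them from quintiles of their own customer base.

```typescript
type Thresholds = [number, number, number, number];

// Score 1-5 by counting how many ascending thresholds the value meets.
function scoreAscending(value: number, thresholds: Thresholds): number {
  return 1 + thresholds.filter(t => value >= t).length;
}

// Placeholder thresholds, chosen for illustration only.
const RECENCY_DAYS: Thresholds = [30, 90, 180, 365];
const ORDER_COUNTS: Thresholds = [2, 4, 8, 12];
const TOTAL_SPEND: Thresholds = [100, 250, 500, 1000];

// Recency is inverted: fewer days since the last purchase is better.
function scoreRecency(daysSinceLastPurchase: number): number {
  return 6 - scoreAscending(daysSinceLastPurchase, RECENCY_DAYS);
}

function scoreFrequency(orderCount: number): number {
  return scoreAscending(orderCount, ORDER_COUNTS);
}

function scoreMonetary(totalSpend: number): number {
  return scoreAscending(totalSpend, TOTAL_SPEND);
}
```

With these placeholders, the sentinel of 999 days used above for customers with no purchases scores 1 on recency, which keeps them out of the 'champions' and 'new-customers' segments.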

Customer Journey & Funnel Tracking

Tracking customer journeys across touchpoints reveals optimization opportunities and drives personalization.

Journey Event Definitions

Comprehensive journey tracking requires fine-grained events:

// Awareness stage events
interface PageViewed extends CustomerEvent {
  eventType: 'PageViewed';
  pageUrl: string;
  pageTitle: string;
  referrer: string;
  sessionId: string;
  timeOnPage: number;
}

interface ProductViewed extends CustomerEvent {
  eventType: 'ProductViewed';
  productId: string;
  productName: string;
  productCategory: string;
  price: number;
  viewSource: 'search' | 'category' | 'recommendation' | 'direct';
}

interface SearchPerformed extends CustomerEvent {
  eventType: 'SearchPerformed';
  query: string;
  resultsCount: number;
  selectedResult?: string;
  sessionId: string;
}

// Consideration stage events
interface ProductCompared extends CustomerEvent {
  eventType: 'ProductCompared';
  productIds: string[];
  comparisonAttributes: string[];
}

interface ReviewRead extends CustomerEvent {
  eventType: 'ReviewRead';
  productId: string;
  reviewId: string;
  rating: number;
}

interface VideoWatched extends CustomerEvent {
  eventType: 'VideoWatched';
  videoId: string;
  productId?: string;
  watchDuration: number;
  totalDuration: number;
  completionRate: number;
}

// Conversion stage events
interface CheckoutStarted extends CustomerEvent {
  eventType: 'CheckoutStarted';
  cartId: string;
  cartValue: number;
  itemCount: number;
}

interface CheckoutStepCompleted extends CustomerEvent {
  eventType: 'CheckoutStepCompleted';
  cartId: string;
  step: 'shipping' | 'payment' | 'review';
  stepNumber: number;
}

interface CheckoutAbandoned extends CustomerEvent {
  eventType: 'CheckoutAbandoned';
  cartId: string;
  lastCompletedStep: string;
  abandonedValue: number;
  timeInCheckout: number;
}

Building Funnels from Events

Funnel analysis reconstructed from event streams:

class FunnelAnalyzer {
  // Define funnel stages
  private readonly purchaseFunnel = [
    { stage: 'awareness', events: ['PageViewed', 'ProductViewed'] },
    { stage: 'consideration', events: ['ProductCompared', 'ReviewRead'] },
    { stage: 'intent', events: ['ItemAddedToCart', 'CartCreated'] },
    { stage: 'checkout', events: ['CheckoutStarted', 'CheckoutStepCompleted'] },
    { stage: 'purchase', events: ['OrderPlaced', 'PaymentSucceeded'] }
  ];

  async analyzeFunnel(
    customerId: string,
    startDate: string,
    endDate: string
  ): Promise<FunnelAnalysis> {
    const events = await this.eventStore.getCustomerEvents(
      customerId,
      startDate,
      endDate
    );

    const funnelProgress: FunnelStage[] = [];
    let currentStage = 0;

    for (const event of events) {
      const stage = this.getFunnelStage(event.eventType);

      if (stage !== null && stage >= currentStage) {
        funnelProgress.push({
          stage: this.purchaseFunnel[stage].stage,
          event: event.eventType,
          timestamp: event.timestamp,
          data: event
        });
        currentStage = Math.max(currentStage, stage + 1);
      }
    }

    // Identify drop-off points
    const dropOffPoint = this.identifyDropOff(funnelProgress);

    // Calculate time spent in each stage
    const stageMetrics = this.calculateStageMetrics(funnelProgress);

    return {
      customerId,
      stages: funnelProgress,
      dropOffPoint,
      metrics: stageMetrics,
      completed: currentStage === this.purchaseFunnel.length,
      conversionRate: currentStage / this.purchaseFunnel.length
    };
  }

  private identifyDropOff(progress: FunnelStage[]): DropOffAnalysis | null {
    if (progress.length === 0) {
      return null; // No funnel activity in the window
    }

    const lastStage = progress[progress.length - 1];
    const expectedNextStage = this.getNextStage(lastStage.stage);

    if (!expectedNextStage) {
      return null; // Completed funnel
    }

    const timeSinceLastStage =
      Date.now() - new Date(lastStage.timestamp).getTime();

    return {
      stage: lastStage.stage,
      nextExpectedStage: expectedNextStage,
      timeSinceLastActivity: timeSinceLastStage,
      likelihood: this.calculateDropOffLikelihood(timeSinceLastStage)
    };
  }

  private calculateStageMetrics(
    progress: FunnelStage[]
  ): Map<string, StageMetrics> {
    const metrics = new Map<string, StageMetrics>();

    for (let i = 0; i < progress.length - 1; i++) {
      const current = progress[i];
      const next = progress[i + 1];

      const timeInStage =
        new Date(next.timestamp).getTime() -
        new Date(current.timestamp).getTime();

      metrics.set(current.stage, {
        stage: current.stage,
        timeSpent: timeInStage,
        progressedToNext: true,
        events: progress.filter(p => p.stage === current.stage)
      });
    }

    return metrics;
  }
}
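
The analyzer above works per customer. To see where the funnel leaks overall, the per-customer results can be rolled up into stage-to-stage conversion rates. The sketch below assumes each customer is summarized by the furthest stage reached and that reaching a later stage counts as having passed the earlier ones; `stageConversion` is a hypothetical helper.

```typescript
const STAGES = ['awareness', 'consideration', 'intent', 'checkout', 'purchase'] as const;
type Stage = typeof STAGES[number];

interface StageConversion {
  stage: Stage;
  reached: number;          // customers who got at least this far
  rateFromPrevious: number; // share of the previous stage's customers
}

// `furthest` holds the furthest stage reached by each customer.
function stageConversion(furthest: Stage[]): StageConversion[] {
  const reachedAtLeast = (index: number): number =>
    furthest.filter(s => STAGES.indexOf(s) >= index).length;

  return STAGES.map((stage, i) => {
    const reached = reachedAtLeast(i);
    const previous = i === 0 ? furthest.length : reachedAtLeast(i - 1);
    return {
      stage,
      reached,
      rateFromPrevious: previous === 0 ? 0 : reached / previous
    };
  });
}
```

The stage with the lowest `rateFromPrevious` is the first place to look. Because the analyzer lets customers skip stages, a weak 'consideration' rate may only mean that buyers go straight to the cart.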

Multi-Touch Attribution

Understanding which touchpoints drive conversions:

interface TouchPoint {
  timestamp: string;
  channel: 'email' | 'sms' | 'push' | 'web' | 'social' | 'paid-ad';
  campaign?: string;
  eventType: string;
  value?: number;
}

class AttributionEngine {
  async calculateAttribution(
    customerId: string,
    conversionEvent: OrderPlaced
  ): Promise<AttributionModel> {
    // Get all touchpoints leading to conversion
    const touchpoints = await this.getCustomerTouchpoints(
      customerId,
      conversionEvent.timestamp
    );

    // Apply different attribution models
    return {
      firstTouch: this.firstTouchAttribution(touchpoints, conversionEvent),
      lastTouch: this.lastTouchAttribution(touchpoints, conversionEvent),
      linear: this.linearAttribution(touchpoints, conversionEvent),
      timeDecay: this.timeDecayAttribution(touchpoints, conversionEvent),
      positionBased: this.positionBasedAttribution(touchpoints, conversionEvent)
    };
  }

  private firstTouchAttribution(
    touchpoints: TouchPoint[],
    conversion: OrderPlaced
  ): Attribution {
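    if (touchpoints.length === 0) {
      // Direct conversion with no recorded touchpoints: nothing to credit
      return { model: 'first-touch', attribution: {} };
    }

    const first = touchpoints[0];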
    return {
      model: 'first-touch',
      attribution: {
        [first.channel]: {
          credit: 100,
          value: conversion.total,
          campaign: first.campaign
        }
      }
    };
  }

  private lastTouchAttribution(
    touchpoints: TouchPoint[],
    conversion: OrderPlaced
  ): Attribution {
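    if (touchpoints.length === 0) {
      // Direct conversion with no recorded touchpoints: nothing to credit
      return { model: 'last-touch', attribution: {} };
    }

    const last = touchpoints[touchpoints.length - 1];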
    return {
      model: 'last-touch',
      attribution: {
        [last.channel]: {
          credit: 100,
          value: conversion.total,
          campaign: last.campaign
        }
      }
    };
  }

  private linearAttribution(
    touchpoints: TouchPoint[],
    conversion: OrderPlaced
  ): Attribution {
    const creditPerTouch = 100 / touchpoints.length;
    const valuePerTouch = conversion.total / touchpoints.length;

    const attribution: Record<string, ChannelAttribution> = {};

    for (const touch of touchpoints) {
      if (!attribution[touch.channel]) {
        attribution[touch.channel] = {
          credit: 0,
          value: 0,
          touchCount: 0
        };
      }

      attribution[touch.channel].credit += creditPerTouch;
      attribution[touch.channel].value += valuePerTouch;
      attribution[touch.channel].touchCount += 1;
    }

    return {
      model: 'linear',
      attribution
    };
  }

  private timeDecayAttribution(
    touchpoints: TouchPoint[],
    conversion: OrderPlaced
  ): Attribution {
    const conversionTime = new Date(conversion.timestamp).getTime();
    const halfLife = 7 * 24 * 60 * 60 * 1000; // 7 days in milliseconds

    // Exponential decay: a touch loses half its weight for every halfLife of age
    const weights = touchpoints.map(touch => {
      const touchTime = new Date(touch.timestamp).getTime();
      const age = conversionTime - touchTime;
      return Math.pow(0.5, age / halfLife);
    });

    const totalWeight = weights.reduce((sum, w) => sum + w, 0);

    const attribution: Record<string, ChannelAttribution> = {};

    touchpoints.forEach((touch, i) => {
      const credit = (weights[i] / totalWeight) * 100;
      const value = (weights[i] / totalWeight) * conversion.total;

      if (!attribution[touch.channel]) {
        attribution[touch.channel] = { credit: 0, value: 0, touchCount: 0 };
      }

      attribution[touch.channel].credit += credit;
      attribution[touch.channel].value += value;
      attribution[touch.channel].touchCount += 1;
    });

    return {
      model: 'time-decay',
      attribution
    };
  }

  private positionBasedAttribution(
    touchpoints: TouchPoint[],
    conversion: OrderPlaced
  ): Attribution {
    // 40% to first touch, 40% to last touch, 20% distributed to middle
    const attribution: Record<string, ChannelAttribution> = {};

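    if (touchpoints.length === 1) {
      // A single touch gets full credit; keep this model's label on the result
      return {
        ...this.firstTouchAttribution(touchpoints, conversion),
        model: 'position-based'
      };
    }

    const first = touchpoints[0];
    const last = touchpoints[touchpoints.length - 1];
    const middle = touchpoints.slice(1, -1);

    // With no middle touches, first and last split the credit 50/50 so the
    // total still sums to 100%; otherwise each gets 40%
    const endShare = middle.length > 0 ? 0.4 : 0.5;

    // First touch
    attribution[first.channel] = {
      credit: endShare * 100,
      value: conversion.total * endShare,
      touchCount: 1
    };

    // Last touch (may share a channel with the first)
    if (!attribution[last.channel]) {
      attribution[last.channel] = { credit: 0, value: 0, touchCount: 0 };
    }
    attribution[last.channel].credit += endShare * 100;
    attribution[last.channel].value += conversion.total * endShare;
    attribution[last.channel].touchCount += 1;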

    // Middle touches: 20% distributed equally
    if (middle.length > 0) {
      const creditPerMiddle = 20 / middle.length;
      const valuePerMiddle = (conversion.total * 0.2) / middle.length;

      for (const touch of middle) {
        if (!attribution[touch.channel]) {
          attribution[touch.channel] = { credit: 0, value: 0, touchCount: 0 };
        }
        attribution[touch.channel].credit += creditPerMiddle;
        attribution[touch.channel].value += valuePerMiddle;
        attribution[touch.channel].touchCount += 1;
      }
    }

    return {
      model: 'position-based',
      attribution
    };
  }
}
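
To make the time-decay weighting concrete, here is a minimal standalone sketch. The `Touch` type and the `timeDecayShares` function are hypothetical names for illustration and are not part of the class above. It assumes that a touch exactly one half-life old should count half as much as a touch at conversion time.

```typescript
// Hypothetical, simplified touchpoint shape (illustration only)
interface Touch {
  channel: string;
  timestamp: string; // ISO 8601
}

const DAY_MS = 24 * 60 * 60 * 1000;

// Returns each channel's percentage share of credit under time decay.
// A touch that is `halfLifeDays` old gets half the weight of one at conversion.
function timeDecayShares(
  touches: Touch[],
  conversionTimestamp: string,
  halfLifeDays = 7
): Record<string, number> {
  const conversionTime = new Date(conversionTimestamp).getTime();
  const weights = touches.map(t => {
    const ageMs = Math.max(0, conversionTime - new Date(t.timestamp).getTime());
    return Math.pow(0.5, ageMs / (halfLifeDays * DAY_MS));
  });
  const total = weights.reduce((sum, w) => sum + w, 0);

  const shares: Record<string, number> = {};
  if (total === 0) return shares; // no touchpoints, nothing to attribute

  touches.forEach((t, i) => {
    shares[t.channel] = (shares[t.channel] ?? 0) + (weights[i] / total) * 100;
  });
  return shares;
}

// Touches 14 days, 7 days, and 0 days before conversion get weights 0.25, 0.5, 1
const exampleShares = timeDecayShares(
  [
    { channel: 'email', timestamp: '2025-01-01T00:00:00Z' },
    { channel: 'search', timestamp: '2025-01-08T00:00:00Z' },
    { channel: 'social', timestamp: '2025-01-15T00:00:00Z' }
  ],
  '2025-01-15T00:00:00Z'
);
console.log(exampleShares); // roughly email 14.3, search 28.6, social 57.1
```

The weights sum to 1.75, so the most recent touch ends up with a little over half of the credit. A shorter half-life pushes the result toward last-touch attribution, and a longer one pushes it toward linear.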

Real-Time Funnel Progression Campaigns

Trigger campaigns based on funnel position:

class FunnelProgressionAutomation {
  private funnelCampaigns: FunnelCampaign[] = [
    {
      name: 'Abandoned Browse Recovery',
      trigger: {
        stage: 'awareness',
        inactivityMinutes: 30,
        condition: 'viewed-multiple-products-no-cart'
      },
      actions: [
        {
          type: 'send-email',
          config: {
            templateId: 'browse-abandonment',
            personalization: ['viewedProducts', 'recommendations']
          }
        }
      ]
    },
    {
      name: 'Checkout Abandonment',
      trigger: {
        stage: 'checkout',
        inactivityMinutes: 60,
        condition: 'started-checkout-not-completed'
      },
      actions: [
        {
          type: 'wait',
          config: { duration: '1hour' }
        },
        {
          type: 'send-email',
          config: {
            templateId: 'checkout-abandonment',
            personalization: ['cartItems', 'checkoutLink', 'incentive']
          }
        },
        {
          type: 'wait',
          config: { duration: '24hours' }
        },
        {
          type: 'send-sms',
          config: {
            message: 'Complete your order and get 10% off!'
          }
        }
      ]
    },
    {
      name: 'Post-Purchase Cross-Sell',
      trigger: {
        stage: 'purchase',
        condition: 'order-delivered'
      },
      actions: [
        {
          type: 'wait',
          config: { duration: '7days' }
        },
        {
          type: 'send-email',
          config: {
            templateId: 'cross-sell',
            personalization: ['purchasedProducts', 'recommendations']
          }
        }
      ]
    }
  ];

  async monitorFunnelProgress(): Promise<void> {
    // Run periodically to check for customers stuck in funnel stages
    const stuckCustomers = await this.findStuckCustomers();

    for (const customer of stuckCustomers) {
      const analysis = await this.funnelAnalyzer.analyzeFunnel(
        customer.customerId,
        customer.sessionStart,
        new Date().toISOString()
      );

      const matchingCampaigns = this.funnelCampaigns.filter(campaign =>
        this.shouldTriggerCampaign(campaign, analysis)
      );

      for (const campaign of matchingCampaigns) {
        await this.executeFunnelCampaign(customer.customerId, campaign, analysis);
      }
    }
  }

  private shouldTriggerCampaign(
    campaign: FunnelCampaign,
    analysis: FunnelAnalysis
  ): boolean {
    if (!analysis.dropOffPoint) return false;

    const dropOff = analysis.dropOffPoint;
    const inactivityMinutes = dropOff.timeSinceLastActivity / (1000 * 60);

    return (
      dropOff.stage === campaign.trigger.stage &&
      inactivityMinutes >= campaign.trigger.inactivityMinutes &&
      this.checkCondition(campaign.trigger.condition, analysis)
    );
  }
}
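
The `wait` actions above carry durations as strings such as `'1hour'` and `'7days'`, which the executor has to turn into a delay. One way to do that is sketched below. The `parseDuration` helper and the set of accepted units are assumptions for illustration; the campaign definitions only show hour and day values.

```typescript
// Milliseconds per supported unit (assumed set: minute, hour, day)
const UNIT_MS: Record<string, number> = {
  minute: 60_000,
  hour: 3_600_000,
  day: 86_400_000
};

// Hypothetical helper: converts '1hour', '24hours', '7days' into milliseconds.
// Throws on anything it does not recognize rather than guessing a delay.
function parseDuration(duration: string): number {
  const match = /^(\d+)\s*(minute|hour|day)s?$/.exec(duration.trim());
  if (!match) {
    throw new Error(`Unrecognized duration: ${duration}`);
  }
  return Number(match[1]) * UNIT_MS[match[2]];
}

console.log(parseDuration('1hour'));   // 3600000
console.log(parseDuration('24hours')); // 86400000
console.log(parseDuration('7days'));   // 604800000
```

Rejecting unknown strings at campaign load time is cheaper than discovering a typo when a customer's follow-up email never fires.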

Journey tracking and funnel analysis of this kind ground marketing decisions in observed behavior. Because customer paths are reconstructed from events, you can see the stage at which customers drop off and target campaigns at that stage.

Integration Patterns

Connecting with Third-Party Marketing Tools

Most marketing teams use specialized tools like SendGrid, Mailchimp, or HubSpot. Here’s how to integrate while maintaining event-driven benefits:

interface MarketingIntegration {
  syncCustomer(customerId: string, data: CustomerData): Promise<void>;
  syncConsent(customerId: string, consent: ConsentData): Promise<void>;
  syncSegment(customerId: string, segments: string[]): Promise<void>;
}

class SendGridIntegration implements MarketingIntegration {
  constructor(
    private apiKey: string,
    private eventStore: EventStore
  ) {}

  async handleCustomerEvent(event: CustomerEvent): Promise<void> {
    switch (event.eventType) {
      case 'CustomerCreated':
        await this.syncCustomer(event.customerId, {
          email: event.email,
          created_at: event.timestamp
        });
        break;

      case 'ConsentGranted':
        if (event.purpose === 'marketing' && event.channel === 'email') {
          await this.syncConsent(event.customerId, {
            status: 'subscribed',
            timestamp: event.timestamp
          });
        }
        break;

      case 'ConsentRevoked':
        if (event.purpose === 'marketing' && event.channel === 'email') {
          await this.syncConsent(event.customerId, {
            status: 'unsubscribed',
            timestamp: event.timestamp
          });
        }
        break;

      case 'CustomerSegmentAdded':
        await this.addToList(event.customerId, event.segmentId);
        break;
    }

    // Record integration event for debugging
    await this.eventStore.appendEvent({
      eventId: crypto.randomUUID(),
      customerId: event.customerId,
      timestamp: new Date().toISOString(),
      eventType: 'ThirdPartyIntegrationSynced',
      integration: 'sendgrid',
      action: event.eventType
    });
  }

  async syncCustomer(customerId: string, data: CustomerData): Promise<void> {
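    // Upsert the contact. `sendGridAPI` is an HTTP client wrapper (not shown)
    // that is expected to handle authentication and retries. SendGrid's
    // add-or-update contacts endpoint is a PUT.
    await this.sendGridAPI.put('/marketing/contacts', {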
      contacts: [{
        email: data.email,
        custom_fields: {
          customer_id: customerId,
          created_at: data.created_at
        }
      }]
    });
  }

  // Implement other methods...
}
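
The integration above leaves retry behavior to the API client. A minimal sketch of what that could look like follows, using exponential backoff with jitter. The `withRetry` helper, its options, and the error `code` values are assumptions for illustration and do not come from any SendGrid SDK.

```typescript
interface RetryOptions {
  maxAttempts: number;
  baseDelayMs: number;
  isRetryable: (error: unknown) => boolean;
}

// Runs `operation`, retrying retryable failures with exponential backoff.
// Uses full jitter: each delay is random in [0, baseDelayMs * 2^(attempt - 1)).
async function withRetry<T>(
  operation: () => Promise<T>,
  options: RetryOptions
): Promise<T> {
  let attempt = 0;
  for (;;) {
    attempt += 1;
    try {
      return await operation();
    } catch (error) {
      const outOfAttempts = attempt >= options.maxAttempts;
      if (outOfAttempts || !options.isRetryable(error)) {
        throw error; // give up and surface the original error
      }
      const ceilingMs = options.baseDelayMs * 2 ** (attempt - 1);
      const delayMs = Math.random() * ceilingMs;
      await new Promise<void>(resolve => setTimeout(resolve, delayMs));
    }
  }
}

// Hypothetical classifier: treat rate limits and timeouts as retryable
function hasRetryableCode(error: unknown): boolean {
  const code = (error as { code?: string } | null)?.code;
  return code === 'RATE_LIMIT' || code === 'TIMEOUT';
}
```

A call site would wrap the provider request, for example `withRetry(() => client.put(...), { maxAttempts: 4, baseDelayMs: 200, isRetryable: hasRetryableCode })`. The jitter matters when many handlers hit the same rate limit at once, since it spreads their retries out instead of synchronizing them.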

Dead Letter Queues for Failed Communications

Not every message is delivered successfully. Event-driven architecture makes failure handling explicit:

class ChannelHandler {
  constructor(
    private provider: EmailProvider,
    private eventStore: EventStore,
    private dlqHandler: DeadLetterQueueHandler
  ) {}

  async sendMessage(
    customerId: string,
    message: OutboundMessage
  ): Promise<void> {
    try {
      const result = await this.provider.send(message);

      // Record success
      await this.eventStore.appendEvent({
        eventId: crypto.randomUUID(),
        customerId,
        timestamp: new Date().toISOString(),
        eventType: 'MessageSent',
        channel: 'email',
        messageId: result.messageId,
        campaignId: message.campaignId
      });

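    } catch (err) {
      // Catch variables are `unknown` under strict TypeScript; narrow before use
      const error = err as Error & { code?: string };

      // Check if error is retryable
      if (this.isRetryable(error)) {
        throw error; // Let event bus retry
      }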

      // Non-retryable error - send to DLQ
      await this.dlqHandler.handleFailedMessage({
        customerId,
        message,
        error: error.message,
        timestamp: new Date().toISOString()
      });

      // Record failure event
      await this.eventStore.appendEvent({
        eventId: crypto.randomUUID(),
        customerId,
        timestamp: new Date().toISOString(),
        eventType: 'MessageFailed',
        channel: 'email',
        campaignId: message.campaignId,
        errorType: error.code,
        errorMessage: error.message
      });
    }
  }

  private isRetryable(error: any): boolean {
    // Provider rate limits, network issues - retry
    const retryableCodes = ['RATE_LIMIT', 'TIMEOUT', 'SERVICE_UNAVAILABLE'];
    return retryableCodes.includes(error.code);
  }
}

class DeadLetterQueueHandler {
  async handleFailedMessage(failure: FailedMessage): Promise<void> {
    // Store in DLQ for manual review
    await this.dlqStore.save(failure);

    // Alert on-call if high failure rate
    const recentFailures = await this.getRecentFailures('1hour');
    if (recentFailures.length > 100) {
      await this.alerting.trigger({
        severity: 'high',
        message: `High email failure rate: ${recentFailures.length} in last hour`,
        failures: recentFailures.slice(0, 10)
      });
    }

    // For specific errors, take automated action
    if (failure.error.includes('invalid-email')) {
      // Mark email as invalid in customer record
      await this.eventStore.appendEvent({
        eventId: crypto.randomUUID(),
        customerId: failure.customerId,
        timestamp: new Date().toISOString(),
        eventType: 'CustomerEmailInvalidated',
        reason: 'bounced-permanent'
      });
    }
  }
}

Scaling Considerations and Trade-offs

Performance: Real-Time vs Batch

I’ve seen teams struggle with the decision: should projections update in real-time or in batches?

Real-time processing:

  • Pros: Customer sees changes immediately, marketing campaigns react faster
  • Cons: Higher costs, more complex infrastructure, potential thundering herd
  • Best for: Consent updates, transactional notifications

Batch processing:

  • Pros: Better throughput, easier to optimize, cheaper
  • Cons: Eventual consistency delay, stale data in queries
  • Best for: Analytics projections, segment calculations, daily email campaigns

Here’s a hybrid approach that worked well:

class ProjectionOrchestrator {
  // Critical projections update immediately
  private realTimeProjections = new Set(['consent', 'preferences']);

  // Analytics projections batch every 5 minutes
  private batchProjections = new Set(['customer-360', 'segments']);

  async handleEvent(event: CustomerEvent): Promise<void> {
    const projectionType = this.classifyProjection(event.eventType);

    if (this.realTimeProjections.has(projectionType)) {
      // Process immediately
      await this.updateProjection(projectionType, event);
    } else {
      // Add to batch queue
      await this.batchQueue.enqueue(projectionType, event);
    }
  }

  private classifyProjection(eventType: string): string {
    // Map events to projection types
    const mapping: Record<string, string> = {
      'ConsentGranted': 'consent',
      'ConsentRevoked': 'consent',
      'PreferencesUpdated': 'preferences',
      'CustomerSegmentAdded': 'segments',
      'ProductViewed': 'customer-360'
    };

    return mapping[eventType] || 'customer-360';
  }
}

Cost Optimization

Event-driven CRM can get expensive fast if you’re not careful. Here’s what I’ve learned:

Event storage costs grow with every event you retain. Put TTLs on detailed events and keep only aggregated summaries for the long term:

// Keep detailed events for 90 days, then aggregate
const eventRetentionPolicy = {
  detailed: 90 * 86400, // 90 days in seconds
  aggregated: 7 * 365 * 86400 // 7 years for compliance
};

class EventArchiver {
  async archiveOldEvents(): Promise<void> {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - 90);

    // Aggregate old events into summary records
    const customersToArchive = await this.getCustomersWithOldEvents(cutoffDate);

    for (const customerId of customersToArchive) {
      const events = await this.eventStore.getCustomerEvents(
        customerId,
        undefined,
        cutoffDate.toISOString()
      );

      // Create aggregated summary
      const summary = this.aggregateEvents(events);
      await this.summaryStore.save(customerId, summary);

      // Delete detailed events (they have TTL, but cleanup ensures it)
      await this.eventStore.deleteEvents(
        customerId,
        cutoffDate.toISOString()
      );
    }
  }

  private aggregateEvents(events: CustomerEvent[]): EventSummary {
    return {
      totalEvents: events.length,
      eventTypes: this.countByType(events),
      firstEvent: events[0]?.timestamp,
      lastEvent: events[events.length - 1]?.timestamp,
      consentHistory: this.summarizeConsents(events),
      // Keep legally required data
      gdprAuditTrail: this.buildAuditTrail(events)
    };
  }
}

Lambda costs for event processors - batch when possible:

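// Instead of processing each event individually, process in micro-batches.
// This in-memory buffer suits a long-running consumer. Inside a Lambda function,
// timers pause between invocations, so rely on the event source mapping's
// batch size and batching window there instead.
class BatchedEventProcessor {
  private buffer: CustomerEvent[] = [];
  private flushInterval = 5000; // 5 seconds
  private maxBatchSize = 100;

  constructor() {
    // Catch flush errors so a failed batch doesn't become an unhandled rejection
    setInterval(() => {
      this.flush().catch(err => console.error('Batch flush failed', err));
    }, this.flushInterval);
  }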

  async addEvent(event: CustomerEvent): Promise<void> {
    this.buffer.push(event);

    if (this.buffer.length >= this.maxBatchSize) {
      await this.flush();
    }
  }

  private async flush(): Promise<void> {
    if (this.buffer.length === 0) return;

    const batch = this.buffer.splice(0, this.maxBatchSize);

    // Process entire batch in one Lambda invocation
    await this.projectionBuilder.processBatch(batch);
  }
}

Schema Evolution Strategy

Your event schemas will change. Plan for it:

interface EventSchema {
  version: number;
  schema: any;
  compatibleWith?: number[];
}

class SchemaRegistry {
  private schemas = new Map<string, EventSchema[]>();

  registerSchema(eventType: string, schema: EventSchema): void {
    const existing = this.schemas.get(eventType) || [];
    existing.push(schema);
    this.schemas.set(eventType, existing);
  }

  getLatestSchema(eventType: string): EventSchema | undefined {
    const schemas = this.schemas.get(eventType);
    return schemas?.[schemas.length - 1];
  }

  // Validate event against schema before storing
  async validateEvent(event: CustomerEvent): Promise<boolean> {
    const schema = this.getLatestSchema(event.eventType);
    if (!schema) {
      console.warn(`No schema registered for ${event.eventType}`);
      return false;
    }

    // Use JSON Schema or similar for validation
    return this.validator.validate(event, schema.schema);
  }
}
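
Validation covers new writes, but old events stay in the store in their original shape. One common way to handle that is upcasting: converting stored events to the latest version at read time. The sketch below is a minimal version of that idea. The `schemaVersion` field, the upcaster registry, and the example change to `ConsentGranted` are all assumptions for illustration and are not part of the registry above.

```typescript
interface VersionedEvent {
  eventType: string;
  schemaVersion: number;
  [field: string]: unknown;
}

type Upcaster = (event: VersionedEvent) => VersionedEvent;

// Keyed by `${eventType}:v${fromVersion}`; each upcaster returns the next version
const upcasters = new Map<string, Upcaster>();

// Hypothetical change: v1 stored a boolean `marketing`, v2 stores a `purpose` string
upcasters.set('ConsentGranted:v1', event => {
  const { marketing, ...rest } = event;
  return {
    ...rest,
    schemaVersion: 2,
    purpose: marketing === true ? 'marketing' : 'service'
  };
});

// Applies upcasters one version at a time until the event reaches targetVersion
function upcast(event: VersionedEvent, targetVersion: number): VersionedEvent {
  let current = event;
  while (current.schemaVersion < targetVersion) {
    const key = `${current.eventType}:v${current.schemaVersion}`;
    const step = upcasters.get(key);
    if (!step) {
      throw new Error(`No upcaster registered for ${key}`);
    }
    const next = step(current);
    if (next.schemaVersion <= current.schemaVersion) {
      throw new Error(`Upcaster ${key} did not advance the schema version`);
    }
    current = next;
  }
  return current;
}

const upgraded = upcast(
  { eventType: 'ConsentGranted', schemaVersion: 1, marketing: true },
  2
);
console.log(upgraded.purpose); // marketing
```

Projections then only ever see the latest shape, and the stored events stay immutable. The cost is that every schema change needs an upcaster that lives for as long as the old events do.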

Lessons Learned and Gotchas

After implementing several event-driven CRM systems, here are the patterns that consistently matter:

1. Idempotency is Non-Negotiable

Every external action (email send, API call, database write) must be idempotent. Events will be replayed, processors will retry, and you’ll send duplicate emails if you don’t handle this.

The pattern I use: store idempotency keys with every action and check before executing.
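
A minimal sketch of that check is below. The store is in memory purely for illustration; a production version needs a durable store with an atomic conditional write. The `InMemoryIdempotencyStore` and `runOnce` names, and the key format, are assumptions.

```typescript
// Illustration only: replace with a durable store that supports atomic claims
class InMemoryIdempotencyStore {
  private claimed = new Set<string>();

  // Returns true if the key was newly claimed, false if it was already taken
  claim(key: string): boolean {
    if (this.claimed.has(key)) return false;
    this.claimed.add(key);
    return true;
  }

  release(key: string): void {
    this.claimed.delete(key);
  }
}

// Runs `action` at most once per key. The claim is released if the action
// fails, so a later retry of the same event can try again.
async function runOnce(
  store: InMemoryIdempotencyStore,
  key: string,
  action: () => Promise<void>
): Promise<'executed' | 'skipped'> {
  if (!store.claim(key)) return 'skipped';
  try {
    await action();
    return 'executed';
  } catch (error) {
    store.release(key);
    throw error;
  }
}

// A key built from the triggering event and the action stays stable across replays
const exampleKey = `${'evt-123'}:send-welcome-email`;
console.log(exampleKey); // evt-123:send-welcome-email
```

Deriving the key from the event ID plus the action name means a replayed event maps to the same key and is skipped. Claiming before executing trades a possible lost action (if the process dies mid-send) for no duplicates, which is usually the right trade for marketing email.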

2. Consent Checking Must Be Fast

If checking consent adds 200ms to every message send, you’ll have a bottleneck. Cache consent status aggressively, with a TTL of 5-10 minutes. For marketing emails, this delay is acceptable. For transactional emails, you might need a shorter TTL or real-time checks.
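
A TTL cache for consent status can be sketched as follows. The `ConsentCache` class, its loader callback, and the injectable clock are assumptions for illustration. The sketch also exposes an `invalidate` method, on the assumption that the consent event handler can call it so a revocation does not have to wait for the TTL to expire.

```typescript
type ConsentStatus = 'granted' | 'revoked';

interface CacheEntry {
  status: ConsentStatus;
  expiresAt: number; // epoch milliseconds
}

class ConsentCache {
  private entries = new Map<string, CacheEntry>();
  private ttlMs: number;
  private load: (customerId: string) => Promise<ConsentStatus>;
  private now: () => number;

  constructor(
    ttlMs: number,
    load: (customerId: string) => Promise<ConsentStatus>,
    now: () => number = () => Date.now() // injectable clock for testing
  ) {
    this.ttlMs = ttlMs;
    this.load = load;
    this.now = now;
  }

  // Returns the cached status while fresh, otherwise reloads from the projection
  async get(customerId: string): Promise<ConsentStatus> {
    const hit = this.entries.get(customerId);
    if (hit && hit.expiresAt > this.now()) {
      return hit.status;
    }
    const status = await this.load(customerId);
    this.entries.set(customerId, { status, expiresAt: this.now() + this.ttlMs });
    return status;
  }

  // Drop the entry when a consent event arrives for this customer
  invalidate(customerId: string): void {
    this.entries.delete(customerId);
  }
}
```

With a 5 minute TTL and no invalidation, a revocation can take up to 5 minutes to stop sends from a warm cache. Whether that window is acceptable is a compliance decision as much as a performance one.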

3. Event Ordering Matters Less Than You Think

Most teams worry about event ordering, but for CRM it’s rarely critical. If a customer updates preferences twice in quick succession, the final state is what matters. Use timestamps and version numbers to handle conflicts:

class ConflictResolution {
  mergePreferences(existing: Preferences, incoming: Preferences): Preferences {
    // Last-write-wins based on timestamp
    return {
      emailFrequency:
        incoming.updatedAt > existing.emailFrequency.updatedAt
          ? incoming.emailFrequency
          : existing.emailFrequency,
      categories:
        incoming.updatedAt > existing.categories.updatedAt
          ? incoming.categories
          : existing.categories
    };
  }
}

4. Start Simple, Add Complexity When Needed

I’ve seen teams build complex saga orchestrators for simple “send email after signup” flows. Start with basic event handlers. Add saga patterns only when you have multi-step workflows with compensation logic.

5. Monitoring is Different

Traditional CRM monitoring checks “is the database up?” Event-driven monitoring checks:

  • Event processing lag (how far behind are projections?)
  • Dead letter queue depth (how many failures?)
  • Projection consistency (does the aggregate match event replay?)

class CRMHealthCheck {
  async checkHealth(): Promise<HealthStatus> {
    const checks = await Promise.all([
      this.checkEventProcessingLag(),
      this.checkDLQDepth(),
      this.checkProjectionConsistency()
    ]);

    return {
      status: checks.every(c => c.healthy) ? 'healthy' : 'degraded',
      checks
    };
  }

  private async checkEventProcessingLag(): Promise<HealthCheck> {
    const latestEvent = await this.eventStore.getLatestEvent();
    const latestProjection = await this.projectionStore.getLatestUpdate();

    const lagMs = new Date(latestEvent.timestamp).getTime() -
                  new Date(latestProjection.timestamp).getTime();

    return {
      name: 'event-processing-lag',
      healthy: lagMs < 60000, // Less than 1 minute
      value: lagMs,
      message: `Projection lag: ${lagMs}ms`
    };
  }
}
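
The projection consistency check is the least obvious of the three, so here is a minimal sketch of one way to do it for consent. It replays a customer's consent events and compares the result with the stored projection. The `ConsentEvent` shape and both function names are assumptions for illustration, and timestamps are assumed to share one ISO 8601 format so they sort as strings.

```typescript
interface ConsentEvent {
  eventType: 'ConsentGranted' | 'ConsentRevoked';
  channel: string;
  timestamp: string; // ISO 8601, single consistent format assumed
}

// Rebuild per-channel consent by replaying events in timestamp order
function replayConsent(events: ConsentEvent[]): Record<string, boolean> {
  const ordered = [...events].sort((a, b) =>
    a.timestamp < b.timestamp ? -1 : a.timestamp > b.timestamp ? 1 : 0
  );
  const state: Record<string, boolean> = {};
  for (const event of ordered) {
    state[event.channel] = event.eventType === 'ConsentGranted';
  }
  return state;
}

// Returns the channels where the stored projection disagrees with the replay
function findConsentDrift(
  projection: Record<string, boolean>,
  events: ConsentEvent[]
): string[] {
  const replayed = replayConsent(events);
  const channels = new Set<string>([
    ...Object.keys(projection),
    ...Object.keys(replayed)
  ]);
  return Array.from(channels)
    .filter(channel => projection[channel] !== replayed[channel])
    .sort();
}

const drift = findConsentDrift(
  { email: true, sms: true },
  [
    { eventType: 'ConsentGranted', channel: 'email', timestamp: '2025-01-01T00:00:00Z' },
    { eventType: 'ConsentGranted', channel: 'sms', timestamp: '2025-01-02T00:00:00Z' },
    { eventType: 'ConsentRevoked', channel: 'sms', timestamp: '2025-01-03T00:00:00Z' }
  ]
);
console.log(drift); // [ 'sms' ]
```

Replaying every customer on every health check is too expensive, so in practice this would run against a small random sample per interval and report the drift rate.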

Closing Thoughts

Event-driven CRM architecture solves real problems: GDPR compliance, multi-channel orchestration, and real-time personalization. But it introduces new complexity: eventual consistency, event schema evolution, and more moving parts.

The pattern works best when you need:

  • Complete audit trails for compliance
  • Complex, multi-step marketing automation
  • Integration with many external systems
  • Scalability beyond single-database limits

It’s overkill when you have:

  • Simple email list management
  • Small customer base (< 100k)
  • Primarily transactional communications

Start with event sourcing for consent and preferences - this gives you GDPR compliance benefits without full commitment. Add CQRS when your read patterns diverge significantly from write patterns. Build marketing automation on events when you need sophisticated workflows.

The architecture enables powerful CRM capabilities, but like any pattern, it’s a tool for specific problems. Use it where it adds value, not because it’s interesting.
