İçeriğe atla

2025-12-04

AWS Step Functions Derinlemesine: Dayanıklı Workflow Orchestration Geliştirme

Production-ready serverless workflow'lar için AWS Step Functions'ı öğren. Standard vs Express workflow'lar, Distributed Map processing, error handling pattern'leri, callback entegrasyonu ve CDK örnekleriyle maliyet optimizasyonu stratejilerini keşfet.

Özet

AWS Step Functions, serverless uygulamalar için güçlü workflow orchestration sağlıyor ama Standard vs Express workflow seçimi, doğru error handling implementasyonu ve maliyet optimizasyonu pratik deneyim gerektiriyor. Bu rehber, production-ready pattern’leri kapsıyor: büyük ölçekli processing için Distributed Map, Task Token’larla callback pattern’leri, direct service integration’lar ve masrafları %90’ın üzerinde azaltabilen maliyet optimizasyon stratejileri. Çalışan CDK kod örnekleri Amazon States Language (ASL) pattern’lerini, exponential backoff ile error handling’i ve production ortamlar için monitoring kurulumunu gösteriyor.

Orchestration Zorluğu

Sadece Lambda ile karmaşık serverless workflow’lar geliştirmek bakım sorunları yaratıyor. Lambda fonksiyonlarının içinde orchestration logic’in yaşadığı sistemlerle çalıştım - yüzlerce satır retry, error handling, state tracking ve conditional branching yönetimi. Production sorunlarını debug etmek, execution flow’ları yeniden oluşturmak için CloudWatch log’larını parse etmek anlamına geliyordu. Yeni adımlar eklemek kod değişikliği ve yeniden deployment gerektiriyordu.

Asıl karmaşıklık şu durumlarda ortaya çıkıyor:

  • Multi-step process’ler: Validation, payment, inventory update ve shipping koordinasyonu içeren sipariş işleme
  • Error recovery: Uygulama kodunda exponential backoff, circuit breaker’lar ve compensating transaction’ları implementasyon
  • State management: Lambda invocation’lar arasında DynamoDB veya Redis kullanarak workflow state’i track etmek latency ve maliyet ekliyor
  • Parallel processing: Concurrent task’ları koordine ederken failure’ları yönetmek ve sonuçları aggregate etmek
  • Human approval’lar: Uzun süren approval process’leri için callback mekanizmaları implementasyonu
  • Scale: Milyonlarca item’ı process etmek doğru orchestration olmadan Lambda timeout limitine takılıyor

Step Functions bu zorlukları visual workflow’lar, built-in error handling ve AWS service integration’larla çözüyor. Ama Standard ve Express workflow arasında seçim yapmak, pricing etkilerini anlamak ve production pattern’lerini implementasyon birden fazla kaynağa yayılmış dokümantasyonda gezinmeyi gerektiriyor.

Workflow Type’larını Anlamak

Step Functions için temel karar Standard ve Express workflow arasında seçim yapmak. Bu seçim maliyet, execution model ve görünürlüğü etkiliyor.

Standard Workflow’lar

Standard workflow’lar tam execution history ile exactly-once execution garantisi sağlıyor:

import * as cdk from 'aws-cdk-lib';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';

const processOrder = new tasks.LambdaInvoke(this, 'ProcessOrder', {
  lambdaFunction: orderFunction,
  outputPath: '$.Payload'
});

const validateInventory = new tasks.LambdaInvoke(this, 'ValidateInventory', {
  lambdaFunction: inventoryFunction,
  outputPath: '$.Payload'
});

const chargePayment = new tasks.LambdaInvoke(this, 'ChargePayment', {
  lambdaFunction: paymentFunction,
  outputPath: '$.Payload'
});

// Sipariş işleme için Standard workflow
const standardWorkflow = new sfn.StateMachine(this, 'OrderProcessing', {
  stateMachineType: sfn.StateMachineType.STANDARD,
  definition: processOrder
    .next(validateInventory)
    .next(chargePayment),
  timeout: cdk.Duration.days(7)
});

Standard workflow özellikleri:

  • Maksimum süre: 1 yıl (multi-day approval process’leri için kullanışlı)
  • Exactly-once execution garantisi
  • 90 gün boyunca tam execution history saklanıyor
  • Fiyatlandırma: 1.000 state transition başına $0.025
  • Step Functions console’da tam görünürlük
  • .sync ve .waitForTaskToken integration pattern’lerini destekliyor

Express Workflow’lar

Express workflow’lar high-throughput, kısa süreli processing için optimize edilmiş:

const receiveIoTEvent = new tasks.LambdaInvoke(this, 'ReceiveEvent', {
  lambdaFunction: receiveFunction,
  outputPath: '$.Payload'
});

const validateData = new tasks.LambdaInvoke(this, 'ValidateData', {
  lambdaFunction: validateFunction,
  outputPath: '$.Payload'
});

const storeData = new tasks.DynamoPutItem(this, 'StoreData', {
  table: dataTable,
  item: {
    id: tasks.DynamoAttributeValue.fromString(
      sfn.JsonPath.stringAt('$.eventId')
    ),
    timestamp: tasks.DynamoAttributeValue.fromNumber(
      sfn.JsonPath.numberAt('$.timestamp')
    )
  }
});

// IoT processing için Express workflow
const expressWorkflow = new sfn.StateMachine(this, 'IoTProcessing', {
  stateMachineType: sfn.StateMachineType.EXPRESS,
  definition: receiveIoTEvent
    .next(validateData)
    .next(storeData),
  timeout: cdk.Duration.minutes(5)
});

Express workflow özellikleri:

  • Maksimum süre: 5 dakika
  • At-least-once execution (birden fazla kez çalışabilir)
  • Sınırlı execution history (sadece CloudWatch Logs)
  • Fiyatlandırma: 1M request başına 1.00+GBsecondbas\cına1.00 + GB-second başına 0.00001667
  • Throughput: Saniyede 100.000+ execution
  • İki mod: Synchronous (sonucu bekle) ve Asynchronous (fire-and-forget)

Maliyet Karşılaştırması

Fiyatlandırma farkı scale’de belirgin hale geliyor:

// Senaryo: Ayda 10 milyon execution
// Her workflow: 10 state transition
// Ortalama süre: 2 saniye
// Memory: 512MB

// Standard Workflow:
// Toplam transition'lar: 10M * 10 = 100M
// Maliyet = (100.000.000 / 1.000) * $0.025 = $2.500/ay

// Express Workflow:
// Request maliyeti: (10.000.000 / 1.000.000) * $1.00 = $10
// GB-saniye: (512/1024) * 2 * 10.000.000 = 10.000.000
// Süre maliyeti: 10.000.000 * $0.00001667 = $167
// Toplam: $177/ay

// Tasarruf: $2.323/ay (%93 azalma)

Yüksek hacimli, kısa süreli workflow’lar için Express workflow’lar önemli maliyet tasarrufu sağlıyor. Standard workflow’lar uzun süren process’ler, exactly-once gereksinimi veya audit trail ihtiyacı için mantıklı.

Amazon States Language Pattern’leri

Step Functions workflow’ları JSON tabanlı bir spesifikasyon olan Amazon States Language (ASL) kullanılarak tanımlanıyor. Data flow kontrolünü anlamak maintainable workflow’lar oluşturmak için esaslı.

Data Flow Kontrolü

ASL, workflow state’leri arasında data akışını kontrol etmek için birkaç mekanizma sağlıyor:

// Conditional routing ile Choice state
const decisionTree = new sfn.Choice(this, 'RouteByWorkload', {
  stateName: 'Workload Router'
})
  .when(
    sfn.Condition.numberGreaterThan('$.itemCount', 1000),
    largeBatchProcessing
  )
  .when(
    sfn.Condition.numberGreaterThan('$.itemCount', 100),
    mediumBatchProcessing
  )
  .otherwise(smallBatchProcessing);

// Parallel processing için Map state
const processItems = new sfn.Map(this, 'ProcessEachItem', {
  maxConcurrency: 10,
  itemsPath: '$.items',
  parameters: {
    'item.$': '$$.Map.Item.Value',
    'index.$': '$$.Map.Item.Index',
    'executionId.$': '$$.Execution.Id'
  }
}).itemProcessor(
  new sfn.Pass(this, 'ProcessItem')
);

// Simultaneous execution için Parallel state
const parallelProcessing = new sfn.Parallel(this, 'FanOut', {
  resultPath: '$.parallelResults'
})
  .branch(processPayment)
  .branch(updateInventory)
  .branch(sendNotification);

// Dynamic timestamp ile Wait state
const waitForSchedule = new sfn.Wait(this, 'WaitUntilScheduled', {
  time: sfn.WaitTime.timestampPath('$.scheduledTime')
});

ResultPath ve Data Transformation

ResultPath parametresi task output’unun state output’unda nereye yerleştirileceğini kontrol ediyor:

// Output'u filter etmek için ResultSelector ile task
const transformedTask = new tasks.LambdaInvoke(this, 'GetUserData', {
  lambdaFunction: getUserFunction,
  resultSelector: {
    'userId.$': '$.Payload.id',
    'userName.$': '$.Payload.name',
    'email.$': '$.Payload.email'
  },
  resultPath: '$.user'
});

// Input: { orderId: '123', customerId: '456' }
// Lambda dönüyor: { id: 'u789', name: 'John', email: '[email protected]', internalData: {...} }
// ResultSelector filter ediyor: { userId: 'u789', userName: 'John', email: '[email protected]' }
// ResultPath yerleştiriyor: { orderId: '123', customerId: '456', user: { userId: 'u789', ... } }

Context Object Variable’ları

ASL execution metadata’ya erişmek için context variable’ları sağlıyor:

const taskWithContext = new tasks.LambdaInvoke(this, 'ProcessWithContext', {
  lambdaFunction: processFunction,
  payload: sfn.TaskInput.fromObject({
    'data.$': '$.inputData',
    'executionId.$': '$$.Execution.Id',
    'executionName.$': '$$.Execution.Name',
    'stateMachineId.$': '$$.StateMachine.Id',
    'timestamp.$': '$$.State.EnteredTime'
  })
});

Error Handling ve Retry Stratejileri

Production workflow’lar kapsamlı error handling gerektiriyor. Step Functions built-in retry ve catch mekanizmaları sağlıyor.

Retry ile Exponential Backoff

const resilientTask = new tasks.LambdaInvoke(this, 'ProcessPayment', {
  lambdaFunction: paymentFunction,
  payloadResponseOnly: true,
  retryOnServiceExceptions: true
})
  .addRetry({
    // Transient error'lar için hızlı retry'lar
    errors: ['States.TaskFailed', 'ThrottlingException', 'ServiceUnavailable'],
    interval: cdk.Duration.seconds(2),
    maxAttempts: 3,
    backoffRate: 2.0 // 2s, 4s, 8s
  })
  .addRetry({
    // Timeout error'ları için farklı strateji
    errors: ['States.Timeout'],
    interval: cdk.Duration.seconds(5),
    maxAttempts: 2,
    backoffRate: 1.5 // 5s, 7.5s
  });

Retry mekanizması transient failure’ları ek kod olmadan handle ediyor. backoffRate parametresi exponential backoff’u kontrol ediyor - 2.0 oranı her retry’dan sonra bekleme süresini ikiye katlıyor.

Not: payloadResponseOnly: true response yapısını tam Step Functions wrapper yerine sadece Lambda payload’ını döndürerek basitleştiriyor. Ancak bu workflow state’inde StatusCode ve ExecutedVersion gibi metadata’ya erişimi kaybettiğiniz anlamına geliyor. Bu metadata’ya debugging veya auditing için ihtiyacınız varsa bunun yerine outputPath: '$.Payload' kullanın.

Error Catching ve Compensation

const handlePaymentFailure = new tasks.SqsSendMessage(this, 'NotifyFailure', {
  queue: failureQueue,
  messageBody: sfn.TaskInput.fromObject({
    'orderId.$': '$.orderId',
    'error.$': '$.errorInfo.Error',
    'cause.$': '$.errorInfo.Cause'
  })
});

const notifyAdmin = new tasks.SnsPublish(this, 'AlertAdmin', {
  topic: adminTopic,
  message: sfn.TaskInput.fromText('Critical payment processing failure')
});

const paymentTask = new tasks.LambdaInvoke(this, 'ChargeCustomer', {
  lambdaFunction: paymentFunction,
  payloadResponseOnly: true
})
  .addCatch(handlePaymentFailure, {
    // Specific business error'ları handle et
    errors: ['PaymentDeclined', 'InsufficientFunds'],
    resultPath: '$.errorInfo'
  })
  .addCatch(notifyAdmin, {
    // Diğer tüm error'ları yakala
    errors: ['States.ALL'],
    resultPath: '$.error'
  });

Catch block’larındaki resultPath original input’u korurken error bilgisini ekliyor. Bu downstream state’lerin hem input data’ya hem error detaylarına erişmesini sağlıyor.

Lambda Error Handling

Lambda fonksiyonları Step Functions’ın yakalayabileceği specific error type’ları throw etmeli:

export const handler = async (event: { orderId: string, amount: number }) => {
  try {
    const result = await processPayment(event.amount, event.orderId);
    return result;
  } catch (error: any) {
    // Step Functions'ın yakalayabileceği specific error type'ları throw et
    if (error.code === 'INSUFFICIENT_FUNDS') {
      throw new Error('InsufficientFunds');
    }
    if (error.code === 'CARD_DECLINED') {
      throw new Error('PaymentDeclined');
    }
    // Generic retry logic için yeniden throw et
    throw error;
  }
};

Circuit Breaker Pattern

Birden fazla fallback seçeneği olan sistemler için:

const primaryApi = new tasks.LambdaInvoke(this, 'PrimaryAPI', {
  lambdaFunction: primaryApiFunction,
  payloadResponseOnly: true
})
  .addCatch(fallbackApi, {
    errors: ['States.ALL'],
    resultPath: '$.primaryError'
  });

const fallbackApi = new tasks.LambdaInvoke(this, 'FallbackAPI', {
  lambdaFunction: fallbackApiFunction,
  payloadResponseOnly: true
})
  .addCatch(logFailure, {
    errors: ['States.ALL'],
    resultPath: '$.fallbackError'
  });

const logFailure = new tasks.DynamoPutItem(this, 'LogFailure', {
  table: failureTable,
  item: {
    id: tasks.DynamoAttributeValue.fromString(
      sfn.JsonPath.stringAt('$.requestId')
    ),
    timestamp: tasks.DynamoAttributeValue.fromNumber(
      sfn.JsonPath.numberAt('$$.State.EnteredTime')
    )
  }
});

Bu pattern her seviyede error context korunarak otomatik fallback sağlıyor.

Büyük Ölçekli Processing için Distributed Map

Normal Map state’leri 40’a kadar concurrent iteration’ı destekliyor. Distributed Map milyonlarca item’ı process etmek için bu sınırı kaldırıyor.

Temel Distributed Map

import * as s3 from 'aws-cdk-lib/aws-s3';

const inputBucket = s3.Bucket.fromBucketName(this, 'InputBucket', 'data-input');
const resultsBucket = s3.Bucket.fromBucketName(this, 'ResultsBucket', 'data-results');

const processRecord = new tasks.LambdaInvoke(this, 'ProcessRecord', {
  lambdaFunction: processFunction,
  payloadResponseOnly: true
});

const distributedMap = new sfn.DistributedMap(this, 'ProcessLargeDataset', {
  maxConcurrency: 10000,
  itemReader: new sfn.S3JsonItemReader({
    bucket: inputBucket,
    key: 'input-data/*.json'
  }),
  resultWriter: new sfn.ResultWriter({
    bucket: resultsBucket,
    prefix: 'processing-results/'
  }),
  toleratedFailurePercentage: 5
});

distributedMap.itemProcessor(processRecord);

toleratedFailurePercentage parametresi bazı item’lar fail olsa bile processing’e devam etmeye izin veriyor. Bu partial failure’ların kabul edilebilir olduğu batch processing için kullanışlı.

CSV ve JSONL Processing

Distributed Map birden fazla input formatını destekliyor:

// CSV dosyalarını process et
const csvProcessing = new sfn.DistributedMap(this, 'ProcessCSV', {
  itemReader: new sfn.S3CsvItemReader({
    bucket: csvBucket,
    key: 'data/*.csv',
    csvHeaders: sfn.CsvHeaders.use(['id', 'name', 'value', 'timestamp'])
  })
});

// JSON Lines dosyalarını process et
const jsonlProcessing = new sfn.DistributedMap(this, 'ProcessJSONL', {
  itemReader: new sfn.S3JsonItemReader({
    bucket: logsBucket,
    key: 'application-logs/*.jsonl'
  })
});

Performance Özellikleri

10 milyon item’ı process etmek Distributed Map’in değerini gösteriyor:

// Senaryo: 10 milyon sensor reading'i process et
// Input: S3'te 12.000 JSON dosyası
// Processing: Her dosya için istatistik çıkar

const processReadings = new sfn.DistributedMap(this, 'ProcessSensorData', {
  maxConcurrency: 8000,
  itemReader: new sfn.S3JsonItemReader({
    bucket: sensorBucket,
    key: 'readings/*.json'
  }),
  resultWriter: new sfn.ResultWriter({
    bucket: resultsBucket,
    prefix: 'processed/'
  })
});

// Sequential processing: 50+ saat (10M * 18ms ortalama)
// Distributed Map: 15 dakika (8.000 parallel execution)
// Hızlanma: 200x iyileşme
// Maliyet: 10M state transition için ~$250

Dramatik iyileşme parallel processing’ten geliyor. maxConcurrency parametresi kaç child execution’ın aynı anda çalıştığını kontrol ediyor.

Task Token’larla Callback Pattern

Task Token’lar workflow’ların external event’leri beklerken duraklamasını sağlıyor - human approval’lar veya uzun süren process’ler için kullanışlı.

Human Approval Workflow

import * as sqs from 'aws-cdk-lib/aws-sqs';

const approvalQueue = new sqs.Queue(this, 'ApprovalQueue');

const requestApproval = new tasks.SqsSendMessage(this, 'SendApprovalRequest', {
  queue: approvalQueue,
  messageBody: sfn.TaskInput.fromObject({
    'orderId.$': '$.orderId',
    'amount.$': '$.amount',
    'customerId.$': '$.customerId',
    'taskToken': sfn.JsonPath.taskToken // Kritik: Callback için task token
  }),
  integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
  timeout: cdk.Duration.hours(24)
});

const handleTimeout = new tasks.SnsPublish(this, 'NotifyTimeout', {
  topic: timeoutTopic,
  message: sfn.TaskInput.fromText('Approval request timed out')
});

const approvalTask = requestApproval
  .addCatch(handleTimeout, {
    errors: ['States.Timeout'],
    resultPath: '$.timeoutError'
  });

Workflow bu state’te task token ile SendTaskSuccess veya SendTaskFailure çağrılana kadar duraklatılıyor.

Approval Request’leri İşleme

import { SQSEvent } from 'aws-lambda';
import { DynamoDBClient, PutItemCommand } from '@aws-sdk/client-dynamodb';

const dynamodb = new DynamoDBClient({});

export const approvalHandler = async (event: SQSEvent) => {
  for (const record of event.Records) {
    const { orderId, amount, customerId, taskToken } = JSON.parse(record.body);

    // Admin UI için approval request'i sakla
    await dynamodb.send(new PutItemCommand({
      TableName: process.env.APPROVALS_TABLE!,
      Item: {
        orderId: { S: orderId },
        amount: { N: amount.toString() },
        customerId: { S: customerId },
        taskToken: { S: taskToken },
        status: { S: 'PENDING' },
        timestamp: { N: Date.now().toString() }
      }
    }));
  }
};

Approval Decision Callback

import { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda';
import { SFNClient, SendTaskSuccessCommand, SendTaskFailureCommand } from '@aws-sdk/client-sfn';
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb';

const sfn = new SFNClient({});
const dynamodb = new DynamoDBClient({});

export const approveHandler = async (event: APIGatewayProxyEvent): Promise<APIGatewayProxyResult> => {
  const { orderId, decision } = JSON.parse(event.body || '{}');

  // Approval request'i al
  const result = await dynamodb.send(new GetItemCommand({
    TableName: process.env.APPROVALS_TABLE!,
    Key: { orderId: { S: orderId } }
  }));

  if (!result.Item) {
    return { statusCode: 404, body: 'Approval request not found' };
  }

  const taskToken = result.Item.taskToken.S!;

  if (decision === 'approved') {
    await sfn.send(new SendTaskSuccessCommand({
      taskToken,
      output: JSON.stringify({ approved: true, orderId })
    }));
  } else {
    await sfn.send(new SendTaskFailureCommand({
      taskToken,
      error: 'ApprovalRejected',
      cause: 'Order rejected by admin'
    }));
  }

  return { statusCode: 200, body: 'Decision recorded' };
};

Callback invoke edildiğinde workflow execution’a devam ediyor. Bu pattern task token’ları saklayıp retrieve edebilen herhangi bir external sistemle çalışıyor.

Direct Service Integration’lar

Step Functions 220’nin üzerinde AWS servisiyle doğrudan entegre oluyor, Lambda wrapper’larına ihtiyacı ortadan kaldırıyor.

DynamoDB Integration

const saveOrder = new tasks.DynamoPutItem(this, 'SaveOrder', {
  table: ordersTable,
  item: {
    orderId: tasks.DynamoAttributeValue.fromString(
      sfn.JsonPath.stringAt('$.orderId')
    ),
    customerId: tasks.DynamoAttributeValue.fromString(
      sfn.JsonPath.stringAt('$.customerId')
    ),
    amount: tasks.DynamoAttributeValue.fromNumber(
      sfn.JsonPath.numberAt('$.amount')
    ),
    timestamp: tasks.DynamoAttributeValue.fromNumber(
      sfn.JsonPath.numberAt('$$.State.EnteredTime')
    ),
    status: tasks.DynamoAttributeValue.fromString('PROCESSING')
  }
});

Bu yaklaşım Lambda invocation maliyetlerini azaltıyor ve latency’yi düşürüyor.

SNS ve SQS Integration

const notifyUser = new tasks.SnsPublish(this, 'NotifyUser', {
  topic: notificationTopic,
  message: sfn.TaskInput.fromObject({
    'orderId.$': '$.orderId',
    'status': 'processing',
    'timestamp.$': '$$.State.EnteredTime'
  })
});

const queueWork = new tasks.SqsSendMessage(this, 'QueueWork', {
  queue: workQueue,
  messageBody: sfn.TaskInput.fromJsonPathAt('$.workload'),
  messageDeduplicationId: sfn.JsonPath.stringAt('$.orderId')
});

Sync ile ECS Task

Uzun süren batch job’lar için:

import * as ecs from 'aws-cdk-lib/aws-ecs';

const runBatchJob = new tasks.EcsRunTask(this, 'RunDataProcessing', {
  cluster: ecsCluster,
  taskDefinition: batchJobTask,
  launchTarget: new tasks.EcsFargateLaunchTarget(),
  integrationPattern: sfn.IntegrationPattern.RUN_JOB, // .sync pattern
  containerOverrides: [{
    containerDefinition: batchContainer,
    environment: [
      { name: 'INPUT_BUCKET', value: 'data-input' },
      { name: 'OUTPUT_BUCKET', value: 'data-output' }
    ]
  }]
});

Workflow ECS task tamamlanana kadar bekliyor, bu saatler sürebilir. Data processing, ML training veya diğer batch operation’lar için kullanışlı.

SDK Service Integration

Dedicated CDK construct’ları olmayan servisler için:

const updateSecret = new tasks.CallAwsService(this, 'UpdateSecret', {
  service: 'secretsmanager',
  action: 'updateSecret',
  parameters: {
    SecretId: 'MyApplicationSecret',
    SecretString: sfn.JsonPath.stringAt('$.newSecretValue')
  },
  iamResources: ['arn:aws:secretsmanager:*:*:secret:MyApplicationSecret-*']
});

CallAwsService construct’ı Step Functions’tan doğrudan herhangi bir AWS SDK API’sini çağırmayı sağlıyor.

Production Implementation Pattern

Logging, monitoring ve error handling içeren tam workflow implementasyonu:

import { Construct } from 'constructs';
import * as cdk from 'aws-cdk-lib';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as cloudwatch_actions from 'aws-cdk-lib/aws-cloudwatch-actions';

export class OrderProcessingWorkflow extends Construct {
  public readonly stateMachine: sfn.StateMachine;

  constructor(scope: Construct, id: string) {
    super(scope, id);

    // Lambda fonksiyonları
    const validateOrder = new lambda.Function(this, 'ValidateOrder', {
      runtime: lambda.Runtime.NODEJS_24_X,
      handler: 'validate.handler',
      code: lambda.Code.fromAsset('lambda'),
      timeout: cdk.Duration.seconds(30)
    });

    const processPayment = new lambda.Function(this, 'ProcessPayment', {
      runtime: lambda.Runtime.NODEJS_24_X,
      handler: 'payment.handler',
      code: lambda.Code.fromAsset('lambda'),
      timeout: cdk.Duration.seconds(60)
    });

    // Task tanımları
    const validateTask = new tasks.LambdaInvoke(this, 'ValidateOrderTask', {
      lambdaFunction: validateOrder,
      outputPath: '$.Payload'
    });

    const handlePaymentFailure = new sfn.Fail(this, 'PaymentFailed', {
      error: 'PaymentProcessingFailed',
      cause: 'Unable to process payment after retries'
    });

    const paymentTask = new tasks.LambdaInvoke(this, 'ProcessPaymentTask', {
      lambdaFunction: processPayment,
      outputPath: '$.Payload',
      retryOnServiceExceptions: true
    })
      .addRetry({
        errors: ['ThrottlingException'],
        interval: cdk.Duration.seconds(1),
        maxAttempts: 3,
        backoffRate: 2
      })
      .addCatch(handlePaymentFailure, {
        errors: ['PaymentFailed'],
        resultPath: '$.error'
      });

    const notifyInvalidOrder = new sfn.Fail(this, 'InvalidOrder', {
      error: 'OrderValidationFailed',
      cause: 'Order validation did not pass'
    });

    // Workflow tanımı
    const definition = validateTask
      .next(new sfn.Choice(this, 'OrderValid?')
        .when(sfn.Condition.booleanEquals('$.valid', true), paymentTask)
        .otherwise(notifyInvalidOrder)
      );

    // CloudWatch log group
    const logGroup = new logs.LogGroup(this, 'Logs', {
      retention: logs.RetentionDays.ONE_WEEK,
      removalPolicy: cdk.RemovalPolicy.DESTROY
    });

    // Logging ve tracing ile state machine
    this.stateMachine = new sfn.StateMachine(this, 'OrderProcessing', {
      definition,
      stateMachineType: sfn.StateMachineType.EXPRESS,
      logs: {
        destination: logGroup,
        level: sfn.LogLevel.ERROR,
        includeExecutionData: true
      },
      tracingEnabled: true // X-Ray'i etkinleştir
    });

    // CloudWatch alarm'ları
    const alertTopic = new sns.Topic(this, 'AlertTopic');

    const failureAlarm = new cloudwatch.Alarm(this, 'HighFailureRate', {
      metric: this.stateMachine.metricFailed({
        period: cdk.Duration.minutes(5)
      }),
      threshold: 10,
      evaluationPeriods: 2,
      treatMissingData: cloudwatch.TreatMissingData.NOT_BREACHING
    });

    failureAlarm.addAlarmAction(new cloudwatch_actions.SnsAction(alertTopic));
  }
}

Bu implementasyon production monitoring için error handling, logging, X-Ray tracing ve CloudWatch alarm’ları içeriyor.

Maliyet Optimizasyon Stratejileri

Step Functions pricing’i anlamak önemli maliyet azaltmaları sağlıyor.

State Transition’ları Minimize Et

// Anti-pattern: Gereksiz Pass state'leri
const inefficient = sfn.Chain.start(task1)
  .next(new sfn.Pass(this, 'PassData1', {})) // Gereksiz
  .next(task2)
  .next(new sfn.Pass(this, 'PassData2', {})) // Gereksiz
  .next(task3);
// Maliyet: 5 state transition

// Optimize edilmiş: Gereksiz state'leri kaldır
const efficient = sfn.Chain.start(task1)
  .next(task2)
  .next(task3);
// Maliyet: 3 state transition (%40 azalma)

Batch Processing

Item’ları tek tek process etmek çok sayıda state transition yaratıyor. Batching maliyetleri azaltıyor:

// 10.000 item'ı 100'lük 100 batch'te process et
const batchProcessor = new sfn.Map(this, 'ProcessBatches', {
  itemsPath: '$.batches',
  maxConcurrency: 10
}).itemProcessor(
  new tasks.LambdaInvoke(this, 'ProcessBatch', {
    lambdaFunction: batchFunction,
    payload: sfn.TaskInput.fromObject({
      'items.$': '$$.Map.Item.Value',
      'batchId.$': '$$.Map.Item.Index'
    })
  })
);

// Tekli processing: 10.000 state transition
// Batch processing: 100 state transition
// Tasarruf: %99 azalma

Direct Service Integration’lar

Direct integration’ları kullanmak Lambda maliyetlerini ortadan kaldırıyor:

// Orijinal: DynamoDB için Lambda wrapper
const withLambda = new tasks.LambdaInvoke(this, 'SaveOrder', {
  lambdaFunction: dynamoWrapperFunction
});
// Maliyet: Lambda invocation + state transition

// Optimize edilmiş: Direct DynamoDB integration
const directIntegration = new tasks.DynamoPutItem(this, 'SaveOrder', {
  table: ordersTable,
  item: { /* ... */ }
});
// Maliyet: Sadece state transition (Lambda maliyeti yok)

Monitoring ve Observability

Production workflow’lar kapsamlı monitoring gerektiriyor.

CloudWatch Metric’leri

const workflow = new sfn.StateMachine(this, 'ProductionWorkflow', {
  definition,
  tracingEnabled: true
});

// Failure rate alarm'ı
const failureAlarm = new cloudwatch.Alarm(this, 'FailureRate', {
  metric: workflow.metricFailed({
    period: cdk.Duration.minutes(5),
    statistic: cloudwatch.Stats.SUM
  }),
  threshold: 10,
  evaluationPeriods: 2
});

// Duration alarm'ı (p99)
const durationAlarm = new cloudwatch.Alarm(this, 'LongExecution', {
  metric: workflow.metricDuration({
    statistic: cloudwatch.Stats.PERCENTILE_99
  }),
  threshold: cdk.Duration.minutes(10).toMilliseconds(),
  evaluationPeriods: 1
});

CloudWatch Dashboard

const dashboard = new cloudwatch.Dashboard(this, 'WorkflowDashboard', {
  dashboardName: 'step-functions-monitoring'
});

dashboard.addWidgets(
  new cloudwatch.GraphWidget({
    title: 'Execution Rate',
    left: [
      workflow.metricStarted({ label: 'Started' }),
      workflow.metricSucceeded({ label: 'Succeeded' }),
      workflow.metricFailed({ label: 'Failed' })
    ],
    width: 12
  }),
  new cloudwatch.GraphWidget({
    title: 'Execution Duration (ms)',
    left: [
      workflow.metricDuration({
        statistic: cloudwatch.Stats.AVERAGE,
        label: 'Average'
      }),
      workflow.metricDuration({
        statistic: cloudwatch.Stats.PERCENTILE_99,
        label: 'p99'
      })
    ],
    width: 12
  })
);

X-Ray Tracing

Servis dependency’lerini görselleştirmek ve bottleneck’leri belirlemek için X-Ray’i etkinleştir:

Step Functions

Lambda: Validate

Lambda: Payment

DynamoDB

External Payment API

SNS: Notifications

X-Ray her servis çağrısı için execution time gösteriyor, yavaş component’leri belirlemek kolaylaşıyor.

Alternatifleri Ne Zaman Kullanmalı

Step Functions her zaman doğru seçim değil. Alternatifleri ne zaman değerlendirmeli:

Temporal

Temporal’ı şu durumlarda kullan:

  • JSON/CDK tanımları yerine code-as-workflow tercih edildiğinde
  • Multi-cloud deployment gerektiğinde
  • Karmaşık business logic workflow’larda yaşadığında
  • Sub-saniye latency kritik olduğunda
  • Mock olmadan local development önemli olduğunda

AWS MWAA (Managed Airflow)

Airflow’u şu durumlarda kullan:

  • Data pipeline orchestration (ETL, batch processing)
  • Scheduled job’lar arasında karmaşık dependency’ler
  • Data engineer’lar için zengin UI esaslı
  • Data tool’larla entegrasyon (Spark, Hive, Presto)

Minimum maliyet ayda $350, bu yüzden Step Functions event-driven workload’lar için daha mantıklı.

EventBridge Pipes

Karmaşık logic olmadan basit event transformation ve routing için kullan:

const pipe = new pipes.Pipe(this, 'SimpleProcessing', {
  source: new pipes.SqsSource(queue),
  target: new pipes.LambdaTarget(processFunction),
  enrichment: new pipes.LambdaEnrichment(transformFunction),
  filter: pipes.Filter.fromObject({
    body: {
      amount: [{ numeric: ['>', 100] }]
    }
  })
});

Branching veya karmaşık error handling olmadan linear pipeline’lar için daha basit.

Ana Çıkarımlar

Step Functions ile çalışmak bana birkaç pratik ders öğretti:

  1. Yüksek hacimli, kısa süreli processing için Express workflow’ları seç - Uygun workload’lar için Standard’dan Express’e geçerken %90’ın üzerinde maliyet tasarrufu yaygın.

  2. Express workflow’lar için idempotency implementasyonu - At-least-once execution task’ların birden fazla kez çalışabileceği anlamına geliyor. Duplicate processing’i önlemek için idempotency key’leri sakla.

  3. Büyük ölçekli processing için Distributed Map kullan - Milyonlarca item’ı process etmek sequential processing’e göre 200x hız iyileştirmeleriyle pratik hale geliyor.

  4. Direct service integration’lardan yararlan - Lambda wrapper’ları ortadan kaldırmak maliyetleri ve latency’yi azaltırken mimaride basitleştirme sağlıyor.

  5. Error handling pattern’lerine hakim ol - Exponential backoff ile retry, specific error catching ve compensating transaction’lar resilient workflow’lar yaratıyor.

  6. CloudWatch ve X-Ray ile monitor et - Production deployment’tan önce failure rate, duration ve throttling için alarm’lar kur.

  7. ResultPath’i dikkatli kullan - Input data’yı korumamak debugging baş ağrılarına neden oluyor. Data akışını kontrol etmek için her zaman resultPath belirt.

  8. Callback pattern’lerde timeout ayarla - Task token’lar otomatik expire olmuyor. Her zaman timeout handling ekle.

  9. Batch processing state transition’ları azaltıyor - Item’ları tek tek yerine 100’lük gruplarda process etmek maliyetleri %99 kesiyor.

  10. Specific kullanım durumları için alternatifleri değerlendir - Code-first workflow’lar için Temporal, data pipeline’ları için Airflow, basit routing için EventBridge Pipes.

Step Functions, workflow type’larını, error handling’i ve maliyet etkilerini anladığında güçlü orchestration yetenekleri sağlıyor. ASL pattern’lerini ve CDK construct’ları öğrenmeye yapılan yatırım, binlerce execution’dan milyonlarca execution’a scale olan resilient, maintainable workflow’lar aracılığıyla geri dönüyor.

İlgili yazılar

AWS AppSync & GraphQL: Production-Ready Real-time API'ler Geliştirmek

AWS AppSync ile ölçeklenebilir real-time API'ler geliştirmek için kapsamlı bir rehber: JavaScript resolver'lar, subscription filtering, caching stratejileri ve infrastructure as code pattern'leri.

awsappsyncgraphql+5
SNS/SQS Cross-Account Fan-Out: AWS'de Multi-Account Event Dağıtımı

Amazon SNS ve SQS kullanarak güvenli cross-account event dağıtımı nasıl yapılır öğrenin. IAM policy'leri, KMS şifreleme, AWS CDK implementasyonu ve production'da karşılaşılan yaygın sorunları kapsıyor.

awsaws-snsaws-sqs+6
Saga Pattern ile Dağıtık Transaction'lar: ACID Olmadan Consistency Sağlamak

Microservices mimarisinde AWS Step Functions ve EventBridge kullanarak Saga pattern implementasyonu: idempotency, compensation logic ve production-ready pattern'ler.

saga-patterndistributed-systemsmicroservices+5
AWS Bedrock AgentCore'u CDK ile deploy etmek: hızlı başlangıç

AgentCore Runtime üzerinde minimal bir Strands agent'ı CDK ile deploy etme rehberi — parametrize stack, arm64 build, deploy ve invoke akışı, ve ilk çağrıdan önce gereken IAM ve Marketplace ön koşulları.

aws-bedrockai-agentsaws-cdk+3
TypeScript AI SDK Karşılaştırması: Agent Geliştirme için Vercel AI SDK vs OpenAI Agents SDK

AI agent geliştirmek için TypeScript SDK'larının pratik karşılaştırması - Vercel AI SDK, OpenAI Agents SDK ve AWS Bedrock entegrasyonu. Kod örnekleri, karar frameworkleri ve production patternleri içeriyor.

typescriptai-toolsserverless+4