2025-12-04
AWS Step Functions Derinlemesine: Dayanıklı Workflow Orchestration Geliştirme
Production-ready serverless workflow'lar için AWS Step Functions'ı öğren. Standard vs Express workflow'lar, Distributed Map processing, error handling pattern'leri, callback entegrasyonu ve CDK örnekleriyle maliyet optimizasyonu stratejilerini keşfet.
Özet
AWS Step Functions, serverless uygulamalar için güçlü workflow orchestration sağlıyor ama Standard vs Express workflow seçimi, doğru error handling implementasyonu ve maliyet optimizasyonu pratik deneyim gerektiriyor. Bu rehber, production-ready pattern’leri kapsıyor: büyük ölçekli processing için Distributed Map, Task Token’larla callback pattern’leri, direct service integration’lar ve masrafları %90’ın üzerinde azaltabilen maliyet optimizasyon stratejileri. Çalışan CDK kod örnekleri Amazon States Language (ASL) pattern’lerini, exponential backoff ile error handling’i ve production ortamlar için monitoring kurulumunu gösteriyor.
Orchestration Zorluğu
Sadece Lambda ile karmaşık serverless workflow’lar geliştirmek bakım sorunları yaratıyor. Lambda fonksiyonlarının içinde orchestration logic’in yaşadığı sistemlerle çalıştım - yüzlerce satır retry, error handling, state tracking ve conditional branching yönetimi. Production sorunlarını debug etmek, execution flow’ları yeniden oluşturmak için CloudWatch log’larını parse etmek anlamına geliyordu. Yeni adımlar eklemek kod değişikliği ve yeniden deployment gerektiriyordu.
Asıl karmaşıklık şu durumlarda ortaya çıkıyor:
- Multi-step process’ler: Validation, payment, inventory update ve shipping koordinasyonu içeren sipariş işleme
- Error recovery: Uygulama kodunda exponential backoff, circuit breaker’lar ve compensating transaction’ları implementasyon
- State management: Lambda invocation’lar arasında DynamoDB veya Redis kullanarak workflow state’i track etmek latency ve maliyet ekliyor
- Parallel processing: Concurrent task’ları koordine ederken failure’ları yönetmek ve sonuçları aggregate etmek
- Human approval’lar: Uzun süren approval process’leri için callback mekanizmaları implementasyonu
- Scale: Milyonlarca item’ı process etmek doğru orchestration olmadan Lambda timeout limitine takılıyor
Step Functions bu zorlukları visual workflow’lar, built-in error handling ve AWS service integration’larla çözüyor. Ama Standard ve Express workflow arasında seçim yapmak, pricing etkilerini anlamak ve production pattern’lerini implementasyon birden fazla kaynağa yayılmış dokümantasyonda gezinmeyi gerektiriyor.
Workflow Type’larını Anlamak
Step Functions için temel karar Standard ve Express workflow arasında seçim yapmak. Bu seçim maliyet, execution model ve görünürlüğü etkiliyor.
Standard Workflow’lar
Standard workflow’lar tam execution history ile exactly-once execution garantisi sağlıyor:
import * as cdk from 'aws-cdk-lib';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
const processOrder = new tasks.LambdaInvoke(this, 'ProcessOrder', {
lambdaFunction: orderFunction,
outputPath: '$.Payload'
});
const validateInventory = new tasks.LambdaInvoke(this, 'ValidateInventory', {
lambdaFunction: inventoryFunction,
outputPath: '$.Payload'
});
const chargePayment = new tasks.LambdaInvoke(this, 'ChargePayment', {
lambdaFunction: paymentFunction,
outputPath: '$.Payload'
});
// Sipariş işleme için Standard workflow
const standardWorkflow = new sfn.StateMachine(this, 'OrderProcessing', {
stateMachineType: sfn.StateMachineType.STANDARD,
definition: processOrder
.next(validateInventory)
.next(chargePayment),
timeout: cdk.Duration.days(7)
});
Standard workflow özellikleri:
- Maksimum süre: 1 yıl (multi-day approval process’leri için kullanışlı)
- Exactly-once execution garantisi
- 90 gün boyunca tam execution history saklanıyor
- Fiyatlandırma: 1.000 state transition başına $0.025
- Step Functions console’da tam görünürlük
.syncve.waitForTaskTokenintegration pattern’lerini destekliyor
Express Workflow’lar
Express workflow’lar high-throughput, kısa süreli processing için optimize edilmiş:
const receiveIoTEvent = new tasks.LambdaInvoke(this, 'ReceiveEvent', {
lambdaFunction: receiveFunction,
outputPath: '$.Payload'
});
const validateData = new tasks.LambdaInvoke(this, 'ValidateData', {
lambdaFunction: validateFunction,
outputPath: '$.Payload'
});
const storeData = new tasks.DynamoPutItem(this, 'StoreData', {
table: dataTable,
item: {
id: tasks.DynamoAttributeValue.fromString(
sfn.JsonPath.stringAt('$.eventId')
),
timestamp: tasks.DynamoAttributeValue.fromNumber(
sfn.JsonPath.numberAt('$.timestamp')
)
}
});
// IoT processing için Express workflow
const expressWorkflow = new sfn.StateMachine(this, 'IoTProcessing', {
stateMachineType: sfn.StateMachineType.EXPRESS,
definition: receiveIoTEvent
.next(validateData)
.next(storeData),
timeout: cdk.Duration.minutes(5)
});
Express workflow özellikleri:
- Maksimum süre: 5 dakika
- At-least-once execution (birden fazla kez çalışabilir)
- Sınırlı execution history (sadece CloudWatch Logs)
- Fiyatlandırma: 1M request başına 0.00001667
- Throughput: Saniyede 100.000+ execution
- İki mod: Synchronous (sonucu bekle) ve Asynchronous (fire-and-forget)
Maliyet Karşılaştırması
Fiyatlandırma farkı scale’de belirgin hale geliyor:
// Senaryo: Ayda 10 milyon execution
// Her workflow: 10 state transition
// Ortalama süre: 2 saniye
// Memory: 512MB
// Standard Workflow:
// Toplam transition'lar: 10M * 10 = 100M
// Maliyet = (100.000.000 / 1.000) * $0.025 = $2.500/ay
// Express Workflow:
// Request maliyeti: (10.000.000 / 1.000.000) * $1.00 = $10
// GB-saniye: (512/1024) * 2 * 10.000.000 = 10.000.000
// Süre maliyeti: 10.000.000 * $0.00001667 = $167
// Toplam: $177/ay
// Tasarruf: $2.323/ay (%93 azalma)
Yüksek hacimli, kısa süreli workflow’lar için Express workflow’lar önemli maliyet tasarrufu sağlıyor. Standard workflow’lar uzun süren process’ler, exactly-once gereksinimi veya audit trail ihtiyacı için mantıklı.
Amazon States Language Pattern’leri
Step Functions workflow’ları JSON tabanlı bir spesifikasyon olan Amazon States Language (ASL) kullanılarak tanımlanıyor. Data flow kontrolünü anlamak maintainable workflow’lar oluşturmak için esaslı.
Data Flow Kontrolü
ASL, workflow state’leri arasında data akışını kontrol etmek için birkaç mekanizma sağlıyor:
// Conditional routing ile Choice state
const decisionTree = new sfn.Choice(this, 'RouteByWorkload', {
stateName: 'Workload Router'
})
.when(
sfn.Condition.numberGreaterThan('$.itemCount', 1000),
largeBatchProcessing
)
.when(
sfn.Condition.numberGreaterThan('$.itemCount', 100),
mediumBatchProcessing
)
.otherwise(smallBatchProcessing);
// Parallel processing için Map state
const processItems = new sfn.Map(this, 'ProcessEachItem', {
maxConcurrency: 10,
itemsPath: '$.items',
parameters: {
'item.$': '$$.Map.Item.Value',
'index.$': '$$.Map.Item.Index',
'executionId.$': '$$.Execution.Id'
}
}).itemProcessor(
new sfn.Pass(this, 'ProcessItem')
);
// Simultaneous execution için Parallel state
const parallelProcessing = new sfn.Parallel(this, 'FanOut', {
resultPath: '$.parallelResults'
})
.branch(processPayment)
.branch(updateInventory)
.branch(sendNotification);
// Dynamic timestamp ile Wait state
const waitForSchedule = new sfn.Wait(this, 'WaitUntilScheduled', {
time: sfn.WaitTime.timestampPath('$.scheduledTime')
});
ResultPath ve Data Transformation
ResultPath parametresi task output’unun state output’unda nereye yerleştirileceğini kontrol ediyor:
// Output'u filter etmek için ResultSelector ile task
const transformedTask = new tasks.LambdaInvoke(this, 'GetUserData', {
lambdaFunction: getUserFunction,
resultSelector: {
'userId.$': '$.Payload.id',
'userName.$': '$.Payload.name',
'email.$': '$.Payload.email'
},
resultPath: '$.user'
});
// Input: { orderId: '123', customerId: '456' }
// Lambda dönüyor: { id: 'u789', name: 'John', email: '[email protected]', internalData: {...} }
// ResultSelector filter ediyor: { userId: 'u789', userName: 'John', email: '[email protected]' }
// ResultPath yerleştiriyor: { orderId: '123', customerId: '456', user: { userId: 'u789', ... } }
Context Object Variable’ları
ASL execution metadata’ya erişmek için context variable’ları sağlıyor:
const taskWithContext = new tasks.LambdaInvoke(this, 'ProcessWithContext', {
lambdaFunction: processFunction,
payload: sfn.TaskInput.fromObject({
'data.$': '$.inputData',
'executionId.$': '$$.Execution.Id',
'executionName.$': '$$.Execution.Name',
'stateMachineId.$': '$$.StateMachine.Id',
'timestamp.$': '$$.State.EnteredTime'
})
});
Error Handling ve Retry Stratejileri
Production workflow’lar kapsamlı error handling gerektiriyor. Step Functions built-in retry ve catch mekanizmaları sağlıyor.
Retry ile Exponential Backoff
const resilientTask = new tasks.LambdaInvoke(this, 'ProcessPayment', {
lambdaFunction: paymentFunction,
payloadResponseOnly: true,
retryOnServiceExceptions: true
})
.addRetry({
// Transient error'lar için hızlı retry'lar
errors: ['States.TaskFailed', 'ThrottlingException', 'ServiceUnavailable'],
interval: cdk.Duration.seconds(2),
maxAttempts: 3,
backoffRate: 2.0 // 2s, 4s, 8s
})
.addRetry({
// Timeout error'ları için farklı strateji
errors: ['States.Timeout'],
interval: cdk.Duration.seconds(5),
maxAttempts: 2,
backoffRate: 1.5 // 5s, 7.5s
});
Retry mekanizması transient failure’ları ek kod olmadan handle ediyor. backoffRate parametresi exponential backoff’u kontrol ediyor - 2.0 oranı her retry’dan sonra bekleme süresini ikiye katlıyor.
Not: payloadResponseOnly: true response yapısını tam Step Functions wrapper yerine sadece Lambda payload’ını döndürerek basitleştiriyor. Ancak bu workflow state’inde StatusCode ve ExecutedVersion gibi metadata’ya erişimi kaybettiğiniz anlamına geliyor. Bu metadata’ya debugging veya auditing için ihtiyacınız varsa bunun yerine outputPath: '$.Payload' kullanın.
Error Catching ve Compensation
const handlePaymentFailure = new tasks.SqsSendMessage(this, 'NotifyFailure', {
queue: failureQueue,
messageBody: sfn.TaskInput.fromObject({
'orderId.$': '$.orderId',
'error.$': '$.errorInfo.Error',
'cause.$': '$.errorInfo.Cause'
})
});
const notifyAdmin = new tasks.SnsPublish(this, 'AlertAdmin', {
topic: adminTopic,
message: sfn.TaskInput.fromText('Critical payment processing failure')
});
const paymentTask = new tasks.LambdaInvoke(this, 'ChargeCustomer', {
lambdaFunction: paymentFunction,
payloadResponseOnly: true
})
.addCatch(handlePaymentFailure, {
// Specific business error'ları handle et
errors: ['PaymentDeclined', 'InsufficientFunds'],
resultPath: '$.errorInfo'
})
.addCatch(notifyAdmin, {
// Diğer tüm error'ları yakala
errors: ['States.ALL'],
resultPath: '$.error'
});
Catch block’larındaki resultPath original input’u korurken error bilgisini ekliyor. Bu downstream state’lerin hem input data’ya hem error detaylarına erişmesini sağlıyor.
Lambda Error Handling
Lambda fonksiyonları Step Functions’ın yakalayabileceği specific error type’ları throw etmeli:
export const handler = async (event: { orderId: string, amount: number }) => {
try {
const result = await processPayment(event.amount, event.orderId);
return result;
} catch (error: any) {
// Step Functions'ın yakalayabileceği specific error type'ları throw et
if (error.code === 'INSUFFICIENT_FUNDS') {
throw new Error('InsufficientFunds');
}
if (error.code === 'CARD_DECLINED') {
throw new Error('PaymentDeclined');
}
// Generic retry logic için yeniden throw et
throw error;
}
};
Circuit Breaker Pattern
Birden fazla fallback seçeneği olan sistemler için:
const primaryApi = new tasks.LambdaInvoke(this, 'PrimaryAPI', {
lambdaFunction: primaryApiFunction,
payloadResponseOnly: true
})
.addCatch(fallbackApi, {
errors: ['States.ALL'],
resultPath: '$.primaryError'
});
const fallbackApi = new tasks.LambdaInvoke(this, 'FallbackAPI', {
lambdaFunction: fallbackApiFunction,
payloadResponseOnly: true
})
.addCatch(logFailure, {
errors: ['States.ALL'],
resultPath: '$.fallbackError'
});
const logFailure = new tasks.DynamoPutItem(this, 'LogFailure', {
table: failureTable,
item: {
id: tasks.DynamoAttributeValue.fromString(
sfn.JsonPath.stringAt('$.requestId')
),
timestamp: tasks.DynamoAttributeValue.fromNumber(
sfn.JsonPath.numberAt('$$.State.EnteredTime')
)
}
});
Bu pattern her seviyede error context korunarak otomatik fallback sağlıyor.
Büyük Ölçekli Processing için Distributed Map
Normal Map state’leri 40’a kadar concurrent iteration’ı destekliyor. Distributed Map milyonlarca item’ı process etmek için bu sınırı kaldırıyor.
Temel Distributed Map
import * as s3 from 'aws-cdk-lib/aws-s3';
const inputBucket = s3.Bucket.fromBucketName(this, 'InputBucket', 'data-input');
const resultsBucket = s3.Bucket.fromBucketName(this, 'ResultsBucket', 'data-results');
const processRecord = new tasks.LambdaInvoke(this, 'ProcessRecord', {
lambdaFunction: processFunction,
payloadResponseOnly: true
});
const distributedMap = new sfn.DistributedMap(this, 'ProcessLargeDataset', {
maxConcurrency: 10000,
itemReader: new sfn.S3JsonItemReader({
bucket: inputBucket,
key: 'input-data/*.json'
}),
resultWriter: new sfn.ResultWriter({
bucket: resultsBucket,
prefix: 'processing-results/'
}),
toleratedFailurePercentage: 5
});
distributedMap.itemProcessor(processRecord);
toleratedFailurePercentage parametresi bazı item’lar fail olsa bile processing’e devam etmeye izin veriyor. Bu partial failure’ların kabul edilebilir olduğu batch processing için kullanışlı.
CSV ve JSONL Processing
Distributed Map birden fazla input formatını destekliyor:
// CSV dosyalarını process et
const csvProcessing = new sfn.DistributedMap(this, 'ProcessCSV', {
itemReader: new sfn.S3CsvItemReader({
bucket: csvBucket,
key: 'data/*.csv',
csvHeaders: sfn.CsvHeaders.use(['id', 'name', 'value', 'timestamp'])
})
});
// JSON Lines dosyalarını process et
const jsonlProcessing = new sfn.DistributedMap(this, 'ProcessJSONL', {
itemReader: new sfn.S3JsonItemReader({
bucket: logsBucket,
key: 'application-logs/*.jsonl'
})
});
Performance Özellikleri
10 milyon item’ı process etmek Distributed Map’in değerini gösteriyor:
// Senaryo: 10 milyon sensor reading'i process et
// Input: S3'te 12.000 JSON dosyası
// Processing: Her dosya için istatistik çıkar
const processReadings = new sfn.DistributedMap(this, 'ProcessSensorData', {
maxConcurrency: 8000,
itemReader: new sfn.S3JsonItemReader({
bucket: sensorBucket,
key: 'readings/*.json'
}),
resultWriter: new sfn.ResultWriter({
bucket: resultsBucket,
prefix: 'processed/'
})
});
// Sequential processing: 50+ saat (10M * 18ms ortalama)
// Distributed Map: 15 dakika (8.000 parallel execution)
// Hızlanma: 200x iyileşme
// Maliyet: 10M state transition için ~$250
Dramatik iyileşme parallel processing’ten geliyor. maxConcurrency parametresi kaç child execution’ın aynı anda çalıştığını kontrol ediyor.
Task Token’larla Callback Pattern
Task Token’lar workflow’ların external event’leri beklerken duraklamasını sağlıyor - human approval’lar veya uzun süren process’ler için kullanışlı.
Human Approval Workflow
import * as sqs from 'aws-cdk-lib/aws-sqs';
const approvalQueue = new sqs.Queue(this, 'ApprovalQueue');
const requestApproval = new tasks.SqsSendMessage(this, 'SendApprovalRequest', {
queue: approvalQueue,
messageBody: sfn.TaskInput.fromObject({
'orderId.$': '$.orderId',
'amount.$': '$.amount',
'customerId.$': '$.customerId',
'taskToken': sfn.JsonPath.taskToken // Kritik: Callback için task token
}),
integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
timeout: cdk.Duration.hours(24)
});
const handleTimeout = new tasks.SnsPublish(this, 'NotifyTimeout', {
topic: timeoutTopic,
message: sfn.TaskInput.fromText('Approval request timed out')
});
const approvalTask = requestApproval
.addCatch(handleTimeout, {
errors: ['States.Timeout'],
resultPath: '$.timeoutError'
});
Workflow bu state’te task token ile SendTaskSuccess veya SendTaskFailure çağrılana kadar duraklatılıyor.
Approval Request’leri İşleme
import { SQSEvent } from 'aws-lambda';
import { DynamoDBClient, PutItemCommand } from '@aws-sdk/client-dynamodb';
const dynamodb = new DynamoDBClient({});
export const approvalHandler = async (event: SQSEvent) => {
for (const record of event.Records) {
const { orderId, amount, customerId, taskToken } = JSON.parse(record.body);
// Admin UI için approval request'i sakla
await dynamodb.send(new PutItemCommand({
TableName: process.env.APPROVALS_TABLE!,
Item: {
orderId: { S: orderId },
amount: { N: amount.toString() },
customerId: { S: customerId },
taskToken: { S: taskToken },
status: { S: 'PENDING' },
timestamp: { N: Date.now().toString() }
}
}));
}
};
Approval Decision Callback
import { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda';
import { SFNClient, SendTaskSuccessCommand, SendTaskFailureCommand } from '@aws-sdk/client-sfn';
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb';
const sfn = new SFNClient({});
const dynamodb = new DynamoDBClient({});
export const approveHandler = async (event: APIGatewayProxyEvent): Promise<APIGatewayProxyResult> => {
const { orderId, decision } = JSON.parse(event.body || '{}');
// Approval request'i al
const result = await dynamodb.send(new GetItemCommand({
TableName: process.env.APPROVALS_TABLE!,
Key: { orderId: { S: orderId } }
}));
if (!result.Item) {
return { statusCode: 404, body: 'Approval request not found' };
}
const taskToken = result.Item.taskToken.S!;
if (decision === 'approved') {
await sfn.send(new SendTaskSuccessCommand({
taskToken,
output: JSON.stringify({ approved: true, orderId })
}));
} else {
await sfn.send(new SendTaskFailureCommand({
taskToken,
error: 'ApprovalRejected',
cause: 'Order rejected by admin'
}));
}
return { statusCode: 200, body: 'Decision recorded' };
};
Callback invoke edildiğinde workflow execution’a devam ediyor. Bu pattern task token’ları saklayıp retrieve edebilen herhangi bir external sistemle çalışıyor.
Direct Service Integration’lar
Step Functions 220’nin üzerinde AWS servisiyle doğrudan entegre oluyor, Lambda wrapper’larına ihtiyacı ortadan kaldırıyor.
DynamoDB Integration
const saveOrder = new tasks.DynamoPutItem(this, 'SaveOrder', {
table: ordersTable,
item: {
orderId: tasks.DynamoAttributeValue.fromString(
sfn.JsonPath.stringAt('$.orderId')
),
customerId: tasks.DynamoAttributeValue.fromString(
sfn.JsonPath.stringAt('$.customerId')
),
amount: tasks.DynamoAttributeValue.fromNumber(
sfn.JsonPath.numberAt('$.amount')
),
timestamp: tasks.DynamoAttributeValue.fromNumber(
sfn.JsonPath.numberAt('$$.State.EnteredTime')
),
status: tasks.DynamoAttributeValue.fromString('PROCESSING')
}
});
Bu yaklaşım Lambda invocation maliyetlerini azaltıyor ve latency’yi düşürüyor.
SNS ve SQS Integration
const notifyUser = new tasks.SnsPublish(this, 'NotifyUser', {
topic: notificationTopic,
message: sfn.TaskInput.fromObject({
'orderId.$': '$.orderId',
'status': 'processing',
'timestamp.$': '$$.State.EnteredTime'
})
});
const queueWork = new tasks.SqsSendMessage(this, 'QueueWork', {
queue: workQueue,
messageBody: sfn.TaskInput.fromJsonPathAt('$.workload'),
messageDeduplicationId: sfn.JsonPath.stringAt('$.orderId')
});
Sync ile ECS Task
Uzun süren batch job’lar için:
import * as ecs from 'aws-cdk-lib/aws-ecs';
const runBatchJob = new tasks.EcsRunTask(this, 'RunDataProcessing', {
cluster: ecsCluster,
taskDefinition: batchJobTask,
launchTarget: new tasks.EcsFargateLaunchTarget(),
integrationPattern: sfn.IntegrationPattern.RUN_JOB, // .sync pattern
containerOverrides: [{
containerDefinition: batchContainer,
environment: [
{ name: 'INPUT_BUCKET', value: 'data-input' },
{ name: 'OUTPUT_BUCKET', value: 'data-output' }
]
}]
});
Workflow ECS task tamamlanana kadar bekliyor, bu saatler sürebilir. Data processing, ML training veya diğer batch operation’lar için kullanışlı.
SDK Service Integration
Dedicated CDK construct’ları olmayan servisler için:
const updateSecret = new tasks.CallAwsService(this, 'UpdateSecret', {
service: 'secretsmanager',
action: 'updateSecret',
parameters: {
SecretId: 'MyApplicationSecret',
SecretString: sfn.JsonPath.stringAt('$.newSecretValue')
},
iamResources: ['arn:aws:secretsmanager:*:*:secret:MyApplicationSecret-*']
});
CallAwsService construct’ı Step Functions’tan doğrudan herhangi bir AWS SDK API’sini çağırmayı sağlıyor.
Production Implementation Pattern
Logging, monitoring ve error handling içeren tam workflow implementasyonu:
import { Construct } from 'constructs';
import * as cdk from 'aws-cdk-lib';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as cloudwatch_actions from 'aws-cdk-lib/aws-cloudwatch-actions';
export class OrderProcessingWorkflow extends Construct {
public readonly stateMachine: sfn.StateMachine;
constructor(scope: Construct, id: string) {
super(scope, id);
// Lambda fonksiyonları
const validateOrder = new lambda.Function(this, 'ValidateOrder', {
runtime: lambda.Runtime.NODEJS_24_X,
handler: 'validate.handler',
code: lambda.Code.fromAsset('lambda'),
timeout: cdk.Duration.seconds(30)
});
const processPayment = new lambda.Function(this, 'ProcessPayment', {
runtime: lambda.Runtime.NODEJS_24_X,
handler: 'payment.handler',
code: lambda.Code.fromAsset('lambda'),
timeout: cdk.Duration.seconds(60)
});
// Task tanımları
const validateTask = new tasks.LambdaInvoke(this, 'ValidateOrderTask', {
lambdaFunction: validateOrder,
outputPath: '$.Payload'
});
const handlePaymentFailure = new sfn.Fail(this, 'PaymentFailed', {
error: 'PaymentProcessingFailed',
cause: 'Unable to process payment after retries'
});
const paymentTask = new tasks.LambdaInvoke(this, 'ProcessPaymentTask', {
lambdaFunction: processPayment,
outputPath: '$.Payload',
retryOnServiceExceptions: true
})
.addRetry({
errors: ['ThrottlingException'],
interval: cdk.Duration.seconds(1),
maxAttempts: 3,
backoffRate: 2
})
.addCatch(handlePaymentFailure, {
errors: ['PaymentFailed'],
resultPath: '$.error'
});
const notifyInvalidOrder = new sfn.Fail(this, 'InvalidOrder', {
error: 'OrderValidationFailed',
cause: 'Order validation did not pass'
});
// Workflow tanımı
const definition = validateTask
.next(new sfn.Choice(this, 'OrderValid?')
.when(sfn.Condition.booleanEquals('$.valid', true), paymentTask)
.otherwise(notifyInvalidOrder)
);
// CloudWatch log group
const logGroup = new logs.LogGroup(this, 'Logs', {
retention: logs.RetentionDays.ONE_WEEK,
removalPolicy: cdk.RemovalPolicy.DESTROY
});
// Logging ve tracing ile state machine
this.stateMachine = new sfn.StateMachine(this, 'OrderProcessing', {
definition,
stateMachineType: sfn.StateMachineType.EXPRESS,
logs: {
destination: logGroup,
level: sfn.LogLevel.ERROR,
includeExecutionData: true
},
tracingEnabled: true // X-Ray'i etkinleştir
});
// CloudWatch alarm'ları
const alertTopic = new sns.Topic(this, 'AlertTopic');
const failureAlarm = new cloudwatch.Alarm(this, 'HighFailureRate', {
metric: this.stateMachine.metricFailed({
period: cdk.Duration.minutes(5)
}),
threshold: 10,
evaluationPeriods: 2,
treatMissingData: cloudwatch.TreatMissingData.NOT_BREACHING
});
failureAlarm.addAlarmAction(new cloudwatch_actions.SnsAction(alertTopic));
}
}
Bu implementasyon production monitoring için error handling, logging, X-Ray tracing ve CloudWatch alarm’ları içeriyor.
Maliyet Optimizasyon Stratejileri
Step Functions pricing’i anlamak önemli maliyet azaltmaları sağlıyor.
State Transition’ları Minimize Et
// Anti-pattern: Gereksiz Pass state'leri
const inefficient = sfn.Chain.start(task1)
.next(new sfn.Pass(this, 'PassData1', {})) // Gereksiz
.next(task2)
.next(new sfn.Pass(this, 'PassData2', {})) // Gereksiz
.next(task3);
// Maliyet: 5 state transition
// Optimize edilmiş: Gereksiz state'leri kaldır
const efficient = sfn.Chain.start(task1)
.next(task2)
.next(task3);
// Maliyet: 3 state transition (%40 azalma)
Batch Processing
Item’ları tek tek process etmek çok sayıda state transition yaratıyor. Batching maliyetleri azaltıyor:
// 10.000 item'ı 100'lük 100 batch'te process et
const batchProcessor = new sfn.Map(this, 'ProcessBatches', {
itemsPath: '$.batches',
maxConcurrency: 10
}).itemProcessor(
new tasks.LambdaInvoke(this, 'ProcessBatch', {
lambdaFunction: batchFunction,
payload: sfn.TaskInput.fromObject({
'items.$': '$$.Map.Item.Value',
'batchId.$': '$$.Map.Item.Index'
})
})
);
// Tekli processing: 10.000 state transition
// Batch processing: 100 state transition
// Tasarruf: %99 azalma
Direct Service Integration’lar
Direct integration’ları kullanmak Lambda maliyetlerini ortadan kaldırıyor:
// Orijinal: DynamoDB için Lambda wrapper
const withLambda = new tasks.LambdaInvoke(this, 'SaveOrder', {
lambdaFunction: dynamoWrapperFunction
});
// Maliyet: Lambda invocation + state transition
// Optimize edilmiş: Direct DynamoDB integration
const directIntegration = new tasks.DynamoPutItem(this, 'SaveOrder', {
table: ordersTable,
item: { /* ... */ }
});
// Maliyet: Sadece state transition (Lambda maliyeti yok)
Monitoring ve Observability
Production workflow’lar kapsamlı monitoring gerektiriyor.
CloudWatch Metric’leri
const workflow = new sfn.StateMachine(this, 'ProductionWorkflow', {
definition,
tracingEnabled: true
});
// Failure rate alarm'ı
const failureAlarm = new cloudwatch.Alarm(this, 'FailureRate', {
metric: workflow.metricFailed({
period: cdk.Duration.minutes(5),
statistic: cloudwatch.Stats.SUM
}),
threshold: 10,
evaluationPeriods: 2
});
// Duration alarm'ı (p99)
const durationAlarm = new cloudwatch.Alarm(this, 'LongExecution', {
metric: workflow.metricDuration({
statistic: cloudwatch.Stats.PERCENTILE_99
}),
threshold: cdk.Duration.minutes(10).toMilliseconds(),
evaluationPeriods: 1
});
CloudWatch Dashboard
const dashboard = new cloudwatch.Dashboard(this, 'WorkflowDashboard', {
dashboardName: 'step-functions-monitoring'
});
dashboard.addWidgets(
new cloudwatch.GraphWidget({
title: 'Execution Rate',
left: [
workflow.metricStarted({ label: 'Started' }),
workflow.metricSucceeded({ label: 'Succeeded' }),
workflow.metricFailed({ label: 'Failed' })
],
width: 12
}),
new cloudwatch.GraphWidget({
title: 'Execution Duration (ms)',
left: [
workflow.metricDuration({
statistic: cloudwatch.Stats.AVERAGE,
label: 'Average'
}),
workflow.metricDuration({
statistic: cloudwatch.Stats.PERCENTILE_99,
label: 'p99'
})
],
width: 12
})
);
X-Ray Tracing
Servis dependency’lerini görselleştirmek ve bottleneck’leri belirlemek için X-Ray’i etkinleştir:
X-Ray her servis çağrısı için execution time gösteriyor, yavaş component’leri belirlemek kolaylaşıyor.
Alternatifleri Ne Zaman Kullanmalı
Step Functions her zaman doğru seçim değil. Alternatifleri ne zaman değerlendirmeli:
Temporal
Temporal’ı şu durumlarda kullan:
- JSON/CDK tanımları yerine code-as-workflow tercih edildiğinde
- Multi-cloud deployment gerektiğinde
- Karmaşık business logic workflow’larda yaşadığında
- Sub-saniye latency kritik olduğunda
- Mock olmadan local development önemli olduğunda
AWS MWAA (Managed Airflow)
Airflow’u şu durumlarda kullan:
- Data pipeline orchestration (ETL, batch processing)
- Scheduled job’lar arasında karmaşık dependency’ler
- Data engineer’lar için zengin UI esaslı
- Data tool’larla entegrasyon (Spark, Hive, Presto)
Minimum maliyet ayda $350, bu yüzden Step Functions event-driven workload’lar için daha mantıklı.
EventBridge Pipes
Karmaşık logic olmadan basit event transformation ve routing için kullan:
const pipe = new pipes.Pipe(this, 'SimpleProcessing', {
source: new pipes.SqsSource(queue),
target: new pipes.LambdaTarget(processFunction),
enrichment: new pipes.LambdaEnrichment(transformFunction),
filter: pipes.Filter.fromObject({
body: {
amount: [{ numeric: ['>', 100] }]
}
})
});
Branching veya karmaşık error handling olmadan linear pipeline’lar için daha basit.
Ana Çıkarımlar
Step Functions ile çalışmak bana birkaç pratik ders öğretti:
-
Yüksek hacimli, kısa süreli processing için Express workflow’ları seç - Uygun workload’lar için Standard’dan Express’e geçerken %90’ın üzerinde maliyet tasarrufu yaygın.
-
Express workflow’lar için idempotency implementasyonu - At-least-once execution task’ların birden fazla kez çalışabileceği anlamına geliyor. Duplicate processing’i önlemek için idempotency key’leri sakla.
-
Büyük ölçekli processing için Distributed Map kullan - Milyonlarca item’ı process etmek sequential processing’e göre 200x hız iyileştirmeleriyle pratik hale geliyor.
-
Direct service integration’lardan yararlan - Lambda wrapper’ları ortadan kaldırmak maliyetleri ve latency’yi azaltırken mimaride basitleştirme sağlıyor.
-
Error handling pattern’lerine hakim ol - Exponential backoff ile retry, specific error catching ve compensating transaction’lar resilient workflow’lar yaratıyor.
-
CloudWatch ve X-Ray ile monitor et - Production deployment’tan önce failure rate, duration ve throttling için alarm’lar kur.
-
ResultPath’i dikkatli kullan - Input data’yı korumamak debugging baş ağrılarına neden oluyor. Data akışını kontrol etmek için her zaman
resultPathbelirt. -
Callback pattern’lerde timeout ayarla - Task token’lar otomatik expire olmuyor. Her zaman timeout handling ekle.
-
Batch processing state transition’ları azaltıyor - Item’ları tek tek yerine 100’lük gruplarda process etmek maliyetleri %99 kesiyor.
-
Specific kullanım durumları için alternatifleri değerlendir - Code-first workflow’lar için Temporal, data pipeline’ları için Airflow, basit routing için EventBridge Pipes.
Step Functions, workflow type’larını, error handling’i ve maliyet etkilerini anladığında güçlü orchestration yetenekleri sağlıyor. ASL pattern’lerini ve CDK construct’ları öğrenmeye yapılan yatırım, binlerce execution’dan milyonlarca execution’a scale olan resilient, maintainable workflow’lar aracılığıyla geri dönüyor.
İlgili yazılar
AWS AppSync ile ölçeklenebilir real-time API'ler geliştirmek için kapsamlı bir rehber: JavaScript resolver'lar, subscription filtering, caching stratejileri ve infrastructure as code pattern'leri.
Amazon SNS ve SQS kullanarak güvenli cross-account event dağıtımı nasıl yapılır öğrenin. IAM policy'leri, KMS şifreleme, AWS CDK implementasyonu ve production'da karşılaşılan yaygın sorunları kapsıyor.
Microservices mimarisinde AWS Step Functions ve EventBridge kullanarak Saga pattern implementasyonu: idempotency, compensation logic ve production-ready pattern'ler.
AgentCore Runtime üzerinde minimal bir Strands agent'ı CDK ile deploy etme rehberi — parametrize stack, arm64 build, deploy ve invoke akışı, ve ilk çağrıdan önce gereken IAM ve Marketplace ön koşulları.
AI agent geliştirmek için TypeScript SDK'larının pratik karşılaştırması - Vercel AI SDK, OpenAI Agents SDK ve AWS Bedrock entegrasyonu. Kod örnekleri, karar frameworkleri ve production patternleri içeriyor.