SQS→Lambdaでバッチ内の失敗メッセージだけ再試行する方法
Posted On 2025-02-01
バッチ内で失敗したメッセージのみ再実行したい場合、LambdaのイベントソースマッピングにReportBatchItemFailuresを指定し、失敗したメッセージIDを返せばよい。これにより不要な再試行を避けながら、確実にメッセージを処理する仕組みを構築できる。
目次
はじめに
- SQS→Lambdaと連携するとバッチ内で、失敗するメッセージとそうでないメッセージがある
- バッチ全体をやり直すのではなく、失敗したメッセージだけやり直すようにしたい
- これはSQS→LambdaのイベントソースマッピングにあるReportBatchItemFailuresという機能を使えばできる
やり方
SQSの定義は普通通り。イベントソースマッピングの部分だけ変わる
イベントソースマッピング
- function_response_typesに
["ReportBatchItemFailures"]
を追加する。これでLambda側が指定したフォーマットで失敗したメッセージIDを返せば自動的に再試行してくれるようになる maximum_batching_window_in_seconds
は失敗したメッセージをまとめるための時間窓。例えば、この値が30秒の場合、12:00:00にID=2のメッセージが失敗し、12:00:10にID=4,7のメッセージが失敗した場合、再試行時にID=2,4,7が一個のバッチにまとめられる
resource "aws_lambda_event_source_mapping" "sqs_event" {
event_source_arn = aws_sqs_queue.my_queue.arn
function_name = aws_lambda_function.sqs_lambda.arn
enabled = true
batch_size = 10
maximum_batching_window_in_seconds = 30
function_response_types = ["ReportBatchItemFailures"]
# SQSとLambdaの間で `ReportBatchItemFailures` を使用するために必要な設定
}
Lambda側の定義
- リトライしたいメッセージIDを
batchItemFailures
で返せば良い。AWSのドキュメントがわかりやすい - この例は、50%の確率でランダムに失敗する処理
- SQS→Lambdaのメッセージの引き渡しはこれまで通り
event['Records']
で行い、forループで回す - これを
lambda_function.py
で行う
import json
import random
def lambda_handler(event, context):
"""
SQSからのイベントを処理し、一定確率で失敗する。
失敗したメッセージのみをReportBatchItemFailuresで報告する。
"""
FAILURE_RATE = 0.5 # 失敗率を50%に設定
failed_messages = []
success_body, failed_body = [], []
for record in event['Records']:
message_id = record['messageId']
body = record['body']
# ランダムに失敗するか判定
if random.random() < FAILURE_RATE:
print(f"Processing failed for message ID: {message_id}, Body : {body}")
failed_body.append(body)
failed_messages.append({'itemIdentifier': message_id})
else:
print(f"Processing succeeded for message ID: {message_id}")
success_body.append(body)
# ここに正常な処理を追加
print(f"Success body: {success_body}")
print(f"Failed body: {failed_body}")
# 失敗したメッセージがあれば報告
if failed_messages:
return {
'batchItemFailures': failed_messages
}
else:
return {}
結果
1ラウンド目
CloudWatch Logsを見ると以下のような結果になった。
2025-02-01T12:57:47.007+09:00
Success body: ['Hello from boto3! This is message 3', 'Hello from boto3! This is message 6']
2025-02-01T12:57:47.007+09:00
Failed body: ['Hello from boto3! This is message 8']
2025-02-01T12:57:49.906+09:00
Success body: ['Hello from boto3! This is message 5', 'Hello from boto3! This is message 10', 'Hello from boto3! This is message 2']
2025-02-01T12:57:49.906+09:00
Failed body: ['Hello from boto3! This is message 7']
2025-02-01T12:57:49.924+09:00
Success body: ['Hello from boto3! This is message 4']
2025-02-01T12:57:49.924+09:00
Failed body: ['Hello from boto3! This is message 1']
2025-02-01T12:57:53.114+09:00
Success body: ['Hello from boto3! This is message 9']
2025-02-01T12:57:53.114+09:00
Failed body: []
表にまとめると以下の通り
成功 | 失敗 |
---|---|
3 | |
6 | 8 |
5 | |
10 | |
2 | 7 |
4 | 1 |
9 |
2ラウンド目
成功 | 失敗 |
---|---|
7 | 8 |
1 |
2025-02-01T12:58:50.487+09:00
Success body: ['Hello from boto3! This is message 7']
2025-02-01T12:58:50.487+09:00
Failed body: ['Hello from boto3! This is message 8', 'Hello from boto3! This is message 1']
3ラウンド目
8と1は別々のバッチになっている。このへんは結構適当なようだ
成功 | 失敗 |
---|---|
8 | |
1 |
2025-02-01T12:59:51.308+09:00
Success body: []
2025-02-01T12:59:51.308+09:00
Failed body: ['Hello from boto3! This is message 8']
2025-02-01T12:59:52.964+09:00
Success body: []
2025-02-01T12:59:52.964+09:00
Failed body: ['Hello from boto3! This is message 1']
これが失敗が消えるまで続いていく
全体のコード
Terraform
resource "aws_sqs_queue" "my_queue" {
name = "my-sqs-queue"
visibility_timeout_seconds = 30
message_retention_seconds = 86400
}
resource "aws_iam_role" "lambda_role" {
name = "lambda_sqs_execution_role"
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [{
Action = "sts:AssumeRole",
Effect = "Allow",
Principal = {
Service = "lambda.amazonaws.com"
}
}]
})
}
resource "aws_iam_policy" "lambda_policy" {
name = "lambda_sqs_policy"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Action = [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
Effect = "Allow",
Resource = aws_sqs_queue.my_queue.arn
},
{
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
Effect = "Allow",
Resource = "arn:aws:logs:*:*:*"
}
]
})
}
resource "aws_iam_role_policy_attachment" "lambda_attach" {
role = aws_iam_role.lambda_role.name
policy_arn = aws_iam_policy.lambda_policy.arn
}
resource "aws_lambda_function" "sqs_lambda" {
filename = data.archive_file.lambda_zip.output_path
function_name = "sqs_lambda_function"
role = aws_iam_role.lambda_role.arn
handler = "lambda_function.lambda_handler"
runtime = "python3.12"
source_code_hash = data.archive_file.lambda_zip.output_base64sha256
reserved_concurrent_executions = 1 # 実験用に1個に固定
environment {
variables = {
# 環境変数が必要な場合はここに追加
}
}
}
data "archive_file" "lambda_zip" {
type = "zip"
source_file = "${path.module}/lambda_function.py"
output_path = "${path.module}/.cache/lambda_function.zip"
}
resource "aws_lambda_event_source_mapping" "sqs_event" {
event_source_arn = aws_sqs_queue.my_queue.arn
function_name = aws_lambda_function.sqs_lambda.arn
enabled = true
batch_size = 10
maximum_batching_window_in_seconds = 30
function_response_types = ["ReportBatchItemFailures"]
# SQSとLambdaの間で `ReportBatchItemFailures` を使用するために必要な設定
}
output "sqs_queue_url" {
value = aws_sqs_queue.my_queue.url
}
output "lambda_function_name" {
value = aws_lambda_function.sqs_lambda.function_name
}
ローカルからSQSへのメッセージ送信
import boto3
import sys
def main():
queue_name = "my-sqs-queue" # Terraformで作成したキュー名
# SQSクライアントの作成
boto_session = boto3.Session(profile_name="hogehoge")
sqs_client = boto_session.client("sqs")
# キューURLの取得
try:
response = sqs_client.get_queue_url(QueueName=queue_name)
queue_url = response["QueueUrl"]
print(f"Queue URL: {queue_url}")
except sqs_client.exceptions.QueueDoesNotExist:
print(f"Error: Queue '{queue_name}' does not exist.")
sys.exit(1)
# キューのARNを動的に取得
try:
attributes_response = sqs_client.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=["QueueArn"]
)
queue_arn = attributes_response["Attributes"]["QueueArn"]
print(f"Queue ARN: {queue_arn}")
except Exception as e:
print("Failed to retrieve queue attributes:", e)
sys.exit(1)
# 10個のメッセージエントリーを作成
entries = []
for i in range(1, 11):
entry = {
"Id": f"msg_{i}", # 各エントリーのユニークなID
"MessageBody": f"Hello from boto3! This is message {i}",
# オプションでMessageAttributesも設定可能
"MessageAttributes": {
"SampleAttribute": {
"StringValue": f"SampleValue_{i}",
"DataType": "String"
}
}
}
entries.append(entry)
# 一括送信(最大10件まで)
try:
batch_response = sqs_client.send_message_batch(
QueueUrl=queue_url,
Entries=entries
)
if "Failed" in batch_response and batch_response["Failed"]:
print("Some messages failed to send:")
for failure in batch_response["Failed"]:
print(failure)
else:
print("All messages sent successfully!")
# 成功したメッセージのIDを表示
for success in batch_response.get("Successful", []):
print("Message ID:", success.get("MessageId"))
except Exception as e:
print("Failed to send batch messages:", e)
sys.exit(1)
if __name__ == "__main__":
main()
所感
- 普通に便利だった
- 実践的にはデッドレターキューと組み合わせて一定以上失敗したらDLQに入れるみたいな実装になるのだと思う
Shikoan's ML Blogの中の人が運営しているサークル「じゅ~しぃ~すくりぷと」の本のご案内
技術書コーナー
北海道の駅巡りコーナー