こしあん
2025-02-01

SQS→Lambdaでバッチ内の失敗メッセージだけ再試行する方法


33{icon} {views}


バッチ内で失敗したメッセージのみ再実行したい場合、LambdaのイベントソースマッピングにReportBatchItemFailuresを指定し、失敗したメッセージIDを返せばよい。これにより不要な再試行を避けながら、確実にメッセージを処理する仕組みを構築できる。

はじめに

  • SQS→Lambdaと連携するとバッチ内で、失敗するメッセージとそうでないメッセージがある
  • バッチ全体をやり直すのではなく、失敗したメッセージだけやり直すようにしたい
  • これはSQS→LambdaのイベントソースマッピングにあるReportBatchItemFailuresという機能を使えばできる

やり方

SQSの定義は普通通り。イベントソースマッピングの部分だけ変わる

イベントソースマッピング

  • function_response_typesに["ReportBatchItemFailures"]を追加する。これでLambda側が指定したフォーマットで失敗したメッセージIDを返せば自動的に再試行してくれるようになる
  • maximum_batching_window_in_secondsは失敗したメッセージをまとめるための時間窓。例えば、この値が30秒の場合、12:00:00にID=2のメッセージが失敗し、12:00:10にID=4,7のメッセージが失敗した場合、再試行時にID=2,4,7が一個のバッチにまとめられる
resource "aws_lambda_event_source_mapping" "sqs_event" {
  event_source_arn                   = aws_sqs_queue.my_queue.arn
  function_name                      = aws_lambda_function.sqs_lambda.arn
  enabled                            = true
  batch_size                         = 10
  maximum_batching_window_in_seconds = 30
  function_response_types            = ["ReportBatchItemFailures"]
  # SQSとLambdaの間で `ReportBatchItemFailures` を使用するために必要な設定
}

Lambda側の定義

  • リトライしたいメッセージIDをbatchItemFailuresで返せば良い。AWSのドキュメントがわかりやすい
  • この例は、50%の確率でランダムに失敗する処理
  • SQS→Lambdaのメッセージの引き渡しはこれまで通りevent['Records']で行い、forループで回す
  • これをlambda_function.pyで行う
import json
import random

def lambda_handler(event, context):
    """
    SQSからのイベントを処理し、一定確率で失敗する。
    失敗したメッセージのみをReportBatchItemFailuresで報告する。
    """
    FAILURE_RATE = 0.5  # 失敗率を50%に設定
    failed_messages = []
    success_body, failed_body = [], []

    for record in event['Records']:
        message_id = record['messageId']
        body = record['body']

        # ランダムに失敗するか判定
        if random.random() < FAILURE_RATE:
            print(f"Processing failed for message ID: {message_id}, Body : {body}")
            failed_body.append(body)
            failed_messages.append({'itemIdentifier': message_id})
        else:
            print(f"Processing succeeded for message ID: {message_id}")
            success_body.append(body)
            # ここに正常な処理を追加

    print(f"Success body: {success_body}")
    print(f"Failed body: {failed_body}")

    # 失敗したメッセージがあれば報告
    if failed_messages:
        return {
            'batchItemFailures': failed_messages
        }
    else:
        return {}

結果

1ラウンド目

CloudWatch Logsを見ると以下のような結果になった。

2025-02-01T12:57:47.007+09:00
Success body: ['Hello from boto3! This is message 3', 'Hello from boto3! This is message 6']
2025-02-01T12:57:47.007+09:00
Failed body: ['Hello from boto3! This is message 8']

2025-02-01T12:57:49.906+09:00
Success body: ['Hello from boto3! This is message 5', 'Hello from boto3! This is message 10', 'Hello from boto3! This is message 2']
2025-02-01T12:57:49.906+09:00
Failed body: ['Hello from boto3! This is message 7']

2025-02-01T12:57:49.924+09:00
Success body: ['Hello from boto3! This is message 4']
2025-02-01T12:57:49.924+09:00
Failed body: ['Hello from boto3! This is message 1']

2025-02-01T12:57:53.114+09:00
Success body: ['Hello from boto3! This is message 9']
2025-02-01T12:57:53.114+09:00
Failed body: []

表にまとめると以下の通り

成功 失敗
3
6 8
5
10
2 7
4 1
9

2ラウンド目

成功 失敗
7 8
1
2025-02-01T12:58:50.487+09:00
Success body: ['Hello from boto3! This is message 7']
2025-02-01T12:58:50.487+09:00
Failed body: ['Hello from boto3! This is message 8', 'Hello from boto3! This is message 1']

3ラウンド目

8と1は別々のバッチになっている。このへんは結構適当なようだ

成功 失敗
8
1
2025-02-01T12:59:51.308+09:00
Success body: []
2025-02-01T12:59:51.308+09:00
Failed body: ['Hello from boto3! This is message 8']

2025-02-01T12:59:52.964+09:00
Success body: []
2025-02-01T12:59:52.964+09:00
Failed body: ['Hello from boto3! This is message 1']

これが失敗が消えるまで続いていく

全体のコード

Terraform

resource "aws_sqs_queue" "my_queue" {
  name                       = "my-sqs-queue"
  visibility_timeout_seconds = 30
  message_retention_seconds  = 86400
}

resource "aws_iam_role" "lambda_role" {
  name = "lambda_sqs_execution_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [{
      Action = "sts:AssumeRole",
      Effect = "Allow",
      Principal = {
        Service = "lambda.amazonaws.com"
      }
    }]
  })
}

resource "aws_iam_policy" "lambda_policy" {
  name = "lambda_sqs_policy"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ],
        Effect   = "Allow",
        Resource = aws_sqs_queue.my_queue.arn
      },
      {
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        Effect   = "Allow",
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_attach" {
  role       = aws_iam_role.lambda_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

resource "aws_lambda_function" "sqs_lambda" {
  filename                       = data.archive_file.lambda_zip.output_path
  function_name                  = "sqs_lambda_function"
  role                           = aws_iam_role.lambda_role.arn
  handler                        = "lambda_function.lambda_handler"
  runtime                        = "python3.12"
  source_code_hash               = data.archive_file.lambda_zip.output_base64sha256
  reserved_concurrent_executions = 1 # 実験用に1個に固定

  environment {
    variables = {
      # 環境変数が必要な場合はここに追加
    }
  }
}

data "archive_file" "lambda_zip" {
  type        = "zip"
  source_file = "${path.module}/lambda_function.py"
  output_path = "${path.module}/.cache/lambda_function.zip"
}

resource "aws_lambda_event_source_mapping" "sqs_event" {
  event_source_arn                   = aws_sqs_queue.my_queue.arn
  function_name                      = aws_lambda_function.sqs_lambda.arn
  enabled                            = true
  batch_size                         = 10
  maximum_batching_window_in_seconds = 30
  function_response_types            = ["ReportBatchItemFailures"]
  # SQSとLambdaの間で `ReportBatchItemFailures` を使用するために必要な設定
}

output "sqs_queue_url" {
  value = aws_sqs_queue.my_queue.url
}

output "lambda_function_name" {
  value = aws_lambda_function.sqs_lambda.function_name
}

ローカルからSQSへのメッセージ送信

import boto3
import sys

def main():
    queue_name = "my-sqs-queue"     # Terraformで作成したキュー名

    # SQSクライアントの作成
    boto_session = boto3.Session(profile_name="hogehoge")
    sqs_client = boto_session.client("sqs")

    # キューURLの取得
    try:
        response = sqs_client.get_queue_url(QueueName=queue_name)
        queue_url = response["QueueUrl"]
        print(f"Queue URL: {queue_url}")
    except sqs_client.exceptions.QueueDoesNotExist:
        print(f"Error: Queue '{queue_name}' does not exist.")
        sys.exit(1)

    # キューのARNを動的に取得
    try:
        attributes_response = sqs_client.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=["QueueArn"]
        )
        queue_arn = attributes_response["Attributes"]["QueueArn"]
        print(f"Queue ARN: {queue_arn}")
    except Exception as e:
        print("Failed to retrieve queue attributes:", e)
        sys.exit(1)

    # 10個のメッセージエントリーを作成
    entries = []
    for i in range(1, 11):
        entry = {
            "Id": f"msg_{i}",  # 各エントリーのユニークなID
            "MessageBody": f"Hello from boto3! This is message {i}",
            # オプションでMessageAttributesも設定可能
            "MessageAttributes": {
                "SampleAttribute": {
                    "StringValue": f"SampleValue_{i}",
                    "DataType": "String"
                }
            }
        }
        entries.append(entry)

    # 一括送信(最大10件まで)
    try:
        batch_response = sqs_client.send_message_batch(
            QueueUrl=queue_url,
            Entries=entries
        )
        if "Failed" in batch_response and batch_response["Failed"]:
            print("Some messages failed to send:")
            for failure in batch_response["Failed"]:
                print(failure)
        else:
            print("All messages sent successfully!")
            # 成功したメッセージのIDを表示
            for success in batch_response.get("Successful", []):
                print("Message ID:", success.get("MessageId"))
    except Exception as e:
        print("Failed to send batch messages:", e)
        sys.exit(1)

if __name__ == "__main__":
    main()

所感

  • 普通に便利だった
  • 実践的にはデッドレターキューと組み合わせて一定以上失敗したらDLQに入れるみたいな実装になるのだと思う


Shikoan's ML Blogの中の人が運営しているサークル「じゅ~しぃ~すくりぷと」の本のご案内

技術書コーナー

北海道の駅巡りコーナー


Add a Comment

メールアドレスが公開されることはありません。 が付いている欄は必須項目です