こしあん
2024-12-18

CloudWatch Logsのサブスクリプションフィルターを試す


7{icon} {views}


CloudWatch Logsで特定のログ出力時にLambda関数を自動起動する方法をTerraformで実装してみた。サブスクリプションフィルターの設定ポイントや実際のテスト手順を具体的に解説します。

はじめに

CloudWatch Logsで特定の文字列が出力されたら、特定のLambdaを実行というような処理を行ってみます。これはCloudWatch Logsのサブスクリプションフィルターという機能で実現できます。

参考

ディレクトリ構造

.
├── lambda.tf
├── lambda1
│   └── lambda_function.py
├── lambda2
│   └── lambda_function.py

サブスクリプションフィルターをTerraformで実装するときのポイント

ポイントは以下の2点で、具体的には以下のコード。

  • lambda1のログに対する、サブスクリプションフィルタの定義(1番目)
  • lmabda2をCloudWatch Logsから呼び出すための許可設定(2番目)
# Lambda2をトリガーするサブスクリプションフィルターをLambda1のロググループに適用
resource "aws_cloudwatch_log_subscription_filter" "lambda1_subscription" {
  name            = "SubscriptionFilterLambda1"
  log_group_name  = aws_cloudwatch_log_group.log_groups["lambda1"].name
  filter_pattern  = "\"0が入力されました\""
  destination_arn = aws_lambda_function.lambda["lambda2"].arn

  # サブスクリプションフィルターを設定するためにLambda2への呼び出し権限を付与
  depends_on = [
    aws_lambda_permission.allow_cloudwatch_lambda2
  ]
}

# Lambda2にCloudWatch Logsからの呼び出しを許可
resource "aws_lambda_permission" "allow_cloudwatch_lambda2" {
  statement_id  = "AllowCloudWatchInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.lambda["lambda2"].function_name
  principal     = "logs.amazonaws.com"
  # ロググループのARNに ':*' を付与して任意のログストリームからの呼び出しを許可(これないと権限不足で失敗)
  source_arn    = "${aws_cloudwatch_log_group.log_groups["lambda1"].arn}:*"
}

サブスクリプションフィルタの定義は、

  • aws_cloudwatch_log_subscription_filter.lambda1_subscriptionで、「0が入力されました」(0、10、20…で反応するのを想定)をトリガーとする
  • log_group_nameが読み込み先で、destination_arnは起動するLambdaのARN
  • "をエスケープする必要がある

CloudWatch Logsからの呼び出しの許可は、

  • function_nameに呼び出し先の関数(Lambda2)の関数名を指定
  • actionは何をするか、関数の起動なのでlambda:InvokeFunctionを指定
  • source_arnは呼び出し元だが、ロググループのARNの任意のログストリームに対して呼び出しを許可する必要があるため、末尾に/*が必要。これがないと「権限が足りない」となってデプロイが永遠に終わらない
  • IAMポリシーの不足ではないので、ここが注意が必要

具体的には以下のようなエラー

│ Error: putting CloudWatch Logs Subscription Filter (SubscriptionFilterLambda1): operation error CloudWatch Logs: PutSubscriptionFilter, https response error StatusCode: 400, RequestID: xxx, InvalidParameterException: Could not execute the lambda function. Make sure you have given CloudWatch Logs permission to execute your function.

│ with aws_cloudwatch_log_subscription_filter.lambda1_subscription,
│ on lambda.tf line 55, in resource “aws_cloudwatch_log_subscription_filter” “lambda1_subscription”:
│ 55: resource “aws_cloudwatch_log_subscription_filter” “lambda1_subscription” {

テスト

ローカルから値を0~20と変えて3秒おきにアクセスしてみる

import boto3
import json
import time

# AWS Lambda クライアントの初期化
session = boto3.Session(profile_name='hogehoge')  # プロファイル名を適切に設定してください
lambda_client = session.client('lambda', region_name='ap-northeast-1')  # 適切なリージョンに変更してください

# Lambda1 の ARN を設定
LAMBDA_FUNCTION_NAME = 'subscription_filter_lambda1'  # Terraform で定義した関数名に変更してください

# リクエストを送信する回数(無限ループにする場合はコメントアウト)
MAX_REQUESTS = 20  # 必要に応じて変更、無限ループにする場合は削除

def main():
    try:
        for i in range(MAX_REQUESTS):
            # イベントペイロードの作成
            payload = {
                'input_variable': i
            }

            # Lambda 関数の呼び出し
            response = lambda_client.invoke(
                FunctionName=LAMBDA_FUNCTION_NAME,
                InvocationType='RequestResponse',  # 同期呼び出し
                Payload=json.dumps(payload)
            )

            # レスポンスの読み取り
            response_payload = response['Payload'].read()
            response_json = json.loads(response_payload)

            print(f"Sent input_variable: {i}, Lambda Response: {response_json}")

            # 3秒待機
            time.sleep(3)

    except KeyboardInterrupt:
        print("スクリプトを中断しました。")
    except Exception as e:
        print(f"エラーが発生しました: {e}")

if __name__ == "__main__":
    main()

CloudWatch Logsを見ると以下の通り

Lambda1のログ

3秒おきに毎回呼び出されているのがわかる

Lambda2のログ

ほぼ30秒おきに(Lambda1が0、10のタイミングで)呼び出されているのがわかる。Lambda1からは若干ラグはあるものの、ほぼリアルタイム。CloudWatch Logsの書き込みが遅延するとそのぶん遅れるので、そこの認識は必要。

Lambda2のトリガー設定

CloudWatch Logsからの実行が許可されている

全体のコード

lambda.tf

# ロールを作成
resource "aws_iam_role" "lambda_role" {
  name = "LambdaExecutionRole"
  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}

# AWSLambdaBasicExecutionRoleマネージドポリシー
resource "aws_iam_role_policy_attachment" "managed_policy" {
  role       = aws_iam_role.lambda_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

locals {
  lambda_list = ["lambda1", "lambda2"]
}

# Lambdaの作成
data "archive_file" "lambda_zip" {
  for_each    = toset(local.lambda_list)
  type        = "zip"
  source_file = "${each.value}/lambda_function.py"
  output_path = ".cache/${each.value}.zip"
}

# Lambdaの作成(1, 2)
resource "aws_lambda_function" "lambda" {
  for_each         = toset(local.lambda_list)
  function_name    = "subscription_filter_${each.value}"
  role             = aws_iam_role.lambda_role.arn
  handler          = "lambda_function.lambda_handler"
  runtime          = "python3.12"
  filename         = data.archive_file.lambda_zip[each.value].output_path
  source_code_hash = data.archive_file.lambda_zip[each.value].output_base64sha256
}

# CloudWatch Logs グループを動的に作成
resource "aws_cloudwatch_log_group" "log_groups" {
  for_each          = toset(local.lambda_list)
  name              = "/aws/lambda/subscription_filter_${each.value}"
  retention_in_days = 14
}

# Lambda2をトリガーするサブスクリプションフィルターをLambda1のロググループに適用
resource "aws_cloudwatch_log_subscription_filter" "lambda1_subscription" {
  name            = "SubscriptionFilterLambda1"
  log_group_name  = aws_cloudwatch_log_group.log_groups["lambda1"].name
  filter_pattern  = "\"0が入力されました\""
  destination_arn = aws_lambda_function.lambda["lambda2"].arn

  # サブスクリプションフィルターを設定するためにLambda2への呼び出し権限を付与
  depends_on = [
    aws_lambda_permission.allow_cloudwatch_lambda2
  ]
}

# Lambda2にCloudWatch Logsからの呼び出しを許可
resource "aws_lambda_permission" "allow_cloudwatch_lambda2" {
  statement_id  = "AllowCloudWatchInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.lambda["lambda2"].function_name
  principal     = "logs.amazonaws.com"
  # ロググループのARNに ':*' を付与して任意のログストリームからの呼び出しを許可(これないと権限不足で失敗)
  source_arn    = "${aws_cloudwatch_log_group.log_groups["lambda1"].arn}:*"
}

lambda1/lambda_function.py

import json

def lambda_handler(event, context):
    input_variable = event.get('input_variable', '未定義')
    message = f"{input_variable}が入力されました"
    print(message)  # CloudWatch Logsに出力
    return {
        'statusCode': 200,
        'body': json.dumps(message)
    }

lambda2/lambda_function.py

import json

def lambda_handler(event, context):
    print("Lambda2が起動されました。")
    return {
        'statusCode': 200,
        'body': json.dumps("Lambda2が実行されました。")
    }

おわりに

これに頼るのはやや負けな気がするけど、実際いろいろ使い所ありそう



Shikoan's ML Blogの中の人が運営しているサークル「じゅ~しぃ~すくりぷと」の本のご案内

技術書コーナー

北海道の駅巡りコーナー


Add a Comment

メールアドレスが公開されることはありません。 が付いている欄は必須項目です