CloudWatch Logsのサブスクリプションフィルターを試す
CloudWatch Logsで特定のログ出力時にLambda関数を自動起動する方法をTerraformで実装してみた。サブスクリプションフィルターの設定ポイントや実際のテスト手順を具体的に解説します。
目次
はじめに
CloudWatch Logsで特定の文字列が出力されたら、特定のLambdaを実行というような処理を行ってみます。これはCloudWatch Logsのサブスクリプションフィルターという機能で実現できます。
参考
ディレクトリ構造
.
├── lambda.tf
├── lambda1
│ └── lambda_function.py
├── lambda2
│ └── lambda_function.py
サブスクリプションフィルターをTerraformで実装するときのポイント
ポイントは以下の2点で、具体的には以下のコード。
- lambda1のログに対する、サブスクリプションフィルタの定義(1番目)
- lmabda2をCloudWatch Logsから呼び出すための許可設定(2番目)
# Lambda2をトリガーするサブスクリプションフィルターをLambda1のロググループに適用
resource "aws_cloudwatch_log_subscription_filter" "lambda1_subscription" {
name = "SubscriptionFilterLambda1"
log_group_name = aws_cloudwatch_log_group.log_groups["lambda1"].name
filter_pattern = "\"0が入力されました\""
destination_arn = aws_lambda_function.lambda["lambda2"].arn
# サブスクリプションフィルターを設定するためにLambda2への呼び出し権限を付与
depends_on = [
aws_lambda_permission.allow_cloudwatch_lambda2
]
}
# Lambda2にCloudWatch Logsからの呼び出しを許可
resource "aws_lambda_permission" "allow_cloudwatch_lambda2" {
statement_id = "AllowCloudWatchInvoke"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.lambda["lambda2"].function_name
principal = "logs.amazonaws.com"
# ロググループのARNに ':*' を付与して任意のログストリームからの呼び出しを許可(これないと権限不足で失敗)
source_arn = "${aws_cloudwatch_log_group.log_groups["lambda1"].arn}:*"
}
サブスクリプションフィルタの定義は、
aws_cloudwatch_log_subscription_filter.lambda1_subscription
で、「0が入力されました」(0、10、20…で反応するのを想定)をトリガーとするlog_group_name
が読み込み先で、destination_arn
は起動するLambdaのARN"
をエスケープする必要がある
CloudWatch Logsからの呼び出しの許可は、
function_name
に呼び出し先の関数(Lambda2)の関数名を指定- actionは何をするか、関数の起動なので
lambda:InvokeFunction
を指定 source_arn
は呼び出し元だが、ロググループのARNの任意のログストリームに対して呼び出しを許可する必要があるため、末尾に/*
が必要。これがないと「権限が足りない」となってデプロイが永遠に終わらない。- IAMポリシーの不足ではないので、ここが注意が必要
具体的には以下のようなエラー
│ Error: putting CloudWatch Logs Subscription Filter (SubscriptionFilterLambda1): operation error CloudWatch Logs: PutSubscriptionFilter, https response error StatusCode: 400, RequestID: xxx, InvalidParameterException: Could not execute the lambda function. Make sure you have given CloudWatch Logs permission to execute your function.
│
│ with aws_cloudwatch_log_subscription_filter.lambda1_subscription,
│ on lambda.tf line 55, in resource “aws_cloudwatch_log_subscription_filter” “lambda1_subscription”:
│ 55: resource “aws_cloudwatch_log_subscription_filter” “lambda1_subscription” {
テスト
ローカルから値を0~20と変えて3秒おきにアクセスしてみる
import boto3
import json
import time
# AWS Lambda クライアントの初期化
session = boto3.Session(profile_name='hogehoge') # プロファイル名を適切に設定してください
lambda_client = session.client('lambda', region_name='ap-northeast-1') # 適切なリージョンに変更してください
# Lambda1 の ARN を設定
LAMBDA_FUNCTION_NAME = 'subscription_filter_lambda1' # Terraform で定義した関数名に変更してください
# リクエストを送信する回数(無限ループにする場合はコメントアウト)
MAX_REQUESTS = 20 # 必要に応じて変更、無限ループにする場合は削除
def main():
try:
for i in range(MAX_REQUESTS):
# イベントペイロードの作成
payload = {
'input_variable': i
}
# Lambda 関数の呼び出し
response = lambda_client.invoke(
FunctionName=LAMBDA_FUNCTION_NAME,
InvocationType='RequestResponse', # 同期呼び出し
Payload=json.dumps(payload)
)
# レスポンスの読み取り
response_payload = response['Payload'].read()
response_json = json.loads(response_payload)
print(f"Sent input_variable: {i}, Lambda Response: {response_json}")
# 3秒待機
time.sleep(3)
except KeyboardInterrupt:
print("スクリプトを中断しました。")
except Exception as e:
print(f"エラーが発生しました: {e}")
if __name__ == "__main__":
main()
CloudWatch Logsを見ると以下の通り
Lambda1のログ
3秒おきに毎回呼び出されているのがわかる
Lambda2のログ
ほぼ30秒おきに(Lambda1が0、10のタイミングで)呼び出されているのがわかる。Lambda1からは若干ラグはあるものの、ほぼリアルタイム。CloudWatch Logsの書き込みが遅延するとそのぶん遅れるので、そこの認識は必要。
Lambda2のトリガー設定
CloudWatch Logsからの実行が許可されている
全体のコード
lambda.tf
# ロールを作成
resource "aws_iam_role" "lambda_role" {
name = "LambdaExecutionRole"
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Action = "sts:AssumeRole",
Effect = "Allow",
Principal = {
Service = "lambda.amazonaws.com"
}
}
]
})
}
# AWSLambdaBasicExecutionRoleマネージドポリシー
resource "aws_iam_role_policy_attachment" "managed_policy" {
role = aws_iam_role.lambda_role.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
locals {
lambda_list = ["lambda1", "lambda2"]
}
# Lambdaの作成
data "archive_file" "lambda_zip" {
for_each = toset(local.lambda_list)
type = "zip"
source_file = "${each.value}/lambda_function.py"
output_path = ".cache/${each.value}.zip"
}
# Lambdaの作成(1, 2)
resource "aws_lambda_function" "lambda" {
for_each = toset(local.lambda_list)
function_name = "subscription_filter_${each.value}"
role = aws_iam_role.lambda_role.arn
handler = "lambda_function.lambda_handler"
runtime = "python3.12"
filename = data.archive_file.lambda_zip[each.value].output_path
source_code_hash = data.archive_file.lambda_zip[each.value].output_base64sha256
}
# CloudWatch Logs グループを動的に作成
resource "aws_cloudwatch_log_group" "log_groups" {
for_each = toset(local.lambda_list)
name = "/aws/lambda/subscription_filter_${each.value}"
retention_in_days = 14
}
# Lambda2をトリガーするサブスクリプションフィルターをLambda1のロググループに適用
resource "aws_cloudwatch_log_subscription_filter" "lambda1_subscription" {
name = "SubscriptionFilterLambda1"
log_group_name = aws_cloudwatch_log_group.log_groups["lambda1"].name
filter_pattern = "\"0が入力されました\""
destination_arn = aws_lambda_function.lambda["lambda2"].arn
# サブスクリプションフィルターを設定するためにLambda2への呼び出し権限を付与
depends_on = [
aws_lambda_permission.allow_cloudwatch_lambda2
]
}
# Lambda2にCloudWatch Logsからの呼び出しを許可
resource "aws_lambda_permission" "allow_cloudwatch_lambda2" {
statement_id = "AllowCloudWatchInvoke"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.lambda["lambda2"].function_name
principal = "logs.amazonaws.com"
# ロググループのARNに ':*' を付与して任意のログストリームからの呼び出しを許可(これないと権限不足で失敗)
source_arn = "${aws_cloudwatch_log_group.log_groups["lambda1"].arn}:*"
}
lambda1/lambda_function.py
import json
def lambda_handler(event, context):
input_variable = event.get('input_variable', '未定義')
message = f"{input_variable}が入力されました"
print(message) # CloudWatch Logsに出力
return {
'statusCode': 200,
'body': json.dumps(message)
}
lambda2/lambda_function.py
import json
def lambda_handler(event, context):
print("Lambda2が起動されました。")
return {
'statusCode': 200,
'body': json.dumps("Lambda2が実行されました。")
}
おわりに
これに頼るのはやや負けな気がするけど、実際いろいろ使い所ありそう
Shikoan's ML Blogの中の人が運営しているサークル「じゅ~しぃ~すくりぷと」の本のご案内
技術書コーナー
北海道の駅巡りコーナー