こしあん
2025-01-23

DynamoDBの消費したキャパシティユニットを取得する


29{icon} {views}


DynamoDBにアイテムをPutItemする際、レスポンスのConsumedCapacityやCloudWatch Metricsから消費キャパシティユニットを取得する手順を紹介した。Terraformを使ったテーブル構築例も記載している。

はじめに

  • DynamoDBには読み書きのキャパシティユニットという概念があるが、書き込み時・または過去から消費したキャパシティユニットの数を取得できることを知ったので試してみた。SAP勉強してたら出てきた
  • やり方は2通りある
    • DynamoDBでPutItemをしたときに、レスポンスで返ってくるConsumedCapacityCapacityUnitsを取得する
    • DynamoDBはCloudWatch MetricsにConsumedReadCapacityUnitsConsumedWriteCapacityUnitsという指標を自動登録しているので、その時系列データを取得する

両方ためしてみる

DynamoDBのデプロイ

Terraformでデプロイする。Id、Question、Answerという3つのカラムを持つテーブル。Idをプライマリーキーにする

NextIdというのが入っているが、これはレコード追加時にできたもの。

resource "aws_dynamodb_table" "example_table" {
  name         = "ExampleTable"
  billing_mode = "PAY_PER_REQUEST"
  hash_key = "Id" # プライマリーキー

  attribute {
    name = "Id"
    type = "N"
  }
}

PutItem時の消費キャパシティユニットの取得

put_itemのレスポンスに対して、ConsumedCapacity -> CapacityUnitsを取得する

        response = example_table.put_item(
            Item={
                'Id': generated_id,   # ここも合わせて修正
                'Question': question,
                'Answer': answer
            },
            ReturnConsumedCapacity='TOTAL'
        )
        consumed_capacity = response.get('ConsumedCapacity', {}).get('CapacityUnits', 'N/A')

全体コード

全体コード。質問と回答はOpenAIのAPIで生成する。

import openai
import boto3
from botocore.exceptions import ClientError  # ← 修正ポイント

# DynamoDBリソースの設定
boto_session = boto3.Session(profile_name="hogehoge")
dynamodb = boto_session.resource('dynamodb')
example_table = dynamodb.Table('ExampleTable')

def get_next_id():
    """
    ExampleTable 内に (Id=0) のアイテムをカウンターとして利用し、
    NextId をインクリメントして取得する。
    もし (Id=0) のアイテムが存在しなくても ADD は自動で作成してくれる。
    """
    try:
        response = example_table.update_item(
            Key={'Id': 0},
            UpdateExpression='ADD NextId :incr',
            ExpressionAttributeValues={':incr': 1},
            ReturnValues='UPDATED_NEW'
        )
        return response['Attributes']['NextId']
    except ClientError as e:
        raise e

def run_openai_message(user_input):
    # ChatGPT APIへのリクエスト
    response = openai.chat.completions.create(
        model="gpt-4o",  # または最新のモデルを指定
        messages=[
            {"role": "user", "content": user_input},
        ],
        temperature=0.7,
    )
    chat_response = response.choices[0].message.content.strip()

    return chat_response

def truncate_text(text, length=20):
    return text if len(text) <= length else text[:length] + "..."

def run():
    # 1. IDの取得
    try:
        generated_id = get_next_id()
    except Exception as e:
        print(f"IDの取得に失敗しました: {e}")
        return

    # 2. 質問の生成
    try:
        question = run_openai_message(
            "興味深い質問を1つ生成してください。")
    except Exception as e:
        print(f"質問の生成に失敗しました: {e}")
        return

    # 3. 回答の生成
    try:
        answer = run_openai_message(
            f"以下の質問に対して詳しい回答を提供してください。\n質問: {question}")
    except Exception as e:
        print(f"回答の生成に失敗しました: {e}")
        return

    # 4. DynamoDBへの登録
    try:
        response = example_table.put_item(
            Item={
                'Id': generated_id,   # ここも合わせて修正
                'Question': question,
                'Answer': answer
            },
            ReturnConsumedCapacity='TOTAL'
        )
        consumed_capacity = response.get('ConsumedCapacity', {}).get('CapacityUnits', 'N/A')
    except Exception as e:
        print(f"DynamoDBへの登録に失敗しました: {e}")
        return

    # 5. データの表示
    print(f"ID: {generated_id}")
    print(f"質問: {truncate_text(question)}")
    print(f"回答: {truncate_text(answer)}")
    print(f"消費キャパシティユニット: {consumed_capacity}")
    print("="*20)

def main(n=5):
    for i in range(n):
        run()

if __name__ == "__main__":
    main()

結果

書き込み時に消費したキャパシティユニットが返ってくる

ID: 3
質問: もし、時間旅行が可能になった場合、過去の...
回答: もし時間旅行が可能になった場合、訪れたい...
消費キャパシティユニット: 3.0
====================
ID: 4
質問: あなたの人生で最も影響を受けた出来事は何...
回答: 私自身はAIであり、個人的な経験や人生観...
消費キャパシティユニット: 3.0
====================
ID: 5
質問: 人間の意識はどのようにして物理的な脳の活...
回答: 人間の意識がどのようにして物理的な脳の活...
消費キャパシティユニット: 3.0
====================
ID: 6
質問: 確かに、こちらはいかがでしょうか:

「...
回答: 人間の感情を人工知能(AI)がどの程度ま...
消費キャパシティユニット: 3.0
====================
ID: 7
質問: もちろんです。以下の質問はいかがでしょう...
回答: これはとても興味深い質問ですね。時間旅行...
消費キャパシティユニット: 3.0
====================

CloudaWatch Metricsから取得する

DynamoDBはデフォルトでConsumedReadCapacityUnitsConsumedWriteCapacityUnitsというCloudWatch Metricsを送ってるので、そこから時系列データを取ってきて取得する方法

import boto3
import datetime

boto_session = boto3.Session(profile_name="hogehoge")

def get_consumed_capacity_total(table_name, metric_name):
    """
    DynamoDBの指定したテーブルから、CloudWatchに記録されている
    過去の消費キャパシティユニットを取得する例。
    metric_nameには 'ConsumedReadCapacityUnits' または
    'ConsumedWriteCapacityUnits' を指定する。
    """
    cloudwatch = boto_session.client("cloudwatch")

    # 取得期間を指定(ここでは過去1日)
    end_time = datetime.datetime.utcnow()
    start_time = end_time - datetime.timedelta(days=1)

    # 取得する粒度(Period)は秒単位
    # ここでは 60 秒(1分)刻みでの合計値を取得
    response = cloudwatch.get_metric_statistics(
        Namespace="AWS/DynamoDB",
        MetricName=metric_name,
        Dimensions=[
            {"Name": "TableName", "Value": table_name}
        ],
        StartTime=start_time,
        EndTime=end_time,
        Period=60,
        Statistics=["Sum"],  # "Average", "Minimum", "Maximum" なども指定可能
    )

    # 取得したデータポイントの "Sum" の値を合計
    total_capacity = sum(
        dp["Sum"] for dp in response["Datapoints"] if "Sum" in dp
    )
    return total_capacity

if __name__ == "__main__":
    table_name = "ExampleTable"

    # 読み取りキャパシティユニットの合計を取得
    total_read = get_consumed_capacity_total(
        table_name, 
        "ConsumedReadCapacityUnits", 
    )
    print(f"過去1日間の読み取りキャパシティユニット合計: {total_read}")

    # 書き込みキャパシティユニットの合計を取得
    total_write = get_consumed_capacity_total(
        table_name,
        "ConsumedWriteCapacityUnits",
    )
    print(f"過去1日間の書き込みキャパシティユニット合計: {total_write}")

結果

過去1日間の読み取りキャパシティユニット合計: 5.5
過去1日間の書き込みキャパシティユニット合計: 30.0

時系列データをプロットするなどすれば、必要なプロビジョンドキャパシティユニットを計算するときに便利だろう。CloudWatch Metricsの画面から直接見るのでも良さそう

所感

  • 思ったより簡単にキャパシティユニット取れるんだという発見。書き込み時に取れるのがOpenAIの消費トークン数みたいでわかりやすい


Shikoan's ML Blogの中の人が運営しているサークル「じゅ~しぃ~すくりぷと」の本のご案内

技術書コーナー

北海道の駅巡りコーナー


Add a Comment

メールアドレスが公開されることはありません。 が付いている欄は必須項目です