DynamoDBの消費したキャパシティユニットを取得する
Posted On 2025-01-23
DynamoDBにアイテムをPutItemする際、レスポンスのConsumedCapacityやCloudWatch Metricsから消費キャパシティユニットを取得する手順を紹介した。Terraformを使ったテーブル構築例も記載している。
目次
はじめに
- DynamoDBには読み書きのキャパシティユニットという概念があるが、書き込み時・または過去から消費したキャパシティユニットの数を取得できることを知ったので試してみた。SAP勉強してたら出てきた
- やり方は2通りある
- DynamoDBで
PutItem
をしたときに、レスポンスで返ってくるConsumedCapacity
→CapacityUnits
を取得する - DynamoDBはCloudWatch Metricsに
ConsumedReadCapacityUnits
やConsumedWriteCapacityUnits
という指標を自動登録しているので、その時系列データを取得する
- DynamoDBで
両方ためしてみる
DynamoDBのデプロイ
Terraformでデプロイする。Id、Question、Answerという3つのカラムを持つテーブル。Idをプライマリーキーにする
NextIdというのが入っているが、これはレコード追加時にできたもの。
resource "aws_dynamodb_table" "example_table" {
name = "ExampleTable"
billing_mode = "PAY_PER_REQUEST"
hash_key = "Id" # プライマリーキー
attribute {
name = "Id"
type = "N"
}
}
PutItem時の消費キャパシティユニットの取得
put_item
のレスポンスに対して、ConsumedCapacity -> CapacityUnits
を取得する
response = example_table.put_item(
Item={
'Id': generated_id, # ここも合わせて修正
'Question': question,
'Answer': answer
},
ReturnConsumedCapacity='TOTAL'
)
consumed_capacity = response.get('ConsumedCapacity', {}).get('CapacityUnits', 'N/A')
全体コード
全体コード。質問と回答はOpenAIのAPIで生成する。
import openai
import boto3
from botocore.exceptions import ClientError # ← 修正ポイント
# DynamoDBリソースの設定
boto_session = boto3.Session(profile_name="hogehoge")
dynamodb = boto_session.resource('dynamodb')
example_table = dynamodb.Table('ExampleTable')
def get_next_id():
"""
ExampleTable 内に (Id=0) のアイテムをカウンターとして利用し、
NextId をインクリメントして取得する。
もし (Id=0) のアイテムが存在しなくても ADD は自動で作成してくれる。
"""
try:
response = example_table.update_item(
Key={'Id': 0},
UpdateExpression='ADD NextId :incr',
ExpressionAttributeValues={':incr': 1},
ReturnValues='UPDATED_NEW'
)
return response['Attributes']['NextId']
except ClientError as e:
raise e
def run_openai_message(user_input):
# ChatGPT APIへのリクエスト
response = openai.chat.completions.create(
model="gpt-4o", # または最新のモデルを指定
messages=[
{"role": "user", "content": user_input},
],
temperature=0.7,
)
chat_response = response.choices[0].message.content.strip()
return chat_response
def truncate_text(text, length=20):
return text if len(text) <= length else text[:length] + "..."
def run():
# 1. IDの取得
try:
generated_id = get_next_id()
except Exception as e:
print(f"IDの取得に失敗しました: {e}")
return
# 2. 質問の生成
try:
question = run_openai_message(
"興味深い質問を1つ生成してください。")
except Exception as e:
print(f"質問の生成に失敗しました: {e}")
return
# 3. 回答の生成
try:
answer = run_openai_message(
f"以下の質問に対して詳しい回答を提供してください。\n質問: {question}")
except Exception as e:
print(f"回答の生成に失敗しました: {e}")
return
# 4. DynamoDBへの登録
try:
response = example_table.put_item(
Item={
'Id': generated_id, # ここも合わせて修正
'Question': question,
'Answer': answer
},
ReturnConsumedCapacity='TOTAL'
)
consumed_capacity = response.get('ConsumedCapacity', {}).get('CapacityUnits', 'N/A')
except Exception as e:
print(f"DynamoDBへの登録に失敗しました: {e}")
return
# 5. データの表示
print(f"ID: {generated_id}")
print(f"質問: {truncate_text(question)}")
print(f"回答: {truncate_text(answer)}")
print(f"消費キャパシティユニット: {consumed_capacity}")
print("="*20)
def main(n=5):
for i in range(n):
run()
if __name__ == "__main__":
main()
結果
書き込み時に消費したキャパシティユニットが返ってくる
ID: 3
質問: もし、時間旅行が可能になった場合、過去の...
回答: もし時間旅行が可能になった場合、訪れたい...
消費キャパシティユニット: 3.0
====================
ID: 4
質問: あなたの人生で最も影響を受けた出来事は何...
回答: 私自身はAIであり、個人的な経験や人生観...
消費キャパシティユニット: 3.0
====================
ID: 5
質問: 人間の意識はどのようにして物理的な脳の活...
回答: 人間の意識がどのようにして物理的な脳の活...
消費キャパシティユニット: 3.0
====================
ID: 6
質問: 確かに、こちらはいかがでしょうか:
「...
回答: 人間の感情を人工知能(AI)がどの程度ま...
消費キャパシティユニット: 3.0
====================
ID: 7
質問: もちろんです。以下の質問はいかがでしょう...
回答: これはとても興味深い質問ですね。時間旅行...
消費キャパシティユニット: 3.0
====================
CloudaWatch Metricsから取得する
DynamoDBはデフォルトでConsumedReadCapacityUnits
やConsumedWriteCapacityUnits
というCloudWatch Metricsを送ってるので、そこから時系列データを取ってきて取得する方法
import boto3
import datetime
boto_session = boto3.Session(profile_name="hogehoge")
def get_consumed_capacity_total(table_name, metric_name):
"""
DynamoDBの指定したテーブルから、CloudWatchに記録されている
過去の消費キャパシティユニットを取得する例。
metric_nameには 'ConsumedReadCapacityUnits' または
'ConsumedWriteCapacityUnits' を指定する。
"""
cloudwatch = boto_session.client("cloudwatch")
# 取得期間を指定(ここでは過去1日)
end_time = datetime.datetime.utcnow()
start_time = end_time - datetime.timedelta(days=1)
# 取得する粒度(Period)は秒単位
# ここでは 60 秒(1分)刻みでの合計値を取得
response = cloudwatch.get_metric_statistics(
Namespace="AWS/DynamoDB",
MetricName=metric_name,
Dimensions=[
{"Name": "TableName", "Value": table_name}
],
StartTime=start_time,
EndTime=end_time,
Period=60,
Statistics=["Sum"], # "Average", "Minimum", "Maximum" なども指定可能
)
# 取得したデータポイントの "Sum" の値を合計
total_capacity = sum(
dp["Sum"] for dp in response["Datapoints"] if "Sum" in dp
)
return total_capacity
if __name__ == "__main__":
table_name = "ExampleTable"
# 読み取りキャパシティユニットの合計を取得
total_read = get_consumed_capacity_total(
table_name,
"ConsumedReadCapacityUnits",
)
print(f"過去1日間の読み取りキャパシティユニット合計: {total_read}")
# 書き込みキャパシティユニットの合計を取得
total_write = get_consumed_capacity_total(
table_name,
"ConsumedWriteCapacityUnits",
)
print(f"過去1日間の書き込みキャパシティユニット合計: {total_write}")
結果
過去1日間の読み取りキャパシティユニット合計: 5.5
過去1日間の書き込みキャパシティユニット合計: 30.0
時系列データをプロットするなどすれば、必要なプロビジョンドキャパシティユニットを計算するときに便利だろう。CloudWatch Metricsの画面から直接見るのでも良さそう
所感
- 思ったより簡単にキャパシティユニット取れるんだという発見。書き込み時に取れるのがOpenAIの消費トークン数みたいでわかりやすい
Shikoan's ML Blogの中の人が運営しているサークル「じゅ~しぃ~すくりぷと」の本のご案内
技術書コーナー
北海道の駅巡りコーナー