こしあん
2025-01-06

AppSyncによるGraphQLのAPIを試す


6{icon} {views}

TerraformでAppSyncを構築し、HTTPSとWebSocketを組み合わせてリアルタイム通信ができるGraphQL APIを作成する手順をまとめてみました。Pythonからミューテーションとサブスクリプションを行い、DynamoDBをデータソースにして簡単なリアルタイム連携を確認しました。

はじめに

  • App SyncのGraphQLのAPIを使うと、WebSocketのAPIを使って、クライアントアプリケーションに対してリアルタイムでイベントを送れるので試してみた
  • WebSocketのAPIというと、API Gatewayがあるが、実はこれ以外にも選択肢があり、App Syncのほうがフルマネージド状態。
  • GraphQLはなんだかよくわからない状態で、ほぼChatGPTに聞いた答えを丸投げしたもの。動いたけどよくわからないのでここは今後必要になったら勉強する
  • 動かした部分は以下の通り
    • TerraformによるApp Syncのデプロイ
    • ローカルからのミューテーションの送信
    • ミューテーションをローカルからサブスクライブし、それをクライアント化して、コンソールベースでコールバックを受け取りコンソール表示
  • まだやっていない部分
    • 受け取ったメッセージをUIに反映させる(理由:非同期によるUI反映が面倒で、ここのテストに時間かかりそうだったので割愛)
    • GraphQL部分の理解やカスタマイズ

AppSyncとは

リアルタイムチャット機能がユースケースにある。AWSの漫画あるんでこれ見るのが良さそう

AWSマンガ第 4 話:リアルタイムのチャット機能を実装しろ! ( 1 / 4 )

簡単にいうと、GraphQLを使ってAPIの設計を柔軟にできる、クライアントへのリアルタイムイベント配信もコミコミでできるAWSのサービス。

BlackBeltの資料より

私が触ってみた感想だと

  • API GatewayだとWebSocketのAPIしかできないが、AppSyncだとHTTPSのエンドポイントとWebSocketのエンドポイントの両方ができる
  • データの投稿はHTTPSで、クライアントはWebSocketでエンドポイントをサブスクライブして、投稿されたデータをリアルタイムでキャッチするということがAppSyncだと一発でできる。API Gatewayだともっといろいろ手数が増える
  • AppSyncは裏におくデータソースはいろいろ選べて、今回はDynamoDBで試したが、それの連携もAppSync側でできる

AppSyncをデプロイする

GraphQLとTerraform

以下のTerraformのコードでできた。比較的行数は少なめ。データソースのDynamoDBはコード中でAppSyncとは別に作成している。

入力変数は以下の通り。

variable "appsync_api_name" {
  description = "AppSync APIの名前"
  type        = string
  default     = "MyAppSyncAPI"
}

variable "dynamodb_table_name" {
  description = "DynamoDBテーブルの名前"
  type        = string
  default     = "AppSyncDynamoDBTable"
}

variable "api_key_description" {
  description = "APIキーの説明"
  type        = string
  default     = "AppSync API Key"
}

variable "schema_file" {
  description = "GraphQLスキーマファイルのパス"
  type        = string
  default     = "schema.graphql"
}

メインのコードは以下の通り。

# DynamoDBテーブルの作成
resource "aws_dynamodb_table" "appsync_table" {
  name           = var.dynamodb_table_name
  billing_mode   = "PAY_PER_REQUEST"
  hash_key       = "id"
  attribute {
    name = "id"
    type = "S"
  }
}

# AppSync GraphQL APIの作成
resource "aws_appsync_graphql_api" "appsync_api" {
  name          = var.appsync_api_name
  authentication_type = "API_KEY"

  schema = file(var.schema_file)
}

resource "aws_appsync_api_key" "example_api_key" {
  api_id  = aws_appsync_graphql_api.appsync_api.id
  expires = timeadd(timestamp(), "8760h") # 365日後
}

# IAMロールとポリシーの設定
resource "aws_iam_role" "appsync_dynamodb_role" {
  name = "AppSyncDynamoDBRole"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [{
      Effect = "Allow",
      Principal = {
        Service = "appsync.amazonaws.com"
      },
      Action = "sts:AssumeRole"
    }]
  })
}

resource "aws_iam_policy" "appsync_dynamodb_policy" {
  name        = "AppSyncDynamoDBPolicy"
  description = "AppSyncがDynamoDBにアクセスするためのポリシー"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [{
      Effect = "Allow",
      Action = [
        "dynamodb:Query",
        "dynamodb:Scan",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem"
      ],
      Resource = aws_dynamodb_table.appsync_table.arn
    }]
  })
}

resource "aws_iam_role_policy_attachment" "appsync_dynamodb_attachment" {
  role       = aws_iam_role.appsync_dynamodb_role.name
  policy_arn = aws_iam_policy.appsync_dynamodb_policy.arn
}

# AppSyncのデータソース設定
resource "aws_appsync_datasource" "dynamodb_datasource" {
  api_id = aws_appsync_graphql_api.appsync_api.id
  name   = "DynamoDBDataSource"
  type   = "AMAZON_DYNAMODB"

  dynamodb_config {
    table_name = aws_dynamodb_table.appsync_table.name
  }

  service_role_arn = aws_iam_role.appsync_dynamodb_role.arn
}

# リゾルバーの設定
# Query.getTodos リゾルバー
resource "aws_appsync_resolver" "get_todos_resolver" {
  api_id      = aws_appsync_graphql_api.appsync_api.id
  type        = "Query"
  field       = "getTodos"
  data_source = aws_appsync_datasource.dynamodb_datasource.name

  request_template = <<EOF
{
  "version" : "2017-02-28",
  "operation" : "Scan"
}
EOF

  response_template = <<EOF
$util.toJson($ctx.result.items)
EOF
}

# Mutation.addTodo リゾルバー
resource "aws_appsync_resolver" "add_todo_resolver" {
  api_id      = aws_appsync_graphql_api.appsync_api.id
  type        = "Mutation"
  field       = "addTodo"
  data_source = aws_appsync_datasource.dynamodb_datasource.name

  request_template = <<EOF
{
  "version" : "2017-02-28",
  "operation" : "PutItem",
  "key": {
    "id": { "S": "$util.autoId()" }
  },
  "attributeValues": {
    "title": { "S": "$ctx.args.title" },
    "completed": { "BOOL": false }
  }
}
EOF

  response_template = <<EOF
$util.toJson($ctx.result)
EOF
}

# 出力
output "appsync_api_id" {
  description = "AppSync APIのID"
  value       = aws_appsync_graphql_api.appsync_api.id
}

output "appsync_api_url" {
  description = "AppSync APIのGraphQLエンドポイントURL"
  value       = aws_appsync_graphql_api.appsync_api.uris
}

output "appsync_api_key" {
  description = "AppSync APIキー"
  value       = aws_appsync_api_key.example_api_key.key
  sensitive   = true
}

output "dynamodb_table_name" {
  description = "作成されたDynamoDBテーブルの名前"
  value       = aws_dynamodb_table.appsync_table.name
}

スキーマファイルを以下のように定義する。パスは(var.scheme_file)に格納。ここではクエリ、ミューテーション、サブスクリプションの3つを定義している。

type Todo {
  id: ID!
  title: String!
  completed: Boolean!
}

type Query {
  getTodos: [Todo]
}

type Mutation {
  addTodo(title: String!): Todo
}

type Subscription {
  onAddTodo: Todo
    @aws_subscribe(mutations: ["addTodo"])
}

schema {
  query: Query
  mutation: Mutation
  subscription: Subscription
}

terraform applyすると次のように出てくるはずだ。このHTTPSのURLもWSSのURLも後ほど両方使う

appsync_api_id = "sx7f7ozgxve4xhhkjbm4ofdnce"
appsync_api_key = <sensitive>
appsync_api_url = tomap({
  "GRAPHQL" = "https://lgkixskxi5ftvhluhrai7axzri.appsync-api.ap-northeast-1.amazonaws.com/graphql"
  "REALTIME" = "wss://lgkixskxi5ftvhluhrai7axzri.appsync-realtime-api.ap-northeast-1.amazonaws.com/graphql"
})
dynamodb_table_name = "AppSyncDynamoDBTable"

APIキーはどこにあるか

マネジメントコンソール→AWS AppSync→デプロイしたAPI→設定

認証情報をローカルに配置

作業ディレクトリにcredential.pyというファイルを作成する。これは最低限のテスト用で本番だったらもっとセキュアな実装を行うこと

API_KEY = "da2-pu4o******"
HTTPS_ENDPOINT = "https://lgkixskxi5ftvhluhrai7axzri.appsync-api.ap-northeast-1.amazonaws.com/graphql"
WSS_ENDPOINT = "wss://lgkixskxi5ftvhluhrai7axzri.appsync-realtime-api.ap-northeast-1.amazonaws.com/graphql"
HOST = "lgkixskxi5ftvhluhrai7axzri.appsync-api.ap-northeast-1.amazonaws.com"

ホストはHTTPSのエンドポイントのホスト(appsync-api)を使う。

PythonからHTTPSのリクエストを送る

python-graphql-clientをインストールする。更新が2018年と古いため、もしかしたら別のライブラリのほうがいいかもしれない。

pip install graphqlclient

アイテムの送信(ミューテーション)と、現在のアイテムの一覧(クエリ)をローカルのPythonから行う。

from graphqlclient import GraphQLClient
import json
from credentials import HTTPS_ENDPOINT, API_KEY

# AppSyncクライアントの作成
client = GraphQLClient(HTTPS_ENDPOINT)
client.inject_token(API_KEY, "X-Api-Key")

################################################
# 1. Todoを追加する (mutation)
################################################
def run_mutation(todo_item="Hello World"):
    mutation = """
    mutation AddTodo($title: String!) {
      addTodo(title: $title) {
        id
        title
        completed
      }
    }
    """
    variables = {
        "title": todo_item
    }
    # GraphQLミューテーションを実行
    mutation_result = client.execute(mutation, variables=variables)
    mutation_data = json.loads(mutation_result)
    print("mutation result =", mutation_data)

################################################
# 2. Todoを全て取得する (query)
################################################
def run_query():
    query = """
    query GetTodos {
      getTodos {
        id
        title
        completed
      }
    }
    """
    # GraphQLクエリを実行
    query_result = client.execute(query)
    query_data = json.loads(query_result)
    print("query result =", query_data)

ミューテーションを実行

run_mutation()を実行すると、以下のように出力される。

mutation result = {'data': {'addTodo': {'id': '3a374715-6d23-4922-ab0f-a17f9f3f0098', 'title': 'Hello World', 'completed': False}}}

クエリを実行

run_query()を実行すると、以下のように出力される。

query result = {'data': {'getTodos': [{'id': '3a374715-6d23-4922-ab0f-a17f9f3f0098', 'title': 'Hello World', 'completed': False}]}}

ローカルからサブスクライブする

ミューテーションが発生したときにイベントがローカル(クライアント)から受信できるように実装する。後々の拡張性を考えてAppSyncのクライアントクラス(AppSyncSubscriptionClient)を作っておくのが良いだろう。

参考情報

この記事が非常に参考になった。これをGPTにリファインさせたものが以下のコード。

AppSyncのSubscriptionをPythonで動作確認

サブスクリプションクライアント

AppSyncのサブスクリプションクライアントをAppSyncSubscriptionClientとして定義する。作業ディレクトリにsubscription_client.pyを作成する。

import websocket
import json
import base64
import uuid
import threading
from typing import Callable

class AppSyncSubscriptionClient:
    """
    AWS AppSyncのSubscriptionをWebSocketで受け取るクライアントクラス
    """

    def __init__(self, wss_api_url: str, host: str, api_key: str, query: str,
                 message_handler: Callable[[str], None] = None):
        """
        :param wss_api_url: AppSyncのWebSocketエンドポイント (例: wss://xxxxxxxx.appsync-realtime-api.ap-northeast-1.amazonaws.com/graphql)
        :param host: WebSocketエンドポイントのホスト(例:xxxxxxxx.appsync-api.ap-northeast-1.amazonaws.com)
        :param api_key: AppSyncのAPIキー
        :message_handler: メッセージを受信したときに呼ばれるコールバックハンドラー
        """
        self.wss_url = wss_api_url
        self.host = host
        self.api_key = api_key
        self.subscription_query = query

        # 登録するWebSocketAppオブジェクト
        self.ws = None

        # 受信データに対するコールバック
        self.message_handler = message_handler

    def _build_connection_url(self) -> str:
        """
        サブスクリプション開始用のコネクションURLを生成
        """
        # ヘッダー情報作成
        header_dict = {
            'host': self.host,
            'x-api-key': self.api_key
        }

        # dict を文字列に変換 → Base64 エンコード
        header_json = json.dumps(header_dict)
        header_b64 = base64.b64encode(header_json.encode("utf-8")).decode("utf-8")

        # Payloadは空のJSON {} をBase64エンコードした e30= を指定
        payload = "e30="

        # 最終的なWSS接続用URL
        connection_url = f"{self.wss_url}?header={header_b64}&payload={payload}"
        return connection_url

    def _build_subscription_message(self, sub_id: str) -> str:
        """
        GraphQLのSubscription開始用メッセージを生成
        """
        # サブスクリプションクエリ全体
        payload_data = json.dumps({
            "query": self.subscription_query,
            "variables": {}
        })

        # WebSocketで送信するJSON
        register = {
            'id': sub_id,
            'payload': {
                'data': payload_data,
                'extensions': {
                    'authorization': {
                        'host': self.host,
                        'x-api-key': self.api_key,
                    }
                }
            },
            'type': 'start'
        }
        return json.dumps(register)

    def on_open(self, ws):
        """
        WebSocket接続開始時のコールバック
        """
        print("WebSocket connection opened")
        sub_id = str(uuid.uuid4())  # ランダムなサブスクリプションIDを生成
        subscription_message = self._build_subscription_message(sub_id)
        ws.send(subscription_message)

    def on_message(self, ws, message):
        """
        WebSocketでメッセージ受信時のコールバック
        """
        print("[MESSAGE]", message)
        if self.message_handler:
            self.message_handler(message)

    def on_error(self, ws, error):
        """
        WebSocketでエラー発生時のコールバック
        """
        print("[ERROR]", error)

    def on_close(self, ws, close_status_code, close_msg):
        """
        WebSocket切断時のコールバック
        """
        print("[CLOSE]", close_status_code, close_msg)

    def run_forever(self):
        """
        WebSocketAppのメインループを動かす
        (別スレッドで実行することを想定)
        """
        connection_url = self._build_connection_url()
        print("Connecting to:", connection_url)

        self.ws = websocket.WebSocketApp(
            connection_url,
            subprotocols=["graphql-ws"],
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )

        # WebSocketのイベントループ開始 (ブロッキング)
        self.ws.run_forever()

    def run_async(self):
        """
        非同期(別スレッド)でWebSocket接続を開始し、サブスクリプションを実行。
        UIなどメインスレッドをブロックせずに動作させる。
        """
        thread = threading.Thread(target=self.run_forever, daemon=True)
        thread.start()

非同期周りの実装がもうちょっと工夫の余地があるかもしれない。実装としては、引数にエンドポイントをいろいろ入れて、メッセージがきたときのカスタム実装をmessage_handlerに入れて、任意のイベントハンドラを実行するというもの

サブスクリプションクライアントを使ってみる

定義したサブスクリプションクライアントを使ってみる。アプリケーション側の実装は以下の通り。

import json
from credentials import API_KEY, WSS_ENDPOINT, HOST
from subscription_client import AppSyncSubscriptionClient

# サブスクライブしたいGraphQLクエリ
subscription_query = """
subscription MySubscription {
    onAddTodo {
    id
    title
    completed
    }
}""".strip()

def ui_hook(message):
    json_message = json.loads(message)
    if json_message.get("type") == "data":
        print("[data arrived]", message)

# クライアントインスタンス作成
client = AppSyncSubscriptionClient(
    wss_api_url=WSS_ENDPOINT,
    host=HOST,
    api_key=API_KEY,
    query=subscription_query,
    message_handler=ui_hook
)

# 実行(UIへの反映が必要なときはrun_asynckにする)
client.run_forever()

プログラムが動いたら、先程の「ミューテーションの送信」のプログラムを実行する。

WebSocket connection opened
[MESSAGE] {"id":"872cb146-370d-45e5-9ddd-e327106568b2","type":"start_ack"}
[MESSAGE] {"id":"872cb146-370d-45e5-9ddd-e327106568b2","type":"data","payload":{"data":{"onAddTodo":{"id":"b16018f6-ef79-4242-b407-05307193637d","title":"Hello World","completed":false}}}}
[data arrived] {"id":"872cb146-370d-45e5-9ddd-e327106568b2","type":"data","payload":{"data":{"onAddTodo":{"id":"b16018f6-ef79-4242-b407-05307193637d","title":"Hello World","completed":false}}}}
[MESSAGE] {"type":"ka"}
[MESSAGE] {"type":"ka"}

このような出力がされるはずだ。「data arrived」の部分がコールバックによって処理されたメッセージ。あとはUIへのつなぎこみをすればリアルタイムの反映が可能。

「type:ka」と書かれているのはKeep Aliveだろう。

所感

  • AppSync特有の情報の少なさやChatGPTのハルシネーションがネックだが、一度つなげてしまえばかなり便利そう
  • HTTPSとWebSocketのエンドポイントが両方同時にできてしまうのがすごい
  • GraphQL特有の学習コストはあるが、APIの柔軟性は上がりそうなので慣れてもいいかも?
  • これ一個で完結するユースケース結構ありそう


Shikoan's ML Blogの中の人が運営しているサークル「じゅ~しぃ~すくりぷと」の本のご案内

技術書コーナー

北海道の駅巡りコーナー


Add a Comment

メールアドレスが公開されることはありません。 が付いている欄は必須項目です