こしあん
2025-02-16

ElastiCache RedisでEC2からPub/Subを試す(Redis OSS)


40{icon} {views}


EC2からTerraformで構築したRedisにPub/Subを行い、ミリ秒レベルの遅延を検証した。ValkeyではなくOSS版Redisを用いつつ、コードサンプルを通じてシンプルかつ高速なメッセージブローカーの構築手順を紹介する。

はじめに

  • ElastiCacheのRedisをちゃんと使ったことなかったので試してみた
  • ここでは高速なPub/Subのメッセージブローカーとしての扱いで試してみる。ただのPub/SubならSQSでできるが、ミリ秒レベルの(アプリケーションに影響が出ないレベルの)レイテンシーを期待していて、このレベルのレイテンシーはSQSだとできない
  • Redis自体はミニマムな構成
  • 2024年に出たElastiCacheのValkey(Redisのフォーク)ではなく、古いRedis OSS版の開始方法。Valkeyのほうが安く、AWSもこっちを推奨しているが今回は昔のやり方で行う

アーキテクチャー図

超ミニマムな構成。EC2にはSession Managerで接続し、ElastiCacheに対してEC2がPub/Subする。

Terraformのコードは末尾を参照

EC2内でのPub/Sub

  • シェルを2つ立てる。Publish側のコードと、Subscribe側のコードを別々の処理で動かす。
  • これはメッセージブローカーとして動かす場合を想定しており、非同期処理でよくあるケース
  • 動かすコードは、Publish側がRedisに対して1秒おきに現在のタイムスタンプを送る。ここは長さ10のキューの想定で、ltrimで古いアイテムを削除するようにしている

コード

前提としてredis-pyが必要。以下でインストールする。

pip install redispy

Publish側、Subscribe側はそれぞれ以下のコマンドで起動する。ここで--hostはTerraformのoutputの値を使う。

python3 pub_sub.py pub --host my-redis-rg.****.ng.0001.apne1.cache.amazonaws.com

python3 pub_sub.py sub --host my-redis-rg.****.ng.0001.apne1.cache.amazonaws.com

以下のコードをpub_sub.pyとする。

#!/usr/bin/env python3
import argparse
import time
import redis

def publisher(redis_host, redis_port):
    """
    Publisher:
    - 毎秒、ローカルのエポックタイム(秒)を文字列としてRedisの "queue" リストに追加(RPUSH)
    - リスト長が10を超えないようにLTRIMで最新10件に維持
    """
    r = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
    while True:
        # 現在のローカル時刻(エポック秒)
        current_time = time.time()
        # 送信(文字列として保存)
        r.rpush("queue", str(current_time))
        r.ltrim("queue", -10, -1)
        print(f"Published timestamp: {current_time}")
        time.sleep(1)

def subscriber(redis_host, redis_port):
    """
    Subscriber:
    - BLPOPで待機し、"queue" からタイムスタンプ文字列を取得
    - 取得したタイムスタンプと現在の時刻を比較し、ラグ(秒)を表示
    """
    r = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
    print("Subscribed. Waiting for messages from 'queue' ...")
    while True:
        item = r.blpop("queue", timeout=0)
        if item:
            key, value = item
            try:
                sent_time = float(value)
                current_time = time.time()
                lag = current_time - sent_time
                print(f"Received timestamp: {value} | Lag: {lag:.6f} seconds")
            except ValueError:
                print(f"Received non-timestamp value: {value}")

def main():
    parser = argparse.ArgumentParser(description="Redisを使ったPub/Subサンプル(タイムスタンプでラグ計測)")
    parser.add_argument("role", choices=["pub", "sub"], help="実行ロール: pub(Publisher) または sub(Subscriber)")
    parser.add_argument("--host", default="localhost", help="Redisサーバのホスト名")
    parser.add_argument("--port", type=int, default=6379, help="Redisサーバのポート番号")
    args = parser.parse_args()

    if args.role == "pub":
        publisher(args.host, args.port)
    else:
        subscriber(args.host, args.port)

if __name__ == "__main__":
    main()

結果

Publish側。順当に送られている。

Subscribe側。最初の方はキューに溜まっているあるアイテムがあるので10秒ぐらい遅延があるが、これが消化されるとかなり遅延は低い。0.6ms~2msぐらい。同一サブネットならさすがに速いのと、思ったよりRedisのボトルネックが短い。

Terraformのコード

特に難しいところはないと思われる。ElastiCacheのパラメーターグループファミリーがdefault.redis7ならクラスタモードなし(1ノード)で、default.redis7.cluster.onならクラスタモードがON(複数ノード)。この例はミニマムな例なのでクラスタモードは入れていない。

# Amazon Linux 2023
data "aws_ami" "amazon_linux" {
  most_recent = true
  owners      = ["137112412989"] # AmazonのAMI所有者ID

  filter {
    name   = "name"
    # Amazon Linux 2023 AMIの名前パターン。minimumを除外する
    values = ["al2023-ami-2023*-kernel-*-x86_64"]
  }

  filter {
    name   = "architecture"
    values = ["x86_64"]
  }

  filter {
    name   = "virtualization-type"
    values = ["hvm"]
  }
}

# Redis用サブネットグループ(プライベートサブネットを利用)
resource "aws_elasticache_subnet_group" "redis_subnet_group" {
  name       = "redis-subnet-group"
  subnet_ids = [var.private_subnet_id]
}

# EC2用セキュリティグループ
resource "aws_security_group" "ec2_sg" {
  name        = "ec2-sg"
  description = "Security group for EC2 instance"
  vpc_id      = var.vpc_id

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
    ipv6_cidr_blocks = ["::/0"]
  }
}

# Redis用セキュリティグループ
resource "aws_security_group" "redis_sg" {
  name        = "redis-sg"
  description = "Security group for Redis"
  vpc_id      = var.vpc_id

  # EC2インスタンスから6379ポートへのアクセスを許可
  ingress {
    from_port       = 6379
    to_port         = 6379
    protocol        = "tcp"
    security_groups = [aws_security_group.ec2_sg.id]
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

# Redisクラスタ(シングルノードのレプリケーショングループ)
resource "aws_elasticache_replication_group" "redis" {
  replication_group_id          = "my-redis-rg"
  description                   = "Redis replication group"
  engine                        = "redis"
  engine_version                = "7.1"
  parameter_group_name          = "default.redis7" # クラスタモードを無効
  node_type                     = "cache.t4g.micro"
  num_cache_clusters            = 1
  subnet_group_name             = aws_elasticache_subnet_group.redis_subnet_group.name
  security_group_ids            = [aws_security_group.redis_sg.id]
  automatic_failover_enabled    = false
  port                          = 6379
}

# IAMロール(EC2にSession Manager用の権限を付与)
resource "aws_iam_role" "ec2_ssm_role" {
  name = "ec2-ssm-role"
  assume_role_policy = jsonencode({
    Version   = "2012-10-17",
    Statement = [{
      Action    = "sts:AssumeRole",
      Effect    = "Allow",
      Principal = {
        Service = "ec2.amazonaws.com"
      }
    }]
  })
}

resource "aws_iam_role_policy_attachment" "ssm_policy" {
  role       = aws_iam_role.ec2_ssm_role.name
  policy_arn = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
}

resource "aws_iam_instance_profile" "ec2_instance_profile" {
  name = "ec2-instance-profile"
  role = aws_iam_role.ec2_ssm_role.name
}

# EC2インスタンス作成(プライベートサブネット内)
resource "aws_instance" "ec2" {
  ami                    = data.aws_ami.amazon_linux.id
  instance_type          = "t3.small"
  subnet_id              = var.private_subnet_id
  iam_instance_profile   = aws_iam_instance_profile.ec2_instance_profile.name
  vpc_security_group_ids = [aws_security_group.ec2_sg.id]

  tags = {
    Name = "RedisPubSubInstance"
  }
}

output "redis_primary_endpoint_address" {
  description = "Redisクラスタのプライマリ接続エンドポイントのDNS名"
  value       = aws_elasticache_replication_group.redis.primary_endpoint_address
}

所感

  • ElastiCacheのRedisのチュートリアル的なのをやりたかったけど割と簡単だった
  • 他にはValkeyやServerlessモードがあるので今後試してみたい


Shikoan's ML Blogの中の人が運営しているサークル「じゅ~しぃ~すくりぷと」の本のご案内

技術書コーナー

北海道の駅巡りコーナー


Add a Comment

メールアドレスが公開されることはありません。 が付いている欄は必須項目です