ElastiCache RedisでEC2からPub/Subを試す(Redis OSS)
Posted On 2025-02-16
EC2からTerraformで構築したRedisにPub/Subを行い、ミリ秒レベルの遅延を検証した。ValkeyではなくOSS版Redisを用いつつ、コードサンプルを通じてシンプルかつ高速なメッセージブローカーの構築手順を紹介する。
目次
はじめに
- ElastiCacheのRedisをちゃんと使ったことなかったので試してみた
- ここでは高速なPub/Subのメッセージブローカーとしての扱いで試してみる。ただのPub/SubならSQSでできるが、ミリ秒レベルの(アプリケーションに影響が出ないレベルの)レイテンシーを期待していて、このレベルのレイテンシーはSQSだとできない
- Redis自体はミニマムな構成
- 2024年に出たElastiCacheのValkey(Redisのフォーク)ではなく、古いRedis OSS版の開始方法。Valkeyのほうが安く、AWSもこっちを推奨しているが今回は昔のやり方で行う。
アーキテクチャー図
超ミニマムな構成。EC2にはSession Managerで接続し、ElastiCacheに対してEC2がPub/Subする。
Terraformのコードは末尾を参照
EC2内でのPub/Sub
- シェルを2つ立てる。Publish側のコードと、Subscribe側のコードを別々の処理で動かす。
- これはメッセージブローカーとして動かす場合を想定しており、非同期処理でよくあるケース
- 動かすコードは、Publish側がRedisに対して1秒おきに現在のタイムスタンプを送る。ここは長さ10のキューの想定で、
ltrim
で古いアイテムを削除するようにしている
コード
前提としてredis-pyが必要。以下でインストールする。
pip install redispy
Publish側、Subscribe側はそれぞれ以下のコマンドで起動する。ここで--host
はTerraformのoutputの値を使う。
python3 pub_sub.py pub --host my-redis-rg.****.ng.0001.apne1.cache.amazonaws.com
python3 pub_sub.py sub --host my-redis-rg.****.ng.0001.apne1.cache.amazonaws.com
以下のコードをpub_sub.py
とする。
#!/usr/bin/env python3
import argparse
import time
import redis
def publisher(redis_host, redis_port):
"""
Publisher:
- 毎秒、ローカルのエポックタイム(秒)を文字列としてRedisの "queue" リストに追加(RPUSH)
- リスト長が10を超えないようにLTRIMで最新10件に維持
"""
r = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
while True:
# 現在のローカル時刻(エポック秒)
current_time = time.time()
# 送信(文字列として保存)
r.rpush("queue", str(current_time))
r.ltrim("queue", -10, -1)
print(f"Published timestamp: {current_time}")
time.sleep(1)
def subscriber(redis_host, redis_port):
"""
Subscriber:
- BLPOPで待機し、"queue" からタイムスタンプ文字列を取得
- 取得したタイムスタンプと現在の時刻を比較し、ラグ(秒)を表示
"""
r = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
print("Subscribed. Waiting for messages from 'queue' ...")
while True:
item = r.blpop("queue", timeout=0)
if item:
key, value = item
try:
sent_time = float(value)
current_time = time.time()
lag = current_time - sent_time
print(f"Received timestamp: {value} | Lag: {lag:.6f} seconds")
except ValueError:
print(f"Received non-timestamp value: {value}")
def main():
parser = argparse.ArgumentParser(description="Redisを使ったPub/Subサンプル(タイムスタンプでラグ計測)")
parser.add_argument("role", choices=["pub", "sub"], help="実行ロール: pub(Publisher) または sub(Subscriber)")
parser.add_argument("--host", default="localhost", help="Redisサーバのホスト名")
parser.add_argument("--port", type=int, default=6379, help="Redisサーバのポート番号")
args = parser.parse_args()
if args.role == "pub":
publisher(args.host, args.port)
else:
subscriber(args.host, args.port)
if __name__ == "__main__":
main()
結果
Publish側。順当に送られている。
Subscribe側。最初の方はキューに溜まっているあるアイテムがあるので10秒ぐらい遅延があるが、これが消化されるとかなり遅延は低い。0.6ms~2msぐらい。同一サブネットならさすがに速いのと、思ったよりRedisのボトルネックが短い。
Terraformのコード
特に難しいところはないと思われる。ElastiCacheのパラメーターグループファミリーがdefault.redis7
ならクラスタモードなし(1ノード)で、default.redis7.cluster.on
ならクラスタモードがON(複数ノード)。この例はミニマムな例なのでクラスタモードは入れていない。
# Amazon Linux 2023
data "aws_ami" "amazon_linux" {
most_recent = true
owners = ["137112412989"] # AmazonのAMI所有者ID
filter {
name = "name"
# Amazon Linux 2023 AMIの名前パターン。minimumを除外する
values = ["al2023-ami-2023*-kernel-*-x86_64"]
}
filter {
name = "architecture"
values = ["x86_64"]
}
filter {
name = "virtualization-type"
values = ["hvm"]
}
}
# Redis用サブネットグループ(プライベートサブネットを利用)
resource "aws_elasticache_subnet_group" "redis_subnet_group" {
name = "redis-subnet-group"
subnet_ids = [var.private_subnet_id]
}
# EC2用セキュリティグループ
resource "aws_security_group" "ec2_sg" {
name = "ec2-sg"
description = "Security group for EC2 instance"
vpc_id = var.vpc_id
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
ipv6_cidr_blocks = ["::/0"]
}
}
# Redis用セキュリティグループ
resource "aws_security_group" "redis_sg" {
name = "redis-sg"
description = "Security group for Redis"
vpc_id = var.vpc_id
# EC2インスタンスから6379ポートへのアクセスを許可
ingress {
from_port = 6379
to_port = 6379
protocol = "tcp"
security_groups = [aws_security_group.ec2_sg.id]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}
# Redisクラスタ(シングルノードのレプリケーショングループ)
resource "aws_elasticache_replication_group" "redis" {
replication_group_id = "my-redis-rg"
description = "Redis replication group"
engine = "redis"
engine_version = "7.1"
parameter_group_name = "default.redis7" # クラスタモードを無効
node_type = "cache.t4g.micro"
num_cache_clusters = 1
subnet_group_name = aws_elasticache_subnet_group.redis_subnet_group.name
security_group_ids = [aws_security_group.redis_sg.id]
automatic_failover_enabled = false
port = 6379
}
# IAMロール(EC2にSession Manager用の権限を付与)
resource "aws_iam_role" "ec2_ssm_role" {
name = "ec2-ssm-role"
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [{
Action = "sts:AssumeRole",
Effect = "Allow",
Principal = {
Service = "ec2.amazonaws.com"
}
}]
})
}
resource "aws_iam_role_policy_attachment" "ssm_policy" {
role = aws_iam_role.ec2_ssm_role.name
policy_arn = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
}
resource "aws_iam_instance_profile" "ec2_instance_profile" {
name = "ec2-instance-profile"
role = aws_iam_role.ec2_ssm_role.name
}
# EC2インスタンス作成(プライベートサブネット内)
resource "aws_instance" "ec2" {
ami = data.aws_ami.amazon_linux.id
instance_type = "t3.small"
subnet_id = var.private_subnet_id
iam_instance_profile = aws_iam_instance_profile.ec2_instance_profile.name
vpc_security_group_ids = [aws_security_group.ec2_sg.id]
tags = {
Name = "RedisPubSubInstance"
}
}
output "redis_primary_endpoint_address" {
description = "Redisクラスタのプライマリ接続エンドポイントのDNS名"
value = aws_elasticache_replication_group.redis.primary_endpoint_address
}
所感
- ElastiCacheのRedisのチュートリアル的なのをやりたかったけど割と簡単だった
- 他にはValkeyやServerlessモードがあるので今後試してみたい
Shikoan's ML Blogの中の人が運営しているサークル「じゅ~しぃ~すくりぷと」の本のご案内
技術書コーナー
北海道の駅巡りコーナー