この記事では、前回に引き続き、GoでGraphQLのSubscriptionsを実装したサーバを作って行きます。
前回の記事はこちら。
ただし、今回はRedisを使い、サーバ複数台の構成で実行出来るように修正します。
コードはGitHubに公開しています。
今回は前回作ったサンプルを修正してきます。
ただし、前回の記事もほとんどがgqlgen
で初期化したひな形を少し書き換えているだけです。
docker-compose.yaml
を除き、新しいファイルの追加などは行いません。そのため、gqlgen
の構造を知っている方であれば、前回の記事を読む必要はありません。
準備
まずはRedisサーバを準備する必要があります。
今回はdockerを使って用意しましょう。
以下のようなdocker-compose.yaml
を./
に作成します。
version: "3.9" services: redis: image: redis:6.2.2-alpine ports: - "6379:6379"
全体として、ディレクトリ構造はこうなります。
. ├── README.md ├── docker-compose.yaml <---------- 新規作成 ├── go.mod ├── go.sum ├── gqlgen.yml ├── graph │ ├── generated │ │ └── generated.go │ ├── model │ │ └── models_gen.go │ ├── resolver.go │ ├── schema.graphqls │ └── schema.resolvers.go └── server.go 3 directories, 11 files
あとはdocker compose up
コマンドでRedisを起動しておきます。
※PCにインストールしているDockerのバージョンが古い場合は、代わりにdocker-compose up
を実行してください。
実装
今回は前回に引き続きgqlgen
を使います。
Redisのクライアントにはgo-redis/redisを使います。
このようなGraphQLのsubscriptionとRedisのPub/Subを組み合わせて実装していきます。
(再掲)スキーマの定義
前回から見ている場合は修正の必要はありません。
参考としてもう一度掲載しておきます。
scalar Time type Message { id: String! user: String! createdAt: Time! text: String! } type Mutation { postMessage(user: String!, text: String!): Message } type Query { messages: [Message!]! } type Subscription { messagePosted(user: String!): Message! }
resolver.go
./graph/resolver.go
を修正していきます。
まずは、Resolver
構造体の修正です。
ここには、Redisクライアントのオブジェクト、およびPub/Sub
のオブジェクトを格納できるように追加します。
type Resolver struct { redisClient *redis.Client redisPubSub *redis.PubSub subscribers map[string]chan<- *model.Message mutex sync.Mutex }
続いて、Resolverを初期化するためのNewResolver()
関数を作成します。
ここでは、Redisクライアントの初期化だけではなく、RedisのPub
により発行されたデータを処理するゴルーチンも起動します。
const redisPostMessagesSubscription = "messages" const redisKeyMessages = "messages" func NewResolver(ctx context.Context) *Resolver { redisClient := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", DB: 0, }) // messagesチャンネルを購読 pubsub := redisClient.Subscribe(ctx, redisPostMessagesSubscription) resolver := &Resolver{ redisClient: redisClient, redisPubSub: pubsub, subscribers: map[string]chan<- *model.Message{}, mutex: sync.Mutex{}, } // messagesにpublishされたデータを取得した場合の処理 // ゴルーチンを使って非同期で行う go func() { pubsubCh := pubsub.Channel() // メッセージの受信(consume) for msg := range pubsubCh { // 受信したmessageはJSON形式なので、これをmodel.Message構造体に変換 message := &model.Message{} err := json.Unmarshal([]byte(msg.Payload), message) if err != nil { log.Printf(err.Error()) continue } // 購読しているクライアントにRedisから受け取ったMessageをブロードキャスト resolver.mutex.Lock() for _, ch := range resolver.subscribers { ch <- message } resolver.mutex.Unlock() } }() return resolver }
※今回は全ての初期化やゴルーチンの開始をNewResolver()
の中で行っていますが、通常はモジュール化すべきでしょう。
server.go
./server.go
では、NewResolver()
で生成したResolverオブジェクトを渡すように修正します。
func main() { port := os.Getenv("PORT") if port == "" { port = defaultPort } srv := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{ Resolvers: graph.NewResolver(context.Background()), // <----- 修正 })) http.Handle("/", playground.Handler("GraphQL playground", "/query")) http.Handle("/query", srv) log.Printf("connect to http://localhost:%s/ for GraphQL playground", port) log.Fatal(http.ListenAndServe(":"+port, nil)) }
schema.resolvers.go
では、各Query,Mutation、Subscriptionの処理を実装していきましょう。
ファイルはこちらにあります。
Mutation: postMessage
クライアントがメッセージを投稿するMutationです。
まずは受信したメッセージをRedisに格納します。そのためLPush
を呼び出しています。
さらにこのメッセージをsubscribeしたクライアントに配信するため、Publish
を実行しています。
func (r *mutationResolver) PostMessage(ctx context.Context, user string, text string) (*model.Message, error) { message := &model.Message{ ID: ksuid.New().String(), CreatedAt: time.Now().UTC(), User: user, Text: text, } messageJson, _ := json.Marshal(message) if err := r.redisClient.LPush(ctx, redisKeyMessages, string(messageJson)).Err(); err != nil { log.Println(err.Error()) return nil, err } // messageをRedisにpublish r.redisClient.Publish(ctx, redisPostMessagesSubscription, messageJson) return message, nil }
r.redisClient.Publish(...)
を実行することで、先ほどNewResolver()
内で起動した監視用のゴルーチンにより各クライアント(subscribers
に保存したチャンネル)に配信されます。
Query: messages
すべてのメッセージを受信するQueryは以下のようになります。
RedisのLRange
を使い、保存したメッセージを全件読み出し、スライスに変換してreturnします。
func (r *queryResolver) Messages(ctx context.Context) ([]*model.Message, error) { // Redisのmessagesからデータを取得 cmd := r.redisClient.LRange(ctx, redisKeyMessages, 0, -1) if cmd.Err() != nil { log.Println(cmd.Err()) return nil, cmd.Err() } result, err := cmd.Result() if err != nil { log.Println(err) return nil, err } messages := []*model.Message{} for _, messageJson := range result { m := &model.Message{} _ = json.Unmarshal([]byte(messageJson), &m) messages = append(messages, m) } return messages, nil }
Subscription: messagePosted
こちらは前回とほぼ同様です。
チャンネルを作成したらsubscribers
に保存するだけです。
func (r *subscriptionResolver) MessagePosted(ctx context.Context, user string) (<-chan *model.Message, error) { r.mutex.Lock() defer r.mutex.Unlock() if _, ok := r.subscribers[user]; ok { err := fmt.Errorf("`%s` has already been subscribed.", user) log.Print(err.Error()) return nil, err } // チャンネルを作成し、リストに登録 ch := make(chan *model.Message, 1) r.subscribers[user] = ch log.Printf("`%s` has been subscribed!", user) // コネクションが終了したら、このチャンネルを削除する go func() { <-ctx.Done() r.mutex.Lock() delete(r.subscribers, user) r.mutex.Unlock() log.Printf("`%s` has been unsubscribed.", user) }() return ch, nil }
問題点:サーバが削除されると、クライアントはコネクションを失う
今回の構成で、複数台のサーバで構成することができます。
しかし、依然として各サーバはクライアントとのコネクションを維持しています。
そのため、AWSのAutoScalingなどでサーバが削除された場合に、クライアントは接続先のサーバを失ってしまいます。
クライアント側には、もしsubscribeしたサーバとのコネクションを失った場合、もう一度subscribeし直すという実装が必要になります。
まとめ
RedisのPub/Subを使えば、GraphQLのSubscriptionsサーバを複数台で構成できる。
サーバの削除に対応するためには、クライアントはもう一度subscribeし直すという実装が必要になる。