おひとり

できる限りひとりで楽しむための情報やプログラミング情報など。

GoでGraphQLのSubscriptions(サーバ)の実装。RedisのPub/Subを使って複数サーバ構成に対応

この記事では、前回に引き続き、GoでGraphQLのSubscriptionsを実装したサーバを作って行きます。
前回の記事はこちら。

www.ohitori.fun

ただし、今回はRedisを使い、サーバ複数台の構成で実行出来るように修正します。

コードはGitHubに公開しています。

github.com

今回は前回作ったサンプルを修正してきます。
ただし、前回の記事もほとんどがgqlgenで初期化したひな形を少し書き換えているだけです。
docker-compose.yamlを除き、新しいファイルの追加などは行いません。そのため、gqlgenの構造を知っている方であれば、前回の記事を読む必要はありません。

準備

まずはRedisサーバを準備する必要があります。
今回はdockerを使って用意しましょう。

以下のようなdocker-compose.yaml./に作成します。

./docker-compose.yaml

version: "3.9"

services:
  redis:
    image: redis:6.2.2-alpine
    ports:
      - "6379:6379"

全体として、ディレクトリ構造はこうなります。

.
├── README.md
├── docker-compose.yaml <---------- 新規作成
├── go.mod
├── go.sum
├── gqlgen.yml
├── graph
│   ├── generated
│   │   └── generated.go
│   ├── model
│   │   └── models_gen.go
│   ├── resolver.go
│   ├── schema.graphqls
│   └── schema.resolvers.go
└── server.go

3 directories, 11 files

あとはdocker compose upコマンドでRedisを起動しておきます。
※PCにインストールしているDockerのバージョンが古い場合は、代わりにdocker-compose upを実行してください。

実装

今回は前回に引き続きgqlgenを使います。
Redisのクライアントにはgo-redis/redisを使います。

このようなGraphQLのsubscriptionとRedisのPub/Subを組み合わせて実装していきます。

f:id:hitoridehitode:20210505111646p:plain
RedisのPub/SubとGraphQLのSubscriptions

(再掲)スキーマの定義

前回から見ている場合は修正の必要はありません。
参考としてもう一度掲載しておきます。

./graph/schema.graphqls

scalar Time

type Message {
  id: String!
  user: String!
  createdAt: Time!
  text: String!
}

type Mutation {
  postMessage(user: String!, text: String!): Message
}

type Query {
  messages: [Message!]!
}

type Subscription {
  messagePosted(user: String!): Message!
}

resolver.go

./graph/resolver.goを修正していきます。

まずは、Resolver構造体の修正です。 ここには、Redisクライアントのオブジェクト、およびPub/Subのオブジェクトを格納できるように追加します。

./graph/resolver.go

type Resolver struct {
    redisClient *redis.Client
    redisPubSub *redis.PubSub
    subscribers map[string]chan<- *model.Message
    mutex       sync.Mutex
}

続いて、Resolverを初期化するためのNewResolver()関数を作成します。
ここでは、Redisクライアントの初期化だけではなく、RedisのPubにより発行されたデータを処理するゴルーチンも起動します。

const redisPostMessagesSubscription = "messages"
const redisKeyMessages = "messages"

func NewResolver(ctx context.Context) *Resolver {
    redisClient := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
    })

    // messagesチャンネルを購読
    pubsub := redisClient.Subscribe(ctx, redisPostMessagesSubscription)

    resolver := &Resolver{
        redisClient: redisClient,
        redisPubSub: pubsub,
        subscribers: map[string]chan<- *model.Message{},
        mutex:       sync.Mutex{},
    }

    // messagesにpublishされたデータを取得した場合の処理
    // ゴルーチンを使って非同期で行う
    go func() {
        pubsubCh := pubsub.Channel()

        // メッセージの受信(consume)
        for msg := range pubsubCh {
            // 受信したmessageはJSON形式なので、これをmodel.Message構造体に変換
            message := &model.Message{}
            err := json.Unmarshal([]byte(msg.Payload), message)
            if err != nil {
                log.Printf(err.Error())
                continue
            }

            // 購読しているクライアントにRedisから受け取ったMessageをブロードキャスト
            resolver.mutex.Lock()
            for _, ch := range resolver.subscribers {
                ch <- message
            }
            resolver.mutex.Unlock()
        }
    }()

    return resolver
}

※今回は全ての初期化やゴルーチンの開始をNewResolver()の中で行っていますが、通常はモジュール化すべきでしょう。

server.go

./server.goでは、NewResolver()で生成したResolverオブジェクトを渡すように修正します。

./server.go

func main() {
    port := os.Getenv("PORT")
    if port == "" {
        port = defaultPort
    }

    srv := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{
        Resolvers: graph.NewResolver(context.Background()),  // <----- 修正
    }))

    http.Handle("/", playground.Handler("GraphQL playground", "/query"))
    http.Handle("/query", srv)

    log.Printf("connect to http://localhost:%s/ for GraphQL playground", port)
    log.Fatal(http.ListenAndServe(":"+port, nil))
}

schema.resolvers.go

では、各Query,Mutation、Subscriptionの処理を実装していきましょう。

ファイルはこちらにあります。

./graph/schema.resolvers.go

Mutation: postMessage

クライアントがメッセージを投稿するMutationです。
まずは受信したメッセージをRedisに格納します。そのためLPushを呼び出しています。
さらにこのメッセージをsubscribeしたクライアントに配信するため、Publishを実行しています。

func (r *mutationResolver) PostMessage(ctx context.Context, user string, text string) (*model.Message, error) {
    message := &model.Message{
        ID:        ksuid.New().String(),
        CreatedAt: time.Now().UTC(),
        User:      user,
        Text:      text,
    }

    messageJson, _ := json.Marshal(message)
    if err := r.redisClient.LPush(ctx, redisKeyMessages, string(messageJson)).Err(); err != nil {
        log.Println(err.Error())
        return nil, err
    }

    // messageをRedisにpublish
    r.redisClient.Publish(ctx, redisPostMessagesSubscription, messageJson)

    return message, nil
}

r.redisClient.Publish(...)を実行することで、先ほどNewResolver()内で起動した監視用のゴルーチンにより各クライアント(subscribersに保存したチャンネル)に配信されます。

Query: messages

すべてのメッセージを受信するQueryは以下のようになります。
RedisのLRangeを使い、保存したメッセージを全件読み出し、スライスに変換してreturnします。

func (r *queryResolver) Messages(ctx context.Context) ([]*model.Message, error) {
    // Redisのmessagesからデータを取得
    cmd := r.redisClient.LRange(ctx, redisKeyMessages, 0, -1)
    if cmd.Err() != nil {
        log.Println(cmd.Err())
        return nil, cmd.Err()
    }

    result, err := cmd.Result()
    if err != nil {
        log.Println(err)
        return nil, err
    }

    messages := []*model.Message{}
    for _, messageJson := range result {
        m := &model.Message{}
        _ = json.Unmarshal([]byte(messageJson), &m)
        messages = append(messages, m)
    }

    return messages, nil
}

Subscription: messagePosted

こちらは前回とほぼ同様です。
チャンネルを作成したらsubscribersに保存するだけです。

func (r *subscriptionResolver) MessagePosted(ctx context.Context, user string) (<-chan *model.Message, error) {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    if _, ok := r.subscribers[user]; ok {
        err := fmt.Errorf("`%s` has already been subscribed.", user)
        log.Print(err.Error())
        return nil, err
    }

    // チャンネルを作成し、リストに登録
    ch := make(chan *model.Message, 1)
    r.subscribers[user] = ch
    log.Printf("`%s` has been subscribed!", user)

    // コネクションが終了したら、このチャンネルを削除する
    go func() {
        <-ctx.Done()
        r.mutex.Lock()
        delete(r.subscribers, user)
        r.mutex.Unlock()
        log.Printf("`%s` has been unsubscribed.", user)
    }()

    return ch, nil
}

問題点:サーバが削除されると、クライアントはコネクションを失う

今回の構成で、複数台のサーバで構成することができます。
しかし、依然として各サーバはクライアントとのコネクションを維持しています。
そのため、AWSのAutoScalingなどでサーバが削除された場合に、クライアントは接続先のサーバを失ってしまいます。
クライアント側には、もしsubscribeしたサーバとのコネクションを失った場合、もう一度subscribeし直すという実装が必要になります。

まとめ

RedisのPub/Subを使えば、GraphQLのSubscriptionsサーバを複数台で構成できる。
サーバの削除に対応するためには、クライアントはもう一度subscribeし直すという実装が必要になる。

参考リンク

redis.io

pkg.go.dev

f:id:hitoridehitode:20210505102054p:plain Redis Icon by Iconscout Freebies on Iconscout