こんにちは。開発G、開発研究チームのサーバサイドエンジニアです。
最近のマイクロサービスのサンプルを作成しながら、様々な技術の中で、メッセージングについて紹介してみます。
目次
NATSとは
NATSはオープンソース、軽量の高性能クラウドネイティブメッセージングシステムです。高レベルの拡張性を備えたpub-subメッセージングシステムです。
NATSと比較されるものはActiveMQ、Kafka、Kestreal、NSQ、RabbitMQ、Redis等がある
(Chart source: bravenewgeek.com/dissecting-message-queues)
NATSの主要な機能/特徴は、以下の通りです。
- 高性能(fast)
- 常時利用可能(dial tone)
- 非常に軽量(small footprint)
- 最大1回の配信(fire and forget)
- 複数のメッセージ通信モデルとユースケースシナリオをサポート(flexible)
NATSメッセージングモデル
NATSを動かしてみる
以下は、NATSサーバーをダウンロードするためのさまざまなディストリビューションです。
https://nats.io/download/nats-io/nats-server/
様々なクライアント言語提供しているので、ここで確認することができます。
Go言語クライアントを使用してNATSサーバーをインストールすることもできます。
1 |
$ GO111MODULE=on go get github.com/nats-io/nats-server/v2 |
nats-server起動
1 2 3 4 5 6 7 8 9 10 11 12 |
$ nats-server -DV [12451] 2020/05/20 13:41:40.585904 [INF] Starting nats-server version 2.1.7 [12451] 2020/05/20 13:41:40.586033 [DBG] Go build version go1.14 [12451] 2020/05/20 13:41:40.586037 [INF] Git commit [not set] [12451] 2020/05/20 13:41:40.586368 [INF] Listening for client connections on 0.0.0.0:4222 [12451] 2020/05/20 13:41:40.586376 [INF] Server id is NCNBXLD76JM4ZPZMDJPZJMAHZUTEY6AY5QOQGVWT2T5KE7N3NOT722ZG [12451] 2020/05/20 13:41:40.586379 [INF] Server is ready [12451] 2020/05/20 13:41:40.586390 [DBG] Get non local IPs for "0.0.0.0" [12451] 2020/05/20 13:41:40.586587 [DBG] ip=192.168.3.8 [12451] 2020/05/20 13:41:40.586594 [DBG] ip=2400:2410:b0a3:e900:4a8:2832:bbd9:2719 [12451] 2020/05/20 13:41:40.586597 [DBG] ip=2400:2410:b0a3:e900:2c27:9d00:5ac3: |
汎用的なexamplesクライアントとして、メッセージングモデル毎に動作を見てみます。
1 |
$ go get github.com/nats-io/nats.go |
Pub/Sub
まずはPub/Subを確認するため、以下の準備しておきます。
nats-pubとnats-subクライアントビルドします。
1 2 3 4 5 |
$ cd $GOPATH/src/github.com/nats-io/nats.go/examples/nats-pub $ go build -o ~/nats-pub main.go $ cd $GOPATH/src/github.com/nats-io/nats.go/examples/nats-sub $ go build -o ~/nats-sub main.go |
メッセージ受信可能なsubjectをそれぞれchannel.1, channel.2, channel.*起動しておきます。
1 2 3 4 |
$ ./nats-sub/main.go -t "channel.1" $ ./nats-sub/main.go -t "channel.2" $ ./nats-sub/main.go -t "channel.*" Listening on [channel.*] |
pubクライアントでメッセージを送ってみます。
1 2 3 |
# Usage: nats-pub [-s server] [-creds file] <subject> <msg> $ go run nats-pub/main.go channel.1 "msg1" Published [channel.1] : 'msg1' |
マッチングされたsubjects(channel.1, channle.*)でメッセージを受信することがわかります。
1 2 3 4 5 |
Listening on [channel.1] 2020/05/20 14:07:23 [#1] Received on [channel.1]: 'msg1' Listening on [channel.*] 2020/05/20 14:07:23 [#1] Received on [channel.1]: 'msg1' |
Request Reply
続いてrequest reply確認する、以下の準備しておきます。
nats-reqとnats-rplyクライアントビルドします。
1 2 3 4 5 |
$ cd $GOPATH/src/github.com/nats-io/nats.go/examples/nats-req $ go build -o ~/nats-req main.go $ cd $GOPATH/src/github.com/nats-io/nats.go/examples/nats-rply $ go build -o ~/nats-rply main.go |
subject channel.1というsubscribeクライアントを起動します。
1 2 |
$ ./nats-sub channel.1 Listening on [channel.1] |
channel.1にreplyするクライアント起動します。
1 2 |
$ ./nats-rply channel.1 "reply to channel.1: msg1" Published [channel.1] : 'msg 1' |
準備ができたらRequestを送ってみると、リクエストに対して結果メッセージを受け取っていることがわかります。
1 2 3 |
$ ./nats-req channel.1 "msg1" Published [channel.1] : 'msg1' Received [_INBOX.ksjyBHh7L2J5YQThm4geU6.SBHH5SuK] : 'reply to channel.1: msg1' |
SubとReplyはPub/Sub時と同様にメッセージを受け取っていることがわかります。
1 |
[#1] Received on [channel.1]: 'msg1' |
Queue Groups
最後queue groups確認する、以下の準備しておきます。
nats-qsubクライアントビルドします。
1 2 |
$ cd $GOPATH/src/github.com/nats-io/nats.go/examples/nats-qsub $ go build -o ~/nats-qsub main.go |
各グループ別にsubscribe (a, b)を起動します。
1 2 3 4 5 |
$ ./nats-qsub channel.1 G1 # group1 a $ ./nats-qsub channel.1 G1 # group1 b $ ./nats-qsub channel.1 G2 # group2 a $ ./nats-qsub channel.1 G2 # group2 b |
Pubクライアントからメッセージを送ってみます。
1 2 3 4 5 6 7 |
$ ./nats-pub channel.1 "msg 1" $ ./nats-pub channel.1 "msg 2" $ ./nats-pub channel.1 "msg 3" $ ./nats-pub channel.1 "msg 4" $ ./nats-pub channel.1 "msg 5" $ ./nats-pub channel.1 "msg 6" $ ./nats-pub channel.1 "msg 7" |
メッセージ送った際のそれぞれのQueue-Subクライアントの出力は以下になります。
group1 a
1 2 3 |
Listening on [channel.1], queue group [G1] [#1] Received on [channel.1] Queue[G1] Pid[23817]: 'msg 3' [#2] Received on [channel.1] Queue[G1] Pid[23817]: 'msg 7' |
group1 b
1 2 3 4 5 6 |
Listening on [channel.1], queue group [G1] [#1] Received on [channel.1] Queue[G1] Pid[23852]: 'msg 1' [#2] Received on [channel.1] Queue[G1] Pid[23852]: 'msg 2' [#3] Received on [channel.1] Queue[G1] Pid[23852]: 'msg 4' [#4] Received on [channel.1] Queue[G1] Pid[23852]: 'msg 5' [#5] Received on [channel.1] Queue[G1] Pid[23852]: 'msg 6' |
group2 a
1 2 3 |
Listening on [channel.1], queue group [G2] [#1] Received on [channel.1] Queue[G2] Pid[23885]: 'msg 1' [#2] Received on [channel.1] Queue[G2] Pid[23885]: 'msg 6' |
group2 b
1 2 3 4 5 6 |
Listening on [channel.1], queue group [G2] [#1] Received on [channel.1] Queue[G2] Pid[23919]: 'msg 2' [#2] Received on [channel.1] Queue[G2] Pid[23919]: 'msg 3' [#3] Received on [channel.1] Queue[G2] Pid[23919]: 'msg 4' [#4] Received on [channel.1] Queue[G2] Pid[23919]: 'msg 5' [#5] Received on [channel.1] Queue[G2] Pid[23919]: 'msg 7' |
同じグループ内ではどれか1つのサブスクライバーにメッセージを投げていることがわかります。
メッセージングの基本はここまです。その他にも、
・NATSサーバー認証、権限、セキュリティ、ロギング、モニタリング、クラスタの設定などは、CLIまたは設定ファイルで行うことができます。
参考:https://docs.nats.io/developing-with-nats/security
・NATSの全体的な統計データは、別のツール(nats-top)を使用して監視することができます。
参考:https://docs.nats.io/nats-tools/nats-tools
NATS Streamingとは
NATS Serverは、pub / subメカニズムに基づく高性能なnativeのメッセージングシステムであり、メッセージの永続性はありません。NATS Streamingは、NATSを利用したデータストリーミングシステムで、Golangで作成して、メッセージ永続化機能が追加されました。
NATS Streamingも動かしてみます
nats streamingサーバー
1 |
$ go get github.com/nats-io/nats-streaming-server |
nats streamingクライアント
1 |
$ go get github.com/nats-io/go-nats-streaming |
nats-streaming-server起動します
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
$ go run nats-streaming-server.go \ --store file \ --dir ./data \ --max_msgs 0 \ --max_bytes 0 \ -m 8222 [27156] 2020/05/20 15:40:26.257434 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.17.0 [27156] 2020/05/20 15:40:26.257625 [INF] STREAM: ServerID: fIpjggJaXHx11pLUVUm9zq [27156] 2020/05/20 15:40:26.257634 [INF] STREAM: Go version: go1.14 [27156] 2020/05/20 15:40:26.257641 [INF] STREAM: Git commit: [not set] [27156] 2020/05/20 15:40:26.260515 [INF] Starting nats-server version 2.1.4 [27156] 2020/05/20 15:40:26.260551 [INF] Git commit [not set] [27156] 2020/05/20 15:40:26.261082 [INF] Starting http monitor on 0.0.0.0:8222 [27156] 2020/05/20 15:40:26.261174 [INF] Listening for client connections on 0.0.0.0:4222 [27156] 2020/05/20 15:40:26.261178 [INF] Server id is NCFN6DJKHZRBGPA3LBWAMT6Y4ZJ6F55V7IL4TSFB6J43TXY7P73WURU3 [27156] 2020/05/20 15:40:26.261182 [INF] Server is ready [27156] 2020/05/20 15:40:26.288234 [INF] STREAM: Recovering the state... [27156] 2020/05/20 15:40:26.289593 [INF] STREAM: Recovered 0 channel(s) [27156] 2020/05/20 15:40:26.545030 [INF] STREAM: Message store is FILE [27156] 2020/05/20 15:40:26.545062 [INF] STREAM: Store location: ./data [27156] 2020/05/20 15:40:26.545102 [INF] STREAM: ---------- Store Limits ---------- [27156] 2020/05/20 15:40:26.545108 [INF] STREAM: Channels: 100 * [27156] 2020/05/20 15:40:26.545111 [INF] STREAM: --------- Channels Limits -------- [27156] 2020/05/20 15:40:26.545114 [INF] STREAM: Subscriptions: 1000 * [27156] 2020/05/20 15:40:26.545117 [INF] STREAM: Messages : unlimited [27156] 2020/05/20 15:40:26.545119 [INF] STREAM: Bytes : unlimited [27156] 2020/05/20 15:40:26.545122 [INF] STREAM: Age : unlimited * [27156] 2020/05/20 15:40:26.545125 [INF] STREAM: Inactivity : unlimited * [27156] 2020/05/20 15:40:26.545127 [INF] STREAM: ---------------------------------- [27156] 2020/05/20 15:40:26.545130 [INF] STREAM: Streaming Server is ready |
nats-streamingサーバーはデフォルトでメモリーにデータを保存するので、起動する時storeタイプとdir指定必要です。
1 2 3 4 5 |
--store <string> Store type: MEMORY|FILE (default: MEMORY) --dir <string> For FILE store type, this is the root directory --max_msgs <int> Max number of messages per channel (0 for unlimited) --max_bytes <size> Max messages total size per channel (0 for unlimited) -m, --http_port <int> Use port for http monitoring |
monitoring url: http://127.0.0.1:8222/
汎用的なexamplesクライアントとして、動作確認します。
1 2 3 4 5 6 7 |
$ cd $GOPATH/src/github.com/nats-io/go-nats-streaming/examples $ ll total 0 drwxr-xr-x 3 s-ri staff 96B May 20 11:11 stan-bench drwxr-xr-x 3 s-ri staff 96B May 20 11:11 stan-pub drwxr-xr-x 3 s-ri staff 96B May 20 11:11 stan-sub |
Pubでメッセージ送ってみます。
1 2 3 4 5 |
$ go run stan-pub/main.go levelup "lv +10" Published [levelup] : 'lv +10' $ go run stan-pub/main.go levelup "lv +20" Published [levelup] : 'lv +20' |
メッセージを受信できることがわかります。
1 2 3 4 5 6 7 |
$ go run stan-sub/main.go --all -c test-cluster -id GAMEEVENT levelup Connected to nats://localhost:4222 clusterID: [test-cluster] clientID: [GAMEEVENT] subscribing with DeliverAllAvailable Listening on [levelup], clientID=[GAMEEVENT], qgroup=[] durable=[] [#1] Received on [levelup]: 'sequence:1 subject:"levelup" data:"lv +10" timestamp:1589957841183979000 ' [#2] Received on [levelup]: 'sequence:2 subject:"levelup" data:"lv +20" timestamp:1589957883417973000 ' |
ファイルで永続化保存しているものを確認します。
1 2 3 4 5 6 |
$ ls -l $GOPATH/src/github.com/nats-io/nats-streaming-server/data total 16 -rw-r--r-- 1 s-ri staff 767 May 20 16:00 clients.dat drwxr-xr-x 5 s-ri staff 160 May 20 15:57 levelup -rw-r--r-- 1 s-ri staff 268 May 20 15:34 server.dat |
最後に
簡単に基本的な機能について試してみました。
今後は、NATS Streamingの詳細やクラスタリングの動作を掘り下げていったり、
Goクライアントの紹介や実際にNATSを使用したサンプルアプリケーションの実装なんかも試してみたいところです。
参考
https://docs.nats.io/
https://synadia.com/