Apache Storm DRPCのHTTP APIを使う
Apache Stormのdefaults.yamlに drpc.http*
の項目があることから分かるように、Storm 0.10.0のDRPCサーバはDRPCクライアントから利用されるほかに、HTTP APIを持っている(0.9.6にはない)。つまり、HTTPクライアント(PerlのHTTPクライアントライブラリでもcurlでもWebブラウザでも)を用いて、DRPCサーバに処理リクエストを投げ、トポロジの処理結果をレスポンスとして受け取ることができる。
何が嬉しいか
HTTPリクエストに対する処理がスケールアウトしやすい
StormのDRPCは、Distributed Remote Procedure Callの略だが、Stormのトポロジの入力を外部から受け付け、トポロジを通して得られた処理結果を外部に返すためのものである。Stormのトポロジは、入力データを流す始点となるSpoutと、データを処理するBoltに分けられるが、DRPCアプリケーションの場合、Spoutは外部からの入力データを受け付け次のBoltに流し、Boltはデータに処理を行いその結果を次のBoltに渡し、最後のBoltが最終的な処理結果のデータを外部に返す。
Stormのトポロジを構成するSpout、Boltはスケールアウトできるように設計されているので、処理が重い部分・並列分散処理させたい部分の並列数を増やす指定をすることで、実行するスレッド(あるいはプロセス、マシン)を増やしたり、逆に処理を分散させたくない部分の並列数を1にしたりもできる。また、クラスタを構成するマシンの台数を増やすことで、アプリケーション内の並列分散処理の性能を高めることができる。マシンを増やした際の作業は、Stormの仕組みのおかげで、非常に簡単で、アプリケーションに変更は必要ない。
DRPCのHTTP APIを用いることで、このStormの並列分散処理の仕組みをそのまま、HTTPリクエストに対する処理に適用することができる。
このメリットのイメージ図を551の肉まん風に書いてみた。
DRPCのHTTP API仕様
DRPCサーバのHTTP APIはstorm-core/src/clj/org/apache/storm/daemon/drpc.cljを見ると、
- GET: http://DRPCサーバ名:DPRCのHTTPポート番号/drpc/関数名[/引数]
- POST: http://DRPCサーバ名:DPRCのHTTPポート番号/drpc/関数名[/]、リクエストボディに引数
となっていることがわかる。引数を省略した場合、空文字が関数に渡される。GETとPOSTの両APIに実行時の違いは無さそうなので、クライアントの都合でどちらを使うかを選択すれば良いようだ。
実際に使ってみた。
Stormクラスタのセットアップ
最低限の設定として、ZookeeperとNimbus、DRPCサーバのアドレスを指定する(すべてlocalhost)。それを使ってコンポーネントを起動する。
$ curl -O http://ftp.meisei-u.ac.jp/mirror/apache/dist/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz $ tar zxvf apache-storm-0.10.0.tar.gz; cd apache-storm-0.10.0/ $ cp conf/storm.yaml conf/storm.yaml.default $ vim conf/storm.yaml $ diff conf/storm.yaml.default conf/storm.yaml 18,23c18,22 < # storm.zookeeper.servers: < # - "server1" < # - "server2" < # < # nimbus.host: "nimbus" < # --- > storm.zookeeper.servers: > - "localhost" > > nimbus.host: "localhost" > 37,39c36,37 < # drpc.servers: < # - "server1" < # - "server2" --- > drpc.servers: > - "localhost" $ ./bin/storm dev-zookeeper & $ ./bin/storm nimbus & $ ./bin/storm supervisor & $ ./bin/storm drpc &
Storm DRPCアプリケーションのデプロイ
DRPCストリームを含むStormトポロジを、Stormクラスタにデプロイする。今回はapache-storm-0.10.0.tar.gzに同梱されている、storm-starterから2つ拝借する。
- TridentWordCount
- 内部で無限に投入される文に含まれる単語をカウントし続けるストリームと、その結果を取得するDRPCストリーム(関数名は
words
、引数は任意の単語、返ってくる値はその単語のカウント数)を持っている - https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWordCount.java
- 内部で無限に投入される文に含まれる単語をカウントし続けるストリームと、その結果を取得するDRPCストリーム(関数名は
- BasicDRPCTopology
- 単純なDRPCストリーム(関数名は
exclamation
、引数は任意の文字列、返ってくる値は引数の文字列末尾に!を付けたもの)を持っている - https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/BasicDRPCTopology.java
- 単純なDRPCストリーム(関数名は
$ ./bin/storm jar \ examples/storm-starter/storm-starter-topologies-0.10.0.jar \ storm.starter.trident.TridentWordCount TridentWordCount $ ./bin/storm jar \ examples/storm-starter/storm-starter-topologies-0.10.0.jar \ storm.starter.BasicDRPCTopology BasicDRPCTopology
DRPCのHTTP APIにアクセス
TridentWordCountは時間が経つにつれストリームに流れた文の量が増え、単語カウントが大きくなっていく。適当なタイミングでアクセスしてみた。対象関数はwords
で、引数として2つ「the」と「eat」をそれぞれGETとPOSTで渡してみた。
$ curl http://localhost:3774/drpc/words/the [[2679]] $ curl -X POST -d eat http://localhost:3774/drpc/words/ [[536]]
DRPCクライアントを用いた時と同様の返り値であるようだ。
次に、BasicDRPCTopologyについて。対象関数はexclamation
で、引数として「eat」と「Hello」をそれぞれGETとPOSTで渡してみた。
$ curl http://localhost:3774/drpc/exclamation/eat eat! $ curl -X POST -d Hello http://localhost:3774/drpc/exclamation Hello!
こちらも想定通り。
ちなみに、空白を含む文字列などを引数に渡す際にはURLエンコードして渡してやればStormの方でデコードしてくれて、結果もデコードされた文字列で返される。
$ curl http://localhost:3774/drpc/exclamation/The%20cow%20jumped%20over%20the%20moon
The cow jumped over the moon!
DRPCはStormの機能の中でも割とマイナな方だと思っていたが、かなり実用的な進化をしていて嬉しい。drpc.https.*
などの設定項目があることから、セキュアな環境もサポートしているようだ。