Apache Storm DRPCのHTTP APIを使う

Apache Stormのdefaults.yamldrpc.http* の項目があることから分かるように、Storm 0.10.0のDRPCサーバはDRPCクライアントから利用されるほかに、HTTP APIを持っている(0.9.6にはない)。つまり、HTTPクライアント(PerlのHTTPクライアントライブラリでもcurlでもWebブラウザでも)を用いて、DRPCサーバに処理リクエストを投げ、トポロジの処理結果をレスポンスとして受け取ることができる。

何が嬉しいか

HTTPリクエストに対する処理がスケールアウトしやすい

StormのDRPCは、Distributed Remote Procedure Callの略だが、Stormのトポロジの入力を外部から受け付け、トポロジを通して得られた処理結果を外部に返すためのものである。Stormのトポロジは、入力データを流す始点となるSpoutと、データを処理するBoltに分けられるが、DRPCアプリケーションの場合、Spoutは外部からの入力データを受け付け次のBoltに流し、Boltはデータに処理を行いその結果を次のBoltに渡し、最後のBoltが最終的な処理結果のデータを外部に返す。

Stormのトポロジを構成するSpout、Boltはスケールアウトできるように設計されているので、処理が重い部分・並列分散処理させたい部分の並列数を増やす指定をすることで、実行するスレッド(あるいはプロセス、マシン)を増やしたり、逆に処理を分散させたくない部分の並列数を1にしたりもできる。また、クラスタを構成するマシンの台数を増やすことで、アプリケーション内の並列分散処理の性能を高めることができる。マシンを増やした際の作業は、Stormの仕組みのおかげで、非常に簡単で、アプリケーションに変更は必要ない。

DRPCのHTTP APIを用いることで、このStormの並列分散処理の仕組みをそのまま、HTTPリクエストに対する処理に適用することができる。

このメリットのイメージ図を551の肉まん風に書いてみた。


マイクロサービスを作りやすい

1つのStormクラスタに、複数のDRPCアプリケーションをデプロイすることができる。Netflixなどがよく言っている、マイクロサービスのアーキテクチャ実装のために、この仕組みは適している。

1つのDRPCアプリケーションにリクエストに対する重厚長大な処理をさせるよりも、複数のDRPCアプリケーションを1つのクラスタの中に並べ、それぞれのDRPCアプリケーションに小さな処理を担当させた方が、より変化に対応しやすい環境となる。たとえば、スケールアウトしやすかったり、処理のある一部分の変更が容易になる。

DRPCのHTTP API仕様

DRPCサーバのHTTP APIstorm-core/src/clj/org/apache/storm/daemon/drpc.cljを見ると、

  • GET: http://DRPCサーバ名:DPRCのHTTPポート番号/drpc/関数名[/引数]
  • POST: http://DRPCサーバ名:DPRCのHTTPポート番号/drpc/関数名[/]、リクエストボディに引数

となっていることがわかる。引数を省略した場合、空文字が関数に渡される。GETとPOSTの両APIに実行時の違いは無さそうなので、クライアントの都合でどちらを使うかを選択すれば良いようだ。

実際に使ってみた。

Stormクラスタのセットアップ

最低限の設定として、ZookeeperとNimbus、DRPCサーバのアドレスを指定する(すべてlocalhost)。それを使ってコンポーネントを起動する。

$ curl -O http://ftp.meisei-u.ac.jp/mirror/apache/dist/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz
$ tar zxvf apache-storm-0.10.0.tar.gz; cd apache-storm-0.10.0/
$ cp conf/storm.yaml conf/storm.yaml.default
$ vim conf/storm.yaml
$ diff conf/storm.yaml.default conf/storm.yaml
18,23c18,22
< # storm.zookeeper.servers:
< #     - "server1"
< #     - "server2"
< #
< # nimbus.host: "nimbus"
< #
---
> storm.zookeeper.servers:
>     - "localhost"
>
> nimbus.host: "localhost"
>
37,39c36,37
< # drpc.servers:
< #     - "server1"
< #     - "server2"
---
> drpc.servers:
>     - "localhost"

$ ./bin/storm dev-zookeeper &
$ ./bin/storm nimbus &
$ ./bin/storm supervisor &
$ ./bin/storm drpc &

Storm DRPCアプリケーションのデプロイ

DRPCストリームを含むStormトポロジを、Stormクラスタにデプロイする。今回はapache-storm-0.10.0.tar.gzに同梱されている、storm-starterから2つ拝借する。

$ ./bin/storm jar \
  examples/storm-starter/storm-starter-topologies-0.10.0.jar \
  storm.starter.trident.TridentWordCount TridentWordCount
$ ./bin/storm jar \
  examples/storm-starter/storm-starter-topologies-0.10.0.jar \
  storm.starter.BasicDRPCTopology BasicDRPCTopology

DRPCのHTTP APIにアクセス

TridentWordCountは時間が経つにつれストリームに流れた文の量が増え、単語カウントが大きくなっていく。適当なタイミングでアクセスしてみた。対象関数はwordsで、引数として2つ「the」と「eat」をそれぞれGETとPOSTで渡してみた。

$ curl http://localhost:3774/drpc/words/the
[[2679]]

$ curl -X POST -d eat http://localhost:3774/drpc/words/
[[536]]

DRPCクライアントを用いた時と同様の返り値であるようだ。

次に、BasicDRPCTopologyについて。対象関数はexclamationで、引数として「eat」と「Hello」をそれぞれGETとPOSTで渡してみた。

$ curl http://localhost:3774/drpc/exclamation/eat
eat!

$ curl -X POST -d Hello http://localhost:3774/drpc/exclamation
Hello!

こちらも想定通り。

ちなみに、空白を含む文字列などを引数に渡す際にはURLエンコードして渡してやればStormの方でデコードしてくれて、結果もデコードされた文字列で返される。

$ curl http://localhost:3774/drpc/exclamation/The%20cow%20jumped%20over%20the%20moon
The cow jumped over the moon!

DRPCはStormの機能の中でも割とマイナな方だと思っていたが、かなり実用的な進化をしていて嬉しい。drpc.https.* などの設定項目があることから、セキュアな環境もサポートしているようだ。