Apache Kafka + Druidを使ってインタラクティブに時系列データを集計処理してみた
以下の内容をより実用的・網羅的にまとめ、Kindle電子書籍としてリリースした。
- 作者: 佐伯嘉康
- 発売日: 2015/07/25
- メディア: Kindle版
- この商品を含むブログを見る
概要、参考資料
Druidは、リアルタイムに(ストリーミングな)時系列データを収集するコンポーネントと、収集したデータセットに対し集計クエリを受け付け処理結果を返すコンポーネントからなるデータベースシステムである。
Google技術でいうところのDremel (VLDB2010) とPowerDrill (VLDB2012) との中間に位置するらしく、つまりCloudera Impalaとも似ている。Druidが基準にしている性能指標は、6TBのデータを1桁秒で処理することであると挙げられているが、それを実現するためのシステム要件はそれほど高くない(メモリは特にあればあるだけ安心といった感じである)。
- https://github.com/druid-io/druid
- https://github.com/druid-io/druid-api
- https://github.com/metamx/java-util
- Druidが依存しているライブラリ。なぜかデータパーサのインタフェースがこっちにある
- Druid
- SIGMOD2014の論文
- Druid: Interactive Queries Meet Real-time Data: O'Reilly Open Source Convention: OSCON, July 20 - 24, 2014 in Portland, OR
- OSCON2014の発表スライド
- https://www.youtube.com/results?search_query=druid+real-time
- Druidに関するプレゼンが幾つかある
Druidを用いた時系列データの収集
Druidの現在のStableバージョンは0.6.160である。本稿でもこのバージョンを用いる。github上ではバージョン毎にtagが付いているので、コードはdruid-0.6.160、そして依存するdruid-api-0.2.14.1を参照する。
最新版と比較すると分かる通り、多くのクラスやメソッドが現在も変更が加えられており、コードが安定していない。いつ安定するかも不明であるため、Druid内でやる(拡張プラグインを作る)か、Druid外でやる(入力データの前処理や出力結果データの後処理をするためのシステム・プログラムを別に用意する)かは、判断が難しい。本稿では情報の陳腐化を遅らせるため、Druidのコードを書くことを出来る限り避けた活用を方針とする。
システムの構築のために、VagrantとDockerを用いた。VagrantfileはGitHub - coreos/coreos-vagrant: Minimal Vagrantfile for Container Linuxを用い、メモリを4GB確保している。
diff --git a/Vagrantfile b/Vagrantfile index 2a1572e..c877612 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -13,7 +13,7 @@ $num_instances = 1 $update_channel = "alpha" $enable_serial_logging = false $vb_gui = false -$vb_memory = 1024 +$vb_memory = 4096 $vb_cpus = 1 # Attempt to apply the deprecated environment variable NUM_INSTANCES to
$ vagrant up # VM立ち上げ $ vagrant ssh # VMに入る CoreOS (alpha) core@core-01 ~ $ mkdir zookeeper kafka druid clients core@core-01 ~ $ which docker /usr/bin/docker
以降のコンソール操作は、全てVM上でのものである。
依存システムの構築
Druidは複数のコンポーネント(サーバ)からなる分散システムである。そのシステムを構成する複数サーバのコーディネーションのためにZookeeperが用いられる。
Dockerfileは https://github.com/laclefyoshi/druid_test/blob/master/zookeeper
$ cd zookeeper $ docker build -t laclefyoshi/zookeeper:1.0 . $ docker run -d -p 2181:2181 --name zookeeper laclefyoshi/zookeeper:1.0
DruidはFirehoseと呼ばれるデータ源から時系列データを取得する。拡張プラグインを実装することで様々なものをFirehoseとすることができる。例えば、書籍「Storm Blueprints: Patterns for Distributed Real-time Computation」の中では、Apache StormのTrident StateをFirehoseとして扱い、Stormの処理結果をそのままDruidの入力データとするような例が掲載されている。
本稿では、メッセージキューイングシステムであるApache Kafka (Version 0.8) をFirehoseとして用いる。Apache Kafkaが依存するZookeeperは上のものを用い、Druidと共用とする。
Dockerfileは https://github.com/laclefyoshi/druid_test/tree/master/kafka
$ cd kafka $ docker build -t laclefyoshi/kafka:1.0 . $ docker run -d -p 9092:9092 --link zookeeper:zookeeper --name kafka laclefyoshi/kafka:1.0
DruidシステムとDruidサービスの構築
Druidシステムの上で、Druidサービスが動作する。Druidサービスは、サービス名の他、どのFirehoseをデータ入力源とし、どのようなデータを処理対象とするかを定義する。
Druidシステムの構築パターンには、スタンドアロン版とクラスタ版がある。本稿ではシンプルな方、スタンドアロン版の構築について紹介する。スタンドアロン版は、ストリーミングデータの入力、永続化、クエリの受付と処理・結果返却を1台のマシンで実行する。1つのスタンドアロン版システム上で動作するDruidサービスは1つである。ちなみに、クラスタ版になると、これらの処理が複数マシンで分担され、複数のサービスを起動することができる。
Dockerfileは https://github.com/laclefyoshi/druid_test/tree/master/druid
$ cd druid $ docker build -t laclefyoshi/druid:1.0 . $ docker run -d -p 8083:8083 --link zookeeper:zookeeper --link kafka:kafka --name druid laclefyoshi/druid:1.0
最後に、Apache Kafkaにデータを投入するコンテナを用意する。これはいわゆるKafka Producerであり、Apache Kafkaに以下のようなJSONデータを送信する。
{"timestamp": "2014-12-21T10:17:00", "number": 7, "word": "fox"}
timestampはデータを送信した日時を示す。numberはランダムな整数で、wordはランダムな英単語を出力する。
Dockerfileは https://github.com/laclefyoshi/druid_test/tree/master/clients
$ cd clients $ docker build -t laclefyoshi/clients:1.0 . $ docker run -d --link zookeeper:zookeeper --link kafka:kafka --name clients laclefyoshi/clients:1.0
以上でシステムの構築は完了した。clientsコンテナからApache Kafkaに投入されたデータは、Druidによってキャッチされ、Druidの入力データとして収集され続ける。
$ docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 4021acc277c7 laclefyoshi/clients:1.0 "bash send_data.sh" About a minute ago Up About a minute clients 9280e328e97d laclefyoshi/druid:1.0 "bash ./start_druid_ 9 minutes ago Up 9 minutes 0.0.0.0:8083->8083/tcp druid 9e17bab05cc2 laclefyoshi/kafka:1.0 "bash ./kafka_create 11 minutes ago Up 11 minutes 0.0.0.0:9092->9092/tcp kafka c0614de5dce0 laclefyoshi/zookeeper:1.0 "./bin/zkServer.sh s 25 minutes ago Up 25 minutes 0.0.0.0:2181->2181/tcp zookeeper
Druidを用いた時系列データの集計
clientsコンテナが起動しいくらか時間が経ってから、以下の様なクエリをHTTP POSTでdruidコンテナに投げる。
$ cat query.json { "queryType":"timeBoundary", "dataSource":"druid_test", "intervals":["2014-12-01/2014-12-31"] } $ curl -X 'POST' -H 'Content-Type:application/json' -d @query.json http://localhost:8083/druid/v2/?pretty [ { "timestamp" : "2014-12-21T10:17:00.000Z", "result" : { "minTime" : "2014-12-21T10:17:00.000Z", "maxTime" : "2014-12-21T10:51:00.000Z" } } ]
このクエリは、druid_testという名前で定義されたDruidサービスに、いつからいつまでのデータが投入されているか(集計対象になっているか)を確認するためのクエリである。サービスの名前はDruidのruntime.propertiesのdruid.serviceで定義し、タイムスタンプがデータのどのフィールドにあるかはrealtime.specのパーサ設定で定義されている。
次に、指定時間内を15分ずつ区切り、各15分間でどの単語が何回出現したかをカウントするクエリを投げる。ついでに、カウントしたら、降順でソートさせる。
$ cat query.json { "queryType": "groupBy", "dataSource": "druid_test", "granularity": "fifteen_minute", "dimensions": ["word"], "limitSpec": { "type": "default", "columns": [ {"dimension": "count_word", "direction": "DESCENDING"} ], "limit": 10 }, "aggregations": [ {"type": "count", "name": "count_word"} ], "intervals": ["2014-12-21T10:20/2014-12-21T10:50"] } $ time curl -X 'POST' -H 'Content-Type:application/json' -d @query.json http://localhost:8083/druid/v2/?pretty [ { "version" : "v1", "timestamp" : "2014-12-21T10:15:00.000Z", "event" : { "count_word" : 4, "word" : "lazy" } }, { "version" : "v1", "timestamp" : "2014-12-21T10:15:00.000Z", "event" : { "count_word" : 4, "word" : "the" } }, { "version" : "v1", "timestamp" : "2014-12-21T10:15:00.000Z", "event" : { "count_word" : 3, "word" : "over" } }, { "version" : "v1", "timestamp" : "2014-12-21T10:15:00.000Z", "event" : { "count_word" : 3, "word" : "quick" } }, { "version" : "v1", "timestamp" : "2014-12-21T10:15:00.000Z", "event" : { "count_word" : 2, "word" : "dog" } }, { "version" : "v1", "timestamp" : "2014-12-21T10:15:00.000Z", "event" : { "count_word" : 2, "word" : "jumps" } }, { "version" : "v1", "timestamp" : "2014-12-21T10:15:00.000Z", "event" : { "count_word" : 1, "word" : "fox" } }, { "version" : "v1", "timestamp" : "2014-12-21T10:30:00.000Z", "event" : { "count_word" : 6, "word" : "brown" } }, { "version" : "v1", "timestamp" : "2014-12-21T10:30:00.000Z", "event" : { "count_word" : 6, "word" : "quick" } }, { "version" : "v1", "timestamp" : "2014-12-21T10:30:00.000Z", "event" : { "count_word" : 4, "word" : "fox" } } ] real 0m0.035s user 0m0.003s sys 0m0.001s
データは1分に1〜2個程度投げられているらしい。合計30分間、15分間隔の処理(つまり処理は2回行われた)に、0.03秒掛かっていることが分かる。
カウントの結果を見てみると、10:15から10:30の間に"lazy"と"the"が4回投げられているし、10:30から10:45の間に、"brown"が6回投げられていることが分かる。これは非常に単純なデータの集計である。例えばデータがWebサーバのログであったなら、IPアドレスやユーザエージェント毎のアクセス数の集計などが可能になるし、SSHDのログであれば、アクセス失敗のログ集計から異常検知に応用できる。
Druidが受け付けるクエリはgroupByの他に、ある数値フィールドの合計値を求めるものや、集計結果に対し上位N件を求めるもの、あるフィールドの値によりデータのフィルタリングを実行するものなどがある。
まとめ
本稿では、Apache Kafkaに入ったデータを入力とするシンプルなDruidサービスを構築した。実際にデータを収集させ、集計クエリを投げ、結果を得ることを確認した。
DruidのFirehoseとしたいデータ源が、必ずしもDruidの入力に適したフォーマットのデータを持っているとは限らない。そのため、データ源とFirehoseの間にApache Stormなど逐次ストリーミングデータ処理システムを挟むか、Druidのパーサを拡張することが必要とされる場面がほとんどだろう。
時間があるときに、クラスタ版Druidシステムの構築、Druidによるデータ永続化とHadoopとの連携を試してみたい。