KPLでKinesisに投げたデータをboto3で取得してみた
KPLはKinesis Producer Libraryのこと。
受信したデータを処理してKinesisに流す処理をStormのトポロジ(Java)で実装した。で、Kinesisに入ったデータをPythonで処理するために、boto3を使ったのだが、思ったように動かず、試行錯誤したのでメモ。
Aggregated Recordのフォーマットドキュメントが古い
まず、boto3のAPIに書かれている通りに、Kinesisからデータを取得するプログラムを書いた
import boto3 for record in boto3.client("kinesis").get_records(...)["Records"]: data = record["Data"] print(data)
とすると、投入したデータが出てくると思ったが、実際はなにやら色々付いたものが出てきた。
少なくとも以下のようなパターンのデータを取得した。RECORD_DATAが実際に投げた文字列である。
\xf3\x89\x9a\xc2\n\nPARTITION_KEY\x1a...RECORD_DATA...
\xf3\x89\x9a\xc2\n\nPARTITION_KEY\n\nPARTITION_KEY\x1a...RECORD_DATA...
RECORD_DATA
どうやら、KPLを使うとデフォルトでAggregated Recordが有効になるらしく、上2つはKPL Aggregated Record Formatと突き合わせると、\xf3\x89\x9a\xc2
というMagicNumberは一致するが、その後に続くデータはProtobufMessageではなかった。
\n\nPARTITION_KEY
に出てくる値は、Aggregated RecordにまとめられたRECORD_DATAで指定されていたパーティションキーの集合のようだった。
それを読み飛ばして、\x1a
以降がProtobufMessageで、末尾16バイトは確かにチェックサムだったので、どうやら正しくは下記の構造をしているようだった。
0 4 M N N+15 +---+---+---+---+---+---+....+---+---+==================+---+...+---+ | MAGIC NUMBER | \n\nPARTITION KEY+ | PROTOBUF MESSAGE | MD5 | +---+---+---+---+---+---+....+---+---+==================+---+...+---+
なので、最終的にAggregated RecordからRECORD_DATAを取り出すために、PARTITION_KEYに\x1a
が出てこないことを前提として、そしてこれからもProtobufMessageの始まりが\x1a
であることを前提にして以下のようにした。
from generated.messages_pb2 import AggregatedRecord # KPLのソースにある.protoファイルをコンパイルした if data[:4] == "\xF3\x89\x9A\xC2": idx = data.index("\x1A") ar = AggregatedRecord() ar.ParseFromString(data[idx:-16]) for record in ar.records: record_data = record.data print(record_data) else: print(data)
1日分のデータを取得してみたが、とりあえず全件ちゃんと取れている。
もっと楽に解決できたかも
ここまでプログラムを書いて、KCL(Kinesis Client Library)とかkinesis-aggregationとかライブラリを見つけて、Deaggregationしてくれるとか書いてあるので、げんなり。