KPLでKinesisに投げたデータをboto3で取得してみた

KPLはKinesis Producer Libraryのこと。

受信したデータを処理してKinesisに流す処理をStormのトポロジ(Java)で実装した。で、Kinesisに入ったデータをPythonで処理するために、boto3を使ったのだが、思ったように動かず、試行錯誤したのでメモ。

Aggregated Recordのフォーマットドキュメントが古い

まず、boto3のAPIに書かれている通りに、Kinesisからデータを取得するプログラムを書いた

import boto3

for record in boto3.client("kinesis").get_records(...)["Records"]:
  data = record["Data"]
  print(data)

とすると、投入したデータが出てくると思ったが、実際はなにやら色々付いたものが出てきた。

少なくとも以下のようなパターンのデータを取得した。RECORD_DATAが実際に投げた文字列である。

  • \xf3\x89\x9a\xc2\n\nPARTITION_KEY\x1a...RECORD_DATA...
  • \xf3\x89\x9a\xc2\n\nPARTITION_KEY\n\nPARTITION_KEY\x1a...RECORD_DATA...
  • RECORD_DATA

どうやら、KPLを使うとデフォルトでAggregated Recordが有効になるらしく、上2つはKPL Aggregated Record Formatと突き合わせると、\xf3\x89\x9a\xc2というMagicNumberは一致するが、その後に続くデータはProtobufMessageではなかった。

\n\nPARTITION_KEYに出てくる値は、Aggregated RecordにまとめられたRECORD_DATAで指定されていたパーティションキーの集合のようだった。

それを読み飛ばして、\x1a以降がProtobufMessageで、末尾16バイトは確かにチェックサムだったので、どうやら正しくは下記の構造をしているようだった。

0               4                    M                  N          N+15
+---+---+---+---+---+---+....+---+---+==================+---+...+---+
|  MAGIC NUMBER | \n\nPARTITION KEY+ | PROTOBUF MESSAGE |    MD5    |
+---+---+---+---+---+---+....+---+---+==================+---+...+---+

なので、最終的にAggregated RecordからRECORD_DATAを取り出すために、PARTITION_KEYに\x1a が出てこないことを前提として、そしてこれからもProtobufMessageの始まりが\x1aであることを前提にして以下のようにした。

from generated.messages_pb2 import AggregatedRecord  # KPLのソースにある.protoファイルをコンパイルした

if data[:4] == "\xF3\x89\x9A\xC2":
  idx = data.index("\x1A")
  ar = AggregatedRecord()
  ar.ParseFromString(data[idx:-16])
  for record in ar.records:
      record_data = record.data
      print(record_data)
else:
  print(data)

1日分のデータを取得してみたが、とりあえず全件ちゃんと取れている。

もっと楽に解決できたかも

ここまでプログラムを書いて、KCL(Kinesis Client Library)とかkinesis-aggregationとかライブラリを見つけて、Deaggregationしてくれるとか書いてあるので、げんなり。