技術メモ

技術メモ

ラフなメモ

KafkaConnectを試す

Kafka Connect とは?

  • Apache Kafka に含まれるフレームワーク
  • Kafka と他システムとのデータ連携に使う
    • Kafka にデータをいれたり、Kafka からデータを出力したり
  • スケーラブルなアーキテクチャで複数サーバでクラスタを組むことができる
    • Connector インスタンスが複数のタスクを保持できる構造のため、スケーラブルなデータの入出力が可能になっている
  • Kafka と Kafka Connect が接続する部分を Connector と呼ぶ
    • Producer 側の Connector は Source
    • Consumer 側の Connector は Sink
  • いろいろなプラグインが公開されているので、接続先にあったプラグインがあれば自分で実装せずに Kafka へのデータ入出力が可能
    • Confluent とかにプラグインの例がある
    • Confluent Developer Guide などもある
  • Source, Sink のプラグインでデータを入出力するときには、論理的なジョブが実行される

実装してみる

ローカルのファイルシステムを Kafka に連携

必要な Connector は以下

  • FileSystem Connectors(Source)
  • FileSystem Connectors(Sink)

テストデータ作成

kafka-1 # mkdir ~/local_put_sample
kafka-1 # cat << EOF > ~/local_put_sample/test.txt
ITEM001,SHOP001,929,2018-10-01 01:01:01
ITEM002,SHOP002,480,2018-10-01 01:02:03
ITEM003,SHOP001,25,2018-10-02 01:01:01
ITEM004,SHOP001,6903,2018-10-02 01:01:01
EOF

設定ファイルの config

kafka-1 # cp -pv /etc/kafka/connect-distributed.properties /etc/kafka/connect-distributed-1.properties

kafka-1 # vim /etc/kafka/connect-distributed-1.properties

###bootstrap.servers=localhost:9092
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092

###group.id=connect-cluster
group.id=connect-cluster-datahub-1

Kafka Connect の起動

kafka-1 # connect-distributed /etc/kafka/connect-distributed-1.properties

起動中の Connector プラグインを調べる

デフォルトでいろいろなプラグインが動いていることがわかります。

kafka-3 # curl http://kafka-1:8083/connector-plugins | jq
[
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "5.0.2"
  },
  {
    "class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "type": "sink",
    "version": "5.0.2"
  },
  {
    "class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",
    "type": "source",
    "version": "2.0.1-cp3"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "5.0.2"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "5.0.2"
  },
  {
    "class": "io.confluent.connect.s3.S3SinkConnector",
    "type": "sink",
    "version": "5.0.2"
  },
  {
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
    "type": "source",
    "version": "2.0.1-cp3"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.0.1-cp3"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.0.1-cp3"
  }
]

JSON でデータをの POST

kafka-1 # echo '
{
  "name" : "load-sample-data",
  "config" : {
    "connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "file" : "/usr/local/source.txt",
    "tasks.max" : "1",
    "topic" : "sample-local"
  }
}
' | curl -X POST -d @- http://kafka-1:8083/connectors --header "content-Type:application/json"

実行中の Kafka Connect の設定を確認

kafka-3 # curl http://kafka-1:8083/connectors
["load-sample-data"]

Kafka からローカルファイルシステムにデータを連携

連携定義の作成

REST API で投入する JSON を作成します。 test.json

cat << EOF > test.json
{
  "name": "sink-sample-data",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", 
    "file": "/usr/local/dest.txt",
    "tasks.max" : "1",
    "topics": "sample-local"
  }
}
EOF

上記の JSON ファイルを使って Sink の定義を登録します。

# curl -X POST -H "Content-Type: application/json" http://kafka-1:8083/connectors -d @test.json

定義が Kafka Connector に登録されたか確認します。

# curl http://kafka-1:8083/connectors

定義は入ったものの、Kafka Connect を利用したファイルの準同期は確認できず。なぜかファイルの append が反映されない。 vim でファイルを編集するとうまくいかない??ファイルの同期機構ではないので...

echo などで /usr/local/source.txt に追記すると想定通り /usr/local/dest.txt への追記が確認できた。

# echo "Hello" >> /usr/local/source.txt
# echo "Hi" >> /usr/local/source.txt
# echo "Hello kafka local" >> /usr/local/source.txt

別コンソールで立ち上げておくと以下のように追記されていることがわかる。

# tail -f /usr/local/dest.txt
Hello
Hi
Hello kafka local

Kafka-Broker に取り込まれたデータを確認

# kafka-console-consumer --bootstrap-server=kafka-1:9092 --topic sample-local --from
-beginning
{"schema":{"type":"string","optional":false},"payload":"Hello"}
{"schema":{"type":"string","optional":false},"payload":"Hi"}
{"schema":{"type":"string","optional":false},"payload":"Hello kafka local "}
^CProcessed a total of 3 messages

投入した定義を削除

# curl -X DELETE http://kafka-1:8083/connectors/load-sample-data
# curl -X DELETE http://kafka-1:8083/connectors/sink-sample-data