KafkaConnectを試す
Kafka Connect とは?
- Apache Kafka に含まれるフレームワーク
- Kafka と他システムとのデータ連携に使う
- Kafka にデータをいれたり、Kafka からデータを出力したり
- スケーラブルなアーキテクチャで複数サーバでクラスタを組むことができる
- Connector インスタンスが複数のタスクを保持できる構造のため、スケーラブルなデータの入出力が可能になっている
- Kafka と Kafka Connect が接続する部分を Connector と呼ぶ
- Producer 側の Connector は Source
- Consumer 側の Connector は Sink
- いろいろなプラグインが公開されているので、接続先にあったプラグインがあれば自分で実装せずに Kafka へのデータ入出力が可能
- Confluent とかにプラグインの例がある
- Confluent Developer Guide などもある
- Source, Sink のプラグインでデータを入出力するときには、論理的なジョブが実行される
実装してみる
ローカルのファイルシステムを Kafka に連携
必要な Connector は以下
- FileSystem Connectors(Source)
- FileSystem Connectors(Sink)
テストデータ作成
kafka-1 # mkdir ~/local_put_sample kafka-1 # cat << EOF > ~/local_put_sample/test.txt ITEM001,SHOP001,929,2018-10-01 01:01:01 ITEM002,SHOP002,480,2018-10-01 01:02:03 ITEM003,SHOP001,25,2018-10-02 01:01:01 ITEM004,SHOP001,6903,2018-10-02 01:01:01 EOF
設定ファイルの config
kafka-1 # cp -pv /etc/kafka/connect-distributed.properties /etc/kafka/connect-distributed-1.properties kafka-1 # vim /etc/kafka/connect-distributed-1.properties ###bootstrap.servers=localhost:9092 bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092 ###group.id=connect-cluster group.id=connect-cluster-datahub-1
Kafka Connect の起動
kafka-1 # connect-distributed /etc/kafka/connect-distributed-1.properties
起動中の Connector プラグインを調べる
デフォルトでいろいろなプラグインが動いていることがわかります。
kafka-3 # curl http://kafka-1:8083/connector-plugins | jq [ { "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "type": "sink", "version": "5.0.2" }, { "class": "io.confluent.connect.hdfs.HdfsSinkConnector", "type": "sink", "version": "5.0.2" }, { "class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector", "type": "source", "version": "2.0.1-cp3" }, { "class": "io.confluent.connect.jdbc.JdbcSinkConnector", "type": "sink", "version": "5.0.2" }, { "class": "io.confluent.connect.jdbc.JdbcSourceConnector", "type": "source", "version": "5.0.2" }, { "class": "io.confluent.connect.s3.S3SinkConnector", "type": "sink", "version": "5.0.2" }, { "class": "io.confluent.connect.storage.tools.SchemaSourceConnector", "type": "source", "version": "2.0.1-cp3" }, { "class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "type": "sink", "version": "2.0.1-cp3" }, { "class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "type": "source", "version": "2.0.1-cp3" } ]
JSON でデータをの POST
kafka-1 # echo ' { "name" : "load-sample-data", "config" : { "connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector", "file" : "/usr/local/source.txt", "tasks.max" : "1", "topic" : "sample-local" } } ' | curl -X POST -d @- http://kafka-1:8083/connectors --header "content-Type:application/json"
実行中の Kafka Connect の設定を確認
kafka-3 # curl http://kafka-1:8083/connectors ["load-sample-data"]
Kafka からローカルファイルシステムにデータを連携
連携定義の作成
REST API で投入する JSON を作成します。 test.json
cat << EOF > test.json { "name": "sink-sample-data", "config": { "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "file": "/usr/local/dest.txt", "tasks.max" : "1", "topics": "sample-local" } } EOF
上記の JSON ファイルを使って Sink の定義を登録します。
# curl -X POST -H "Content-Type: application/json" http://kafka-1:8083/connectors -d @test.json
定義が Kafka Connector に登録されたか確認します。
# curl http://kafka-1:8083/connectors
定義は入ったものの、Kafka Connect を利用したファイルの準同期は確認できず。なぜかファイルの append が反映されない。
vim
でファイルを編集するとうまくいかない??ファイルの同期機構ではないので...
echo
などで /usr/local/source.txt
に追記すると想定通り /usr/local/dest.txt
への追記が確認できた。
# echo "Hello" >> /usr/local/source.txt # echo "Hi" >> /usr/local/source.txt # echo "Hello kafka local" >> /usr/local/source.txt
別コンソールで立ち上げておくと以下のように追記されていることがわかる。
# tail -f /usr/local/dest.txt Hello Hi Hello kafka local
Kafka-Broker に取り込まれたデータを確認
# kafka-console-consumer --bootstrap-server=kafka-1:9092 --topic sample-local --from -beginning {"schema":{"type":"string","optional":false},"payload":"Hello"} {"schema":{"type":"string","optional":false},"payload":"Hi"} {"schema":{"type":"string","optional":false},"payload":"Hello kafka local "} ^CProcessed a total of 3 messages
投入した定義を削除
# curl -X DELETE http://kafka-1:8083/connectors/load-sample-data # curl -X DELETE http://kafka-1:8083/connectors/sink-sample-data