Trying Kafka Connect, Part 2
Kafka Connect
In the previous post I used Kafka Connect to load files from the local filesystem into Kafka. This time I'll connect an RDB (PostgreSQL) to Kafka, and Kafka to S3.
Setup
Source side
Creating the database
yum -y install postgresql-server
postgresql-setup initdb
echo "listen_addresses = '*'" >> /var/lib/pgsql/data/postgresql.conf
cp /var/lib/pgsql/data/pg_hba.conf /var/lib/pgsql/data/pg_hba.conf.org
cat << EOF > /var/lib/pgsql/data/pg_hba.conf
# PostgreSQL Client Authentication Configuration File
# ===================================================
local   all   all                   trust
host    all   all   127.0.0.1/32    trust
host    all   all   192.168.0.0/16  md5
EOF
systemctl restart postgresql
psql -U postgres -W -c "CREATE DATABASE testdb"

-- run the following in testdb:
CREATE TABLE test_table (
  seq  bigint PRIMARY KEY,
  item varchar(256)
);
CREATE USER connectuser with password 'connectuser';
GRANT ALL ON test_table TO connectuser;
INSERT INTO test_table(seq, item) VALUES (1, 'hoge');
INSERT INTO test_table(seq, item) VALUES (2, 'apple');
Kafka Connect configuration for loading
cp -pv /etc/kafka/connect-distributed.properties /etc/kafka/connect-distributed-2.properties
vim /etc/kafka/connect-distributed-2.properties

###bootstrap.servers=localhost:9092
bootstrap.servers=kafka-2:9092
###group.id=connect-cluster
group.id=connect-cluster-datahub-2
cat << EOF > test_db.json
{
  "name": "load-db-sample-data",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://kafka-1:5432/testdb",
    "connection.user": "connectuser",
    "connection.password": "connectuser",
    "mode": "incrementing",
    "incrementing.column.name": "seq",
    "table.whitelist": "test_table",
    "topic.prefix": "db_",
    "tasks.max": "1"
  }
}
EOF
curl -X POST -H "Content-Type: application/json" http://kafka-2:8083/connectors -d @test_db.json
curl http://kafka-2:8083/connectors
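The "mode": "incrementing" setting means the connector remembers the highest value of the `incrementing.column.name` column it has seen and, on each poll, fetches only rows with a larger value. A minimal Python sketch of that polling logic (a simplification for illustration, not the connector's actual code):

```python
def poll_new_rows(rows, last_seq):
    """Return rows whose seq exceeds last_seq, plus the new offset.

    Simplified stand-in for the JDBC source connector's incrementing
    mode, which issues roughly:
        SELECT * FROM test_table WHERE seq > ? ORDER BY seq
    """
    new_rows = sorted((r for r in rows if r["seq"] > last_seq),
                      key=lambda r: r["seq"])
    if new_rows:
        last_seq = new_rows[-1]["seq"]
    return new_rows, last_seq

table = [{"seq": 1, "item": "hoge"}, {"seq": 2, "item": "apple"}]
batch, offset = poll_new_rows(table, 0)        # first poll: both rows
table.append({"seq": 3, "item": "banana"})     # an INSERT happens later
batch2, offset = poll_new_rows(table, offset)  # next poll: only seq=3
```

This is why a monotonically increasing primary key like `seq` is required for this mode: rows inserted with an already-seen key would be skipped.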
Starting Kafka Connect
connect-distributed /etc/kafka/connect-distributed-2.properties
Checking the topic from another server shows that the data has been stored in Kafka.
[root@kafka-3 ~]# kafka-console-consumer --bootstrap-server kafka-2:9092 --topic db_test_table --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":1,"item":"hoge"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":2,"item":"apple"}}
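Each record the connector produces is a JSON envelope: a `schema` describing the row's columns and types, and a `payload` holding the actual values (the default JsonConverter with schemas enabled). A small Python snippet for pulling the payloads back out, e.g. when post-processing consumer output:

```python
import json

# One record as emitted by the JDBC source connector (from the output above)
record = ('{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,'
          '"field":"seq"},{"type":"string","optional":true,"field":"item"}],'
          '"optional":false,"name":"test_table"},'
          '"payload":{"seq":1,"item":"hoge"}}')

envelope = json.loads(record)
row = envelope["payload"]                                    # the row values
fields = [f["field"] for f in envelope["schema"]["fields"]]  # column names
print(fields, row)  # ['seq', 'item'] {'seq': 1, 'item': 'hoge'}
```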
With the connector running, insert a row into PostgreSQL on kafka-1.
psql -h kafka-1 -p 5432 -d testdb -U connectuser -c "INSERT INTO test_table(seq, item) VALUES (3, 'banana');"
Password for user connectuser:
INSERT 0 1
The new row was appended to the Kafka topic.
[root@kafka-3 ~]# kafka-console-consumer --bootstrap-server kafka-2:9092 --topic db_test_table --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":1,"item":"hoge"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":2,"item":"apple"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":3,"item":"banana"}}
Sink side
Now that the source side is ingesting data, let's set up the sink side.
Creating the S3 bucket
I created the S3 bucket from the AWS console.
Put the AWS credentials in the following location:
mkdir ~/.aws
cat << EOF > ~/.aws/credentials
[default]
aws_access_key_id = ********************
aws_secret_access_key = ********************
EOF
chmod 600 ~/.aws/credentials
Kafka Connect configuration for the sink
cat << EOF > test_db_sink.json
{
  "name": "sink-db-sample-data",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.bucket.name": "test-kafka-connect1234",
    "s3.region": "ap-northeast-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": 1,
    "topics": "db_test_table",
    "tasks.max": "1"
  }
}
EOF
curl -X POST -H "Content-Type: application/json" http://kafka-2:8083/connectors -d @test_db_sink.json
curl http://kafka-2:8083/connectors
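With "flush.size": 1, the connector flushes an S3 object for every single record, which is convenient for testing but produces many small objects in production. By default, object keys follow the pattern topics/&lt;topic&gt;/partition=&lt;p&gt;/&lt;topic&gt;+&lt;p&gt;+&lt;startOffset&gt;.json, with the starting Kafka offset zero-padded to 10 digits. A quick sketch of that key layout (a hypothetical helper, for illustration only):

```python
def s3_object_key(topic, partition, start_offset,
                  ext="json", topics_dir="topics"):
    """Reproduce the S3 sink connector's default object-key layout."""
    return (f"{topics_dir}/{topic}/partition={partition}/"
            f"{topic}+{partition}+{start_offset:010d}.{ext}")

print(s3_object_key("db_test_table", 0, 3))
# topics/db_test_table/partition=0/db_test_table+0+0000000003.json
```

Embedding the starting offset in the key is what makes the sink's writes idempotent: re-delivering the same batch overwrites the same object instead of duplicating data.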
PGPASSWORD=connectuser psql -h kafka-1 -p 5432 -d testdb -U connectuser -c "INSERT INTO test_table(seq, item) VALUES (4, 'grape');" INSERT 0 1
The records were written to S3 as JSON.
C:\Users\User>aws s3 ls s3://test-kafka-connect1234/topics/db_test_table/partition=0/
2019-03-20 23:54:19         24 db_test_table+0+0000000000.json
2019-03-20 23:54:21         25 db_test_table+0+0000000001.json
2019-03-20 23:54:22         26 db_test_table+0+0000000002.json
2019-03-20 23:54:25         25 db_test_table+0+0000000003.json
Finally, delete the connectors to clean up.

curl -X DELETE http://kafka-2:8083/connectors/load-db-sample-data
curl -X DELETE http://kafka-2:8083/connectors/sink-db-sample-data
curl http://kafka-2:8083/connectors