
Trying Kafka Connect, Part 2


In the previous post, I used Kafka Connect to load files from the local filesystem into Kafka. This time, I want to connect an RDB (PostgreSQL) to Kafka, and Kafka to S3.

Architecture

(Architecture diagram: PostgreSQL on kafka-1 → source connector on the kafka-2 Connect worker → Kafka topic → sink connector → S3)

Source Side

Creating the Database

yum -y install postgresql-server
postgresql-setup initdb
echo "listen_addresses = '*'" >> /var/lib/pgsql/data/postgresql.conf
cp /var/lib/pgsql/data/pg_hba.conf /var/lib/pgsql/data/pg_hba.conf.org
cat << EOF > /var/lib/pgsql/data/pg_hba.conf
# PostgreSQL Client Authentication Configuration File
# ===================================================
local all all                trust
host  all all 127.0.0.1/32   trust
host  all all 192.168.0.0/16 md5
EOF
systemctl restart postgresql
psql -U postgres -c "CREATE DATABASE testdb;"
psql -U postgres -d testdb
CREATE TABLE test_table (
    seq bigint PRIMARY KEY,
    item varchar(256)
);
CREATE USER connectuser WITH PASSWORD 'connectuser';
GRANT ALL ON test_table TO connectuser;
INSERT INTO test_table(seq, item) VALUES (1, 'hoge');
INSERT INTO test_table(seq, item) VALUES (2, 'apple');
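As a quick sanity check that the md5 rule in pg_hba.conf and the new user actually work over the network, you can run a SELECT from another node (a sketch; it assumes kafka-2 can resolve the host kafka-1):

PGPASSWORD=connectuser psql -h kafka-1 -p 5432 -d testdb -U connectuser -c "SELECT * FROM test_table;"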

Kafka Connect Configuration for the Source

cp -pv /etc/kafka/connect-distributed.properties /etc/kafka/connect-distributed-2.properties
vim /etc/kafka/connect-distributed-2.properties

###bootstrap.servers=localhost:9092
bootstrap.servers=kafka-2:9092

###group.id=connect-cluster
group.id=connect-cluster-datahub-2
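Incidentally, the schema/payload envelope you will see in the consumer output later comes from the worker's JSON converter settings; the stock connect-distributed.properties typically ships with schemas enabled:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true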
cat << EOF > test_db.json
{
  "name": "load-db-sample-data",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", 
    "connection.url" : "jdbc:postgresql://kafka-1:5432/testdb",
    "connection.user" : "connectuser",
    "connection.password" : "connectuser",
    "mode" : "incrementing",
    "incrementing.column.name" : "seq",
    "table.whitelist" : "test_table",
    "topic.prefix" : "db_",
    "tasks.max" : "1"
  }
}
EOF
Starting Kafka Connect

connect-distributed /etc/kafka/connect-distributed-2.properties

The worker has to be running before its REST API on port 8083 will accept requests, so start it first and then register the connector:

curl -X POST -H "Content-Type: application/json" http://kafka-2:8083/connectors -d @test_db.json
curl http://kafka-2:8083/connectors
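If the connector does not show up in the list, its status endpoint is the first place to look (standard Kafka Connect REST API):

curl http://kafka-2:8083/connectors/load-db-sample-data/status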

Checking the topic from another server shows that the data has landed in Kafka.

[root@kafka-3 ~]# kafka-console-consumer --bootstrap-server kafka-2:9092 --topic db_test_table --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":1,"item":"hoge"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":2,"item":"apple"}}

With this running, insert another row into PostgreSQL on kafka-1.

psql -h kafka-1 -p 5432 -d testdb -U connectuser -c "INSERT INTO test_table(seq, item) VALUES (3, 'banana');"
Password for user connectuser:
INSERT 0 1

You can see that the new row has been appended to Kafka.

[root@kafka-3 ~]# kafka-console-consumer --bootstrap-server kafka-2:9092 --topic db_test_table --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":1,"item":"hoge"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":2,"item":"apple"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"seq"},{"type":"string","optional":true,"field":"item"}],"optional":false,"name":"test_table"},"payload":{"seq":3,"item":"banana"}}

Sink Side

Now that ingestion on the Source side is working, let's try the Sink side.

Creating the S3 Bucket

The S3 bucket was created from the AWS console.
Then place the AWS credentials in the following directory:

mkdir ~/.aws
cat << EOF > ~/.aws/credentials
[default]
aws_access_key_id = ********************
aws_secret_access_key = ********************
EOF
chmod 600 ~/.aws/credentials
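The S3 connector resolves credentials through the AWS default provider chain, so environment variables on the Connect worker should also work as an alternative to the credentials file:

export AWS_ACCESS_KEY_ID=********************
export AWS_SECRET_ACCESS_KEY=********************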

Kafka Connect Configuration for the Sink

cat << EOF > test_db_sink.json
{
  "name": "sink-db-sample-data",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.bucket.name" : "test-kafka-connect1234",
    "s3.region" : "ap-northeast-1",
    "storage.class" : "io.confluent.connect.s3.storage.S3Storage",
    "format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size" : 1,
    "topics" : "db_test_table",
    "tasks.max" : "1"
  }
}
EOF
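Note that "flush.size" : 1 commits one S3 object per record, which makes this demo easy to observe but would be very inefficient for real workloads. In practice you would batch more records per object, optionally combined with time-based rotation (property values here are illustrative):

"flush.size" : 1000,
"rotate.interval.ms" : "600000"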
curl -X POST -H "Content-Type: application/json" http://kafka-2:8083/connectors -d @test_db_sink.json
curl http://kafka-2:8083/connectors
PGPASSWORD=connectuser psql -h kafka-1 -p 5432 -d testdb -U connectuser -c "INSERT INTO test_table(seq, item) VALUES (4, 'grape');"
INSERT 0 1

We can confirm that the records have been written to S3 as JSON.

C:\Users\User>aws s3 ls s3://test-kafka-connect1234/topics/db_test_table/partition=0/
2019-03-20 23:54:19         24 db_test_table+0+0000000000.json
2019-03-20 23:54:21         25 db_test_table+0+0000000001.json
2019-03-20 23:54:22         26 db_test_table+0+0000000002.json
2019-03-20 23:54:25         25 db_test_table+0+0000000003.json
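Object keys follow the pattern <topic>+<partition>+<start offset>.<extension>. To dump the contents of one object to stdout:

aws s3 cp s3://test-kafka-connect1234/topics/db_test_table/partition=0/db_test_table+0+0000000003.json -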
Once done, delete the connectors to clean up:

curl -X DELETE http://kafka-2:8083/connectors/load-db-sample-data
curl -X DELETE http://kafka-2:8083/connectors/sink-db-sample-data
curl http://kafka-2:8083/connectors