Kafka 検証環境構築とコンソールクライアントを用いたメッセージ疎通確認
参考書籍の第 3 章を参考に進めていきます。サーバは Vagrant で立ち上げ、各種コンソール作業は root で実施しました。
構築する環境
ソフトウェア | バージョン |
---|---|
OS | Centos7.6 |
JDK | 1.8.0_201 |
Kafka | 2.0.1-cp3 |
Zookeeper | 3.4.13 |
JDK
インストール
OracleJDKをダウンロードして、インストールします。
# curl -OL --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" https://download.oracle.com/otn-pub/java/jdk/8u201-b09/42970487e3af4f5aa5bca3f542482c60/jdk-8u201-linux-x64.rpm # rpm -ivh jdk-8u201-linux-x64.rpm
JAVA_HOME
を設定します。
# vim /etc/profile.d/java.sh export JAVA_HOME=/usr/java/default export PATH=$PATH:$JAVA_JOME/bin
Kafka
インストール
Confluent の Yum リポジトリを登録して、.repo ファイルを作成します。
# rpm --import https://packages.confluent.io/rpm/5.0/archive.key
# vim /etc/yum.repos.d/confluent.repo [Confluent.dist] name=Confluent repository (dist) baseurl=https://packages.confluent.io/rpm/5.0/7 gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.0/archive.key enabled=1 [Confluent] name=Confluent repository baseurl=https://packages.confluent.io/rpm/5.0/ gpgcheck=1 gpgkey=https://packages.confluent.io/rpm/5.0/archive.key enabled=1
そうすると yum で confluent が提供している Kafka をインストールできます。
# yum clean all # yum -y install confluent-platform-oss-2.11
Kafka の設定
Broker のデータディレクトリを設定します。/etc/kafka/server.properties の log.dirs
を修正します。
# vim /etc/kafka/server.properties
# mkdir -p /var/lib/kafka/data # chown cp-kafka:confluent /var/lib/kafka/data/
Zookeeper の設定をします。 /etc/kafka/zookeeper.properties に以下の内容を追記します。
# vim /etc/kafka/zookeeper.properties initLimit=10 syncLimit=5 server.1=kafka-1:2888:3888 server.2=kafka-2:2888:3888 server.3=kafka-3:2888:3888
hosts に各 Kafka のホスト名を追記しておきます。それまでにあった 127.0.0.1 のホスト名はコメントアウトしておきます。
# vim /etc/hosts ##127.0.0.1 kafka-1 kafka-1 192.168.33.31 kafka-1 192.168.33.32 kafka-2 192.168.33.33 kafka-3
Kafka クラスタを構築するそれぞれのサーバを識別できるように myid を設定します。
# echo 1 | sudo -u cp-kafka tee -a /var/lib/zookeeper/myid # echo 2 | sudo -u cp-kafka tee -a /var/lib/zookeeper/myid # echo 3 | sudo -u cp-kafka tee -a /var/lib/zookeeper/myid
/etc/kafka/server.properties の broker.id
, broker.id.generation.enable
, zookeeper.connect
を修正します。
# vim /etc/kafka/server.properties broker.id=1 broker.id.generation.enable=false zookeeper.connect=kafka-1:2181,kafka-2:2181,kafka-3:2181
クラスタを起動させます。このとき Zookeeper から立ち上げます。
# systemctl start confluent-zookeeper # systemctl status confluent-zookeeper # systemctl start confluent-kafka # systemctl status confluent-kafka
疎通確認
Topic を作成します。ここでは first-test という名称になっています。
# kafka-topics --zookeeper kafka-1:2181,kafka-2:2181,kafka-3:2181 --create --topic first-test --partitions 3 --replication-factor 3 Created topic "first-test".
Topic が正しく作成されたか確認します。
# kafka-topics --zookeeper kafka-1:2181,kafka-2:2181,kafka-3:2181 --describe --topic first-test Topic:first-test PartitionCount:3 ReplicationFactor:3 Configs: Topic: first-test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2 Topic: first-test Partition: 1 Leader: 1 Replicas: 2,3,1 Isr: 1,3,2 Topic: first-test Partition: 2 Leader: 1 Replicas: 3,1,2 Isr: 1,3,2
上記の通り、Partition は全部で 3 つあって、それぞれの Partition ごとに Leader Replica が存在している Broker がわかります。この場合は各Partition の Leader Replica は Partition 1 に存在することがわかります。また Isr(In Sync Replica) に 1,3,2 と表示されていることから 3 つの Partition でレプリケートされていることがわかります。
Kufka Console Producer を用いて Producer が Topic に向けて Message を送信します。Consumer の起動させておけば、Producer から送信した Message が Consumer に表示されます。
# kafka-console-consumer --bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 --topic first-test # kafka-console-producer --broker-list kafka-1:9092, kafka-2:9092, kafka-3:9092 --topic first-test
Producer が送信した Message を Consumer が参照することができました。
# kafka-console-producer --broker-list kafka-1:9092, kafka-2:9092, kafka-3:9092 --topic first-test >Hello, Kafka! # kafka-console-consumer --bootstrap-server kafka-1:9092,kafka-2:9092,kafka-3:9092 --topic first-test Hello, Kafka!
まとめ
Kafka の検証構築をして、Kafka Console Producer / Kafka Console Consumer で Message の送受信をすることができました。