KafkaJavaAPIクライアントでメッセージを送受信する
Kafka Java クライアント
開発環境
Maven で構築することにします。pom は以下です。
ソフトウェア | バージョン |
---|---|
OS | Windows 10 |
JDK | 1.8.0_51 |
Maven | 3.3.9 |
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.chapter4</groupId>
<artifactId>firstapp</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Producer
Properties クラスに設定をします。
Broker の接続先を '"bootstrap.servers"' に記載します。メッセージはシリアライズした状態で Kafka-Broker へ送信するため、Key と Value をシリアライザでシリアライズします。Kafka に用意されているシリアライザを用います。
Properties のパラメータを元に KafkaProducer インスタンスを生成します。
設定
FirstAppProducer.java
Properties conf = new Properties(); conf.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092"); conf.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); conf.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<Integer, String> producer = new KafkaProducer<>(conf);
Kafka へメッセージを送信する場合は、ProducerRecord インスタンスを生成して、送信するメッセージを格納します。メッセージの Key, Value と Topic を設定します。
ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, key, value);
メッセージ送信
send メソッドを用いて、非同期でメッセージを送信します。send メソッドを呼び出すと KafkaProducer の送信キューにメッセージを送信します。送信キューのメッセージはユーザスレッドとは別に順次送信処理がされます。メッセージが正しく送信された場合に Kafka クラスタから Ack が返却されます。 Callback クラスが Ack を受け取ると、onCompletion の処理がされます。
このとき以下のように設定していると、metadata は null でないが、 offset = -1 となって、Kafka クラスタのデータを参照するとメッセージが格納されていませんでした。
conf.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.33.31:9092,192.168.33.32:9092,192.168.33.33:9092");
producer.send(record, new org.apache.kafka.clients.producer.Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (metadata != null) { String infoString = String.format( "Success partition:%d, offser:%d", metadata.partition(), metadata.offset()); System.out.println(infoString); } else { String infoString = String.format("Failed:%s", e.getMessage()); System.err.println(infoString); } } });
close メソッドを呼び出すことで KafkaProducer をクローズします。このとき送信キューにあるメッセージはすべて Kafka クラスタへ送信されます。
producer.close();
Consumer
設定
Producer のときと同様に Properties に設定します。group.id
は適当な名称にしておきます。enable.auto.commit
についてはデフォルトでは true
になっていて 5000 ms ごとに commit されます。ここでは手動で commit することにします。
Properties conf = new Properties(); conf.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka-1:9092,kafka-2:9092,kafka-3:9092"); conf.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerDeserializer"); conf.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); conf.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FirstAppConsumerGroup"); conf.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); Consumer<Integer, String> consumer = new KafkaConsumer<>(conf);
受信
Consumer は複数の Topic を subscribe することができます。topicList に複数の topicName を add すれば可能です。
List<String> topicList = new ArrayList<>();
topicList.add(topicName);
consumer.subscribe(topicList);
Topic を subscribe した後、メッセージを受信します。poll メソッドを呼び出すことでメッセージを受信します。メッセージは ConsumerRecords オブジェクトで渡されます。ConsumerRecords は Iterable を実装していて、それぞれのメッセージを拡張 for 文で取得できます。
ConsumerRecords<Integer, String> records = consumer.poll(1);
受信したそれぞれのメッセージごとに Offset Commit をしています。
for (ConsumerRecord<Integer, String> record : records) { String msgString = String.format("key:%d, value:%s, topic:%s, partition:%d, offset:%d", record.key(), record.value(), record.topic(), record.partition(), record.offset()); System.out.println(msgString); TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1); Map<TopicPartition, OffsetAndMetadata> commitInfo = Collections.singletonMap(topicPartition, offsetAndMetadata); consumer.commitSync(commitInfo); } try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); }
close メソッドを呼び出すことで KafkaProducer の送信キューに残っているメッセージが送信されます。
consumer.close();
Consumer を起動させたおき、Producer からメッセージを送信すると以下のようにコンソールに表示されました。パーティションごとに Offset が連続になって取得できていることがわかります。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. key:2, value:2, topic:first-app, partition:2, offset:91 key:5, value:5, topic:first-app, partition:2, offset:92 key:6, value:6, topic:first-app, partition:2, offset:93 key:1, value:1, topic:first-app, partition:0, offset:81 key:7, value:7, topic:first-app, partition:0, offset:82 key:8, value:8, topic:first-app, partition:0, offset:83 key:3, value:3, topic:first-app, partition:1, offset:98 key:4, value:4, topic:first-app, partition:1, offset:99 key:9, value:9, topic:first-app, partition:1, offset:100 key:10, value:10, topic:first-app, partition:1, offset:101 key:2, value:2, topic:first-app, partition:2, offset:94 key:5, value:5, topic:first-app, partition:2, offset:95 key:6, value:6, topic:first-app, partition:2, offset:96 key:1, value:1, topic:first-app, partition:0, offset:84 key:7, value:7, topic:first-app, partition:0, offset:85 key:8, value:8, topic:first-app, partition:0, offset:86 key:3, value:3, topic:first-app, partition:1, offset:102 key:4, value:4, topic:first-app, partition:1, offset:103 key:9, value:9, topic:first-app, partition:1, offset:104 key:10, value:10, topic:first-app, partition:1, offset:105
まとめ
Kafka API を利用して、Kafka Broker とメッセージを送受信することができました。