技術メモ

技術メモ

ラフなメモ

KafkaJavaAPIクライアントでメッセージを送受信する

Kafka Java クライアント

開発環境

Maven で構築することにします。pom は以下です。

ソフトウェア バージョン
OS Windows 10
JDK 1.8.0_51
Maven 3.3.9

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.chapter4</groupId>
    <artifactId>firstapp</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Producer

Properties クラスに設定をします。
Broker の接続先を '"bootstrap.servers"' に記載します。メッセージはシリアライズした状態で Kafka-Broker へ送信するため、Key と Value をシリアライザでシリアライズします。Kafka に用意されているシリアライザを用います。
Properties のパラメータを元に KafkaProducer インスタンスを生成します。

設定

FirstAppProducer.java

       Properties conf = new Properties();

        conf.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        conf.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        conf.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<Integer, String> producer = new KafkaProducer<>(conf);

Kafka へメッセージを送信する場合は、ProducerRecord インスタンスを生成して、送信するメッセージを格納します。メッセージの Key, Value と Topic を設定します。

           ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, key, value);

メッセージ送信

send メソッドを用いて、非同期でメッセージを送信します。send メソッドを呼び出すと KafkaProducer の送信キューにメッセージを送信します。送信キューのメッセージはユーザスレッドとは別に順次送信処理がされます。メッセージが正しく送信された場合に Kafka クラスタから Ack が返却されます。 Callback クラスが Ack を受け取ると、onCompletion の処理がされます。

このとき以下のように設定していると、metadata は null でないが、 offset = -1 となって、Kafka クラスタのデータを参照するとメッセージが格納されていませんでした。

        conf.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.33.31:9092,192.168.33.32:9092,192.168.33.33:9092");
           producer.send(record, new org.apache.kafka.clients.producer.Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (metadata != null) {
                        String infoString = String.format(
                                "Success partition:%d, offser:%d",
                                metadata.partition(), metadata.offset());
                        System.out.println(infoString);
                    } else {
                        String infoString = String.format("Failed:%s", e.getMessage());
                        System.err.println(infoString);
                    }
                }
            });

close メソッドを呼び出すことで KafkaProducer をクローズします。このとき送信キューにあるメッセージはすべて Kafka クラスタへ送信されます。

        producer.close();

Consumer

設定

Producer のときと同様に Properties に設定します。group.id は適当な名称にしておきます。enable.auto.commit についてはデフォルトでは true になっていて 5000 ms ごとに commit されます。ここでは手動で commit することにします。

       Properties conf = new Properties();
        conf.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka-1:9092,kafka-2:9092,kafka-3:9092");
        conf.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerDeserializer");
        conf.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        conf.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FirstAppConsumerGroup");
        conf.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        
        Consumer<Integer, String> consumer = new KafkaConsumer<>(conf);

受信

Consumer は複数の Topic を subscribe することができます。topicList に複数の topicName を add すれば可能です。

       List<String> topicList = new ArrayList<>();
        topicList.add(topicName);
        consumer.subscribe(topicList);

Topic を subscribe した後、メッセージを受信します。poll メソッドを呼び出すことでメッセージを受信します。メッセージは ConsumerRecords オブジェクトで渡されます。ConsumerRecords は Iterable を実装していて、それぞれのメッセージを拡張 for 文で取得できます。

           ConsumerRecords<Integer, String> records = consumer.poll(1);

受信したそれぞれのメッセージごとに Offset Commit をしています。

           for (ConsumerRecord<Integer, String> record : records) {
                String msgString = String.format("key:%d, value:%s, topic:%s, partition:%d, offset:%d", record.key(), record.value(), record.topic(), record.partition(), record.offset());
                System.out.println(msgString);

                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
                Map<TopicPartition, OffsetAndMetadata> commitInfo = Collections.singletonMap(topicPartition, offsetAndMetadata);
                consumer.commitSync(commitInfo);
            }
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }

close メソッドを呼び出すことで KafkaProducer の送信キューに残っているメッセージが送信されます。

        consumer.close();

Consumer を起動させたおき、Producer からメッセージを送信すると以下のようにコンソールに表示されました。パーティションごとに Offset が連続になって取得できていることがわかります。

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
key:2, value:2, topic:first-app, partition:2, offset:91
key:5, value:5, topic:first-app, partition:2, offset:92
key:6, value:6, topic:first-app, partition:2, offset:93
key:1, value:1, topic:first-app, partition:0, offset:81
key:7, value:7, topic:first-app, partition:0, offset:82
key:8, value:8, topic:first-app, partition:0, offset:83
key:3, value:3, topic:first-app, partition:1, offset:98
key:4, value:4, topic:first-app, partition:1, offset:99
key:9, value:9, topic:first-app, partition:1, offset:100
key:10, value:10, topic:first-app, partition:1, offset:101
key:2, value:2, topic:first-app, partition:2, offset:94
key:5, value:5, topic:first-app, partition:2, offset:95
key:6, value:6, topic:first-app, partition:2, offset:96
key:1, value:1, topic:first-app, partition:0, offset:84
key:7, value:7, topic:first-app, partition:0, offset:85
key:8, value:8, topic:first-app, partition:0, offset:86
key:3, value:3, topic:first-app, partition:1, offset:102
key:4, value:4, topic:first-app, partition:1, offset:103
key:9, value:9, topic:first-app, partition:1, offset:104
key:10, value:10, topic:first-app, partition:1, offset:105

まとめ

Kafka API を利用して、Kafka Broker とメッセージを送受信することができました。

参考書籍