技術メモ

技術メモ

ラフなメモ

Javaのjava.nioを試してみる

ノンブロッキングI/OをJavaで扱うにあたって、 java.nio について学んでおきます。

構成要素概要

  • Buffer
    • データのコンテナ
  • Channel
    • 入出力操作を実行できるエンティティへの接続を表す
  • Charset
  • Selector/SelectionKey

BufferとChannelの関係は以下です。

ChannelからBufferにデータを読み込むこともでき、BufferからChannelにデータを書き込むこともできます。

f:id:tutuz:20190518140818p:plain

http://tutorials.jenkov.com/images/java-nio/overview-channels-buffers.pngより引用

Buffer

特定のプリミティブ型データのコンテナです。 バッファは、特定のプリミティブ型要素のリニアで有限のシーケンスです。内容のほかに、容量、リミット、位置という必須プロパティがあります。

容量」は、そのバッファに含まれる要素数によって決定されます。容量の値は固定で、必ず正の値になります。

リミット」は、読み込みまたは書込みを行ってはならない最初の要素のインデックスです。容量以下の正の値になります。

位置」は、次に読み込みまたは書込みを行う要素のインデックスです。リミット以下の正の値になります。

非ブール型のプリミティブ型には、このクラスのサブクラスが1つずつ割り当てられています。

Bufferが書き込みの状態なのか読み込みの状態なのかプログラマーが制御しないといけないのが、かなりややこしいです。

Bufferを使う上でややこしいのが、そのBufferインスタンスを今「書き込みに使用しているのか」「読み取りに使用しているのか」をプログラマーが把握・意識して、それに応じた使い方(メソッド呼び出し)をしなければならない(制御する必要がある)こと。 また、同一メソッドでも、読み取りの場合と書き込みの場合で意味合いが異なるものがある。→メソッド一覧

サンプルでファイルをByteBufferからreadするプログラムを記載しました。

public static void main(String[] args) throws IOException {

    RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
    FileChannel inChannel = aFile.getChannel();

    // bufferは書き込める状態
    ByteBuffer buf = ByteBuffer.allocate(128);

    // チャネルからbufferに書き込む
    int bytesRead = inChannel.read(buf);
    while (bytesRead != -1) {

        // bufferを読み込み状態に変更
        buf.flip();

        while(buf.hasRemaining()){
            // 128バイトまでまとめて読み込む
            byte[] bytes = new byte[buf.remaining()];
            buf.get(bytes);
            System.out.println(new String(bytes));
        }

        // bufferを書き込める状態に変更
        buf.clear();

        // チャネルからbufferに書き込む
        bytesRead = inChannel.read(buf);
    }
    aFile.close();
}

Channel

java.nio.channels

入出力操作を実行できるエンティティ(ファイル、ソケットなど)への接続を表すチャネルや、多重化された非ブロック入出力操作用のセレクタを定義します。

チャネルは、ハードウェア・デバイス、ファイル、ネットワーク・ソケットのほか、1つ以上の個別の入出力操作(読み込み、書き込みなど)を実行できるプログラム・コンポーネントなどのエンティティへのオープン接続を表します。Channelインタフェースに指定されているとおり、チャネルの状態はオープンかクローズのどちらかです。どちらの状態のチャネルも、非同期クローズ可能かつ割込み可能です。

上記に記載したとおり、ChannelからBufferにデータを読み込むこともでき、BufferからChannelにデータを書き込むこともできます。

Channel はインターフェースになっていて、いくつかの実装クラスがあります。

たとえば ServerSocketChannel はストリーム型のリスニング・ソケット用のSelectableチャネル、FileChannel はファイルの読み込み、書き込み、マッピング、操作用チャネルです。

FileChannelには以下のメソッドがあります。

メソッド 説明
Channel#read ChannelからBufferに入れます。読取り位置は記録します。
Channel#write ChannelにBufferから入れます。書込み位置を記録します。

Charset

byteとUnicode文字の相互変換を行うため、文字セット、デコーダ、およびエンコーダを定義します。

Selector/SelectionKey

SelectableChannelオブジェクトのマルチプレクサです。

これだけではなんのことかにわかに分かりませんが、以下がわかりやすかったです。

http://tutorials.jenkov.com/java-nio/selectors.html

SelectorはNIO Channelインスタンスを調べて、どのチャネルが利用できるか判断するコンポーネントとあります。以下の図のイメージです。

f:id:tutuz:20190518122208p:plain

http://tutorials.jenkov.com/images/java-nio/overview-selectors.pngより引用

AbstractSelectableChannel#register() でSelectorにチャネルを登録することでSelectionKeyオブジェクトが生成されます。Selector#selectedKeys() で取得できるSelectionKeyの集合について、イテレータで順番にアクセスすることでそのチャネルを利用できます。

Selectorで複数のチャネルを管理することで、シングルスレッドにおいても複数チャネルの処理ができます

サンプルコードは以下です。

public class SelectorSample {

    public static void main(String[] args) {
        new SelectorSample().run();
    }

    public void run() {

        try (ServerSocketChannel channel = ServerSocketChannel.open();
                Selector selector = Selector.open()) {

            channel.configureBlocking(false);
            channel.bind(new InetSocketAddress(8888));
            channel.register(selector, channel.validOps());

            while (true) {

                int readyChannels = selector.selectNow();
                if (readyChannels == 0) continue;

                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();

                    // a connection was accepted by a ServerSocketChannel.
                    if (key.isAcceptable()) {
                        handleAcceptable(key);

                    // a connection was established with a remote server.
                    // for client app.
                    } else if (key.isConnectable()) {

                    // a channel is ready for reading
                    } else if (key.isReadable()) {
                        handleRead(key);

                    // a channel is ready for writing
                    } else if (key.isWritable()) {
                        handleWrite(key);
                    }

                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void handleAcceptable(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = serverChannel.accept();
        channel.configureBlocking(false);

        ByteBuffer buffer = ByteBuffer.allocate(8192);
        channel.register(key.selector(), SelectionKey.OP_READ, buffer);
    }

    private void handleRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();

        int size = channel.read(buffer);

        buffer.flip();

        if (size > 0) {
            while (buffer.position() > 0) {
                buffer.flip();
                channel.write(buffer);
                buffer.compact();
            }

            key.cancel();
            channel.close();
        }
    }

    private void handleWrite(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.flip();
        SocketChannel serverChannel = (SocketChannel) key.channel();
        while (buffer.hasRemaining()) {
            serverChannel.write(buffer);
        }
        buffer.compact();
    }
}

参考