技術メモ

学んだことをメモしていくブログ...

JavaでいろいろなEcho Server/Clientを作る

JavaのSocket通信を試してみたいと思い、いろいろなEcho Server/Clientを実装しました。

Socket通信に関連するクラスは java.net にまとまっています。

ブロッキングI/O+シングルスレッド

サーバ側

Server

public class EchoServer {

    public static void main(String[] args) throws IOException {
        int portNumber = 8888;

        try (
            ServerSocket serverSocket = new ServerSocket(portNumber);
        ) {
            System.out.printf("Server running. port->%d\n", portNumber);
            while (true) {
                try (
                    Socket clientSocket = serverSocket.accept();
                    PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
                    BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                    ) {
                    System.out.printf("[%s] :Accepted!\n", Thread.currentThread().getName());
                }
            }
        }
    }
}

ソケットの作成

まず、ソケットを作成します。Javaのソケットは以下で作成できます。

ServerSocket serverSocket = new ServerSocket(portNumber);

ServerSocket (Java Platform SE 8)

受信する接続のキューの最大長はデフォルト値だと50です。キューが埋まっている場合に接続要求が来ると、接続は拒否されます。受信接続キューの長さは以下のメソッドで定義できます。

public ServerSocket(int port, int backlog) throws IOException

ServerSocket (Java Platform SE 8)

accept処理

クライアントからの処理は、Socketクラスのインスタンスに対して行います。

Socket clientSocket = serverSocket.accept();

ServerSocket (Java Platform SE 8)

ソケットに対する接続要求を待機します。ServerSocket#setSoTimeoutで指定した時間が経過するか、またはクライアントからのリクエストを受信するまで待ち状態となります。ServerSocket#setSoTimeoutでの待ち時間の指定がない場合は無制限に待ちます。

クライアント側

クライアント側の実装例です。

Client

public class Sample {

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                try {
                    new EchoClient().connect();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            thread.start();
        }
    }
}

public class EchoClient {

    public void connect() throws UnknownHostException, IOException {
        String hostName = "localhost";
        int portNumber = 8888;

        try (
            Socket echoSocket = new Socket(hostName, portNumber);
        ) {
            System.out.printf("[%s] :connected!\n", Thread.currentThread().getName());
        }
    }
}

ソケットの作成

以下でソケットを作成できます。

Socket echoSocket = new Socket(hostName, portNumber);

簡易のため、そのクライアントはソケットのアウトプットストリームには何も入力せず、単に接続するだけのクライアントです。

動作確認

サーバを起動します。

$ java sample/EchoServer
Server running. port->8888

クライアントから接続します。

$ java sample/Sample
[Thread-5] :connected!
[Thread-2] :connected!
[Thread-0] :connected!
[Thread-1] :connected!
[Thread-9] :connected!
[Thread-4] :connected!
[Thread-7] :connected!
[Thread-6] :connected!
[Thread-3] :connected!
[Thread-8] :connected!

サーバ側には以下のように出力されます。

Sun May 12 22:20:30 JST 2019 [main] :Accepted!
Sun May 12 22:20:31 JST 2019 [main] :Accepted!
Sun May 12 22:20:32 JST 2019 [main] :Accepted!
Sun May 12 22:20:33 JST 2019 [main] :Accepted!
Sun May 12 22:20:34 JST 2019 [main] :Accepted!
Sun May 12 22:20:35 JST 2019 [main] :Accepted!
Sun May 12 22:20:36 JST 2019 [main] :Accepted!
Sun May 12 22:20:37 JST 2019 [main] :Accepted!
Sun May 12 22:20:38 JST 2019 [main] :Accepted!
Sun May 12 22:20:39 JST 2019 [main] :Accepted!

1秒ごとに逐次処理されていることが分かります。
ひとまずブロッキングI/Oを用いたソケット通信ができました。続いてノンブロッキングI/Oを用いて、複数のクライアントから並行して処理をできるようにしてみます。

ブロッキングI/O+マルチスレッド

サーバ側

並行してクライアントからの処理を受け付けるようにするために、サーバ側で ServerSocket#accetp() した後は別スレッドでソケット通信するようにします。

イメージ図は以下になります。

f:id:tutuz:20190517215909p:plain

https://www.techscore.com/page_attachments/0000/0513/MultiServerSocket.png より引用

public class EchoMultiServer {

    public static void main(String[] args) throws IOException {
        int portNumber = 8888;

        try (
            ServerSocket serverSocket = new ServerSocket(portNumber);
        ) {
            System.out.printf("Server running. port->%d\n", portNumber);
            while (true) {
                try (
                    Socket clientSocket = serverSocket.accept();
                    PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
                    BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                    ) {
                    new EchoThread(clientSocket).start();
                }
            }
        }
    }

}

class EchoThread extends Thread {

    private Socket socket;

    public EchoThread(Socket socket) {
        this.socket = socket;
        System.out.printf("%s [%s] :Accepted!\n", new Date(), Thread.currentThread().getName());
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("%s [%s] :Process finished.\n", new Date(), Thread.currentThread().getName());
    }
}

スレッド生成

スレッドを生成する方法は大きく2つの方法があります。

  1. クラスをThreadのサブクラスであると宣言すること
    Thread#run() をオーバーライドし、そのThreadのサブクラスを Thread#start() することで生成できます。
  2. Runnableインタフェースを実装するクラスを宣言すること
    Thread#run() を実装します。実装したクラスを Thread コンストラクタの引数として渡し、Thread#start() することで生成できます。

今回は 1. クラスをThreadのサブクラスであると宣言すること の方法を試してみました。

class EchoThread extends Thread {

    private Socket socket;

    public EchoThread(Socket socket) {
        this.socket = socket;
        System.out.printf("%s [%s] :Accepted!\n", new Date(), Thread.currentThread().getName());
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("%s [%s] :Process finished.\n", new Date(), Thread.currentThread().getName());
    }
}

動作確認

サーバを起動します。

$ java sample/EchoMultiServer
Server running. port->8888

クライアントから接続します。

$ java sample/Sample
[Thread-8] :connected!
[Thread-3] :connected!
[Thread-1] :connected!
[Thread-6] :connected!
[Thread-5] :connected!
[Thread-7] :connected!
[Thread-0] :connected!
[Thread-4] :connected!
[Thread-9] :connected!
[Thread-2] :connected!

サーバ側には以下のように出力されます。

Fri May 17 22:23:21 JST 2019 [main] :Accepted!
Fri May 17 22:23:21 JST 2019 [main] :Accepted!
Fri May 17 22:23:21 JST 2019 [main] :Accepted!
Fri May 17 22:23:21 JST 2019 [main] :Accepted!
Fri May 17 22:23:21 JST 2019 [main] :Accepted!
Fri May 17 22:23:21 JST 2019 [main] :Accepted!
Fri May 17 22:23:21 JST 2019 [main] :Accepted!
Fri May 17 22:23:21 JST 2019 [main] :Accepted!
Fri May 17 22:23:21 JST 2019 [main] :Accepted!
Fri May 17 22:23:21 JST 2019 [main] :Accepted!
Fri May 17 22:23:22 JST 2019 [Thread-0] :Process finished.
Fri May 17 22:23:22 JST 2019 [Thread-1] :Process finished.
Fri May 17 22:23:22 JST 2019 [Thread-2] :Process finished.
Fri May 17 22:23:22 JST 2019 [Thread-3] :Process finished.
Fri May 17 22:23:22 JST 2019 [Thread-0] :Socket[addr=/127.0.0.1,port=56691,localport=8888] closed.
Fri May 17 22:23:22 JST 2019 [Thread-3] :Socket[addr=/127.0.0.1,port=56692,localport=8888] closed.
Fri May 17 22:23:22 JST 2019 [Thread-9] :Process finished.
Fri May 17 22:23:22 JST 2019 [Thread-8] :Process finished.
Fri May 17 22:23:22 JST 2019 [Thread-2] :Socket[addr=/127.0.0.1,port=56695,localport=8888] closed.
Fri May 17 22:23:22 JST 2019 [Thread-7] :Process finished.
Fri May 17 22:23:22 JST 2019 [Thread-6] :Process finished.
Fri May 17 22:23:22 JST 2019 [Thread-1] :Socket[addr=/127.0.0.1,port=56690,localport=8888] closed.
Fri May 17 22:23:22 JST 2019 [Thread-4] :Process finished.
Fri May 17 22:23:22 JST 2019 [Thread-5] :Process finished.
Fri May 17 22:23:22 JST 2019 [Thread-4] :Socket[addr=/127.0.0.1,port=56693,localport=8888] closed.
Fri May 17 22:23:22 JST 2019 [Thread-6] :Socket[addr=/127.0.0.1,port=56698,localport=8888] closed.
Fri May 17 22:23:22 JST 2019 [Thread-7] :Socket[addr=/127.0.0.1,port=56697,localport=8888] closed.
Fri May 17 22:23:22 JST 2019 [Thread-8] :Socket[addr=/127.0.0.1,port=56696,localport=8888] closed.
Fri May 17 22:23:22 JST 2019 [Thread-9] :Socket[addr=/127.0.0.1,port=56699,localport=8888] closed.
Fri May 17 22:23:22 JST 2019 [Thread-5] :Socket[addr=/127.0.0.1,port=56694,localport=8888] closed.

並行してクライアントからのソケット通信ができていることが分かります。
1つ問題があって、サーバ側の実装で直接Threadを生成しています。
このクライアントプログラムを複数回動作させると以下のようになります。

結果

Fri May 17 22:30:40 JST 2019 [main] :Accepted!
Fri May 17 22:30:40 JST 2019 [main] :Accepted!
Fri May 17 22:30:40 JST 2019 [main] :Accepted!
Fri May 17 22:30:40 JST 2019 [main] :Accepted!
Fri May 17 22:30:40 JST 2019 [main] :Accepted!
Fri May 17 22:30:40 JST 2019 [main] :Accepted!
Fri May 17 22:30:40 JST 2019 [main] :Accepted!
Fri May 17 22:30:40 JST 2019 [main] :Accepted!
Fri May 17 22:30:40 JST 2019 [main] :Accepted!
Fri May 17 22:30:40 JST 2019 [main] :Accepted!
Fri May 17 22:30:41 JST 2019 [Thread-160] :Process finished.
Fri May 17 22:30:41 JST 2019 [Thread-161] :Process finished.
Fri May 17 22:30:41 JST 2019 [Thread-165] :Process finished.
Fri May 17 22:30:41 JST 2019 [Thread-164] :Process finished.
Fri May 17 22:30:41 JST 2019 [Thread-160] :Socket[addr=/127.0.0.1,port=57162,localport=8888] closed.
Fri May 17 22:30:41 JST 2019 [Thread-162] :Process finished.
Fri May 17 22:30:41 JST 2019 [Thread-162] :Socket[addr=/127.0.0.1,port=57164,localport=8888] closed.
Fri May 17 22:30:41 JST 2019 [Thread-163] :Process finished.
Fri May 17 22:30:41 JST 2019 [Thread-164] :Socket[addr=/127.0.0.1,port=57165,localport=8888] closed.
Fri May 17 22:30:41 JST 2019 [Thread-168] :Process finished.
Fri May 17 22:30:41 JST 2019 [Thread-165] :Socket[addr=/127.0.0.1,port=57167,localport=8888] closed.
Fri May 17 22:30:41 JST 2019 [Thread-169] :Process finished.
Fri May 17 22:30:41 JST 2019 [Thread-169] :Socket[addr=/127.0.0.1,port=57168,localport=8888] closed.
Fri May 17 22:30:41 JST 2019 [Thread-167] :Process finished.
Fri May 17 22:30:41 JST 2019 [Thread-166] :Process finished.
Fri May 17 22:30:41 JST 2019 [Thread-161] :Socket[addr=/127.0.0.1,port=57166,localport=8888] closed.
Fri May 17 22:30:41 JST 2019 [Thread-166] :Socket[addr=/127.0.0.1,port=57170,localport=8888] closed.
Fri May 17 22:30:41 JST 2019 [Thread-167] :Socket[addr=/127.0.0.1,port=57171,localport=8888] closed.
Fri May 17 22:30:41 JST 2019 [Thread-168] :Socket[addr=/127.0.0.1,port=57169,localport=8888] closed.
Fri May 17 22:30:41 JST 2019 [Thread-163] :Socket[addr=/127.0.0.1,port=57163,localport=8888] closed.
...

つまり、スレッド数は管理していないので、新しいスレッドが生成されていることが分かります。
いつかjava.lang.OutOfMemoryErrorなどになり死んでしまいそうです。
ソケット通信をcloseした(TCPとしてはTIME_WAITでCLOSEDになるソケット)Threadについては、使いまわしたいです。

ThreadPoolを使って改善してみます。

ブロッキングI/O+マルチスレッド(スレッド数固定)

サーバ側

public class EchoMultiFixedServer {

    static final int MAX_THREADS = 20;
    static final int PORT_NUMBER = 8888;

    public static void main(String[] args) throws IOException {

        ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);

        try (
            ServerSocket serverSocket = new ServerSocket(PORT_NUMBER);
        ) {
            System.out.printf("Server running. port->%d\n", PORT_NUMBER);
            while (true) {
                try (
                    Socket clientSocket = serverSocket.accept();
                    PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
                    BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                    ) {
                    executor.submit(new EchoThread(clientSocket));
                }
            }
        }
    }
}

Runnableインターフェースを実装したEchoTask.javaです。

public class EchoTask implements Runnable {

    private Socket socket;

    public EchoTask(Socket socket) {
        this.socket = socket;
        System.out.printf("%s [%s] :Accepted!\n", new Date(), Thread.currentThread().getName());
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("%s [%s] :Process finished.\n", new Date(), Thread.currentThread().getName());

        try {
            socket.close();
            System.out.printf("%s [%s] :%s closed.\n", new Date(), Thread.currentThread().getName(), socket.toString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

ThreadPoolは以下で生成できます。ThreadPoolの種類はいくつかあります。それぞれの種類の動作はこちらにまとめておきました。

今回はスレッド数の上限を固定する newFixedThreadPool(int nThreads) を使います。上限数は20にしておきました。

ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);

動作確認

クライアント側結果

$ java sample/Sample
[Thread-2] :connected!
[Thread-7] :connected!
[Thread-8] :connected!
[Thread-9] :connected!
[Thread-1] :connected!
[Thread-0] :connected!
[Thread-5] :connected!
[Thread-6] :connected!
[Thread-4] :connected!
[Thread-3] :connected!

$ java sample/Sample
[Thread-1] :connected!
[Thread-9] :connected!
[Thread-4] :connected!
[Thread-3] :connected!
[Thread-6] :connected!
[Thread-0] :connected!
[Thread-5] :connected!
[Thread-8] :connected!
[Thread-2] :connected!
[Thread-7] :connected!

$ java sample/Sample
[Thread-0] :connected!
[Thread-1] :connected!
[Thread-6] :connected!
[Thread-2] :connected!
[Thread-4] :connected!
[Thread-3] :connected!
[Thread-5] :connected!
[Thread-9] :connected!
[Thread-7] :connected!
[Thread-8] :connected!

サーバ側結果

$ java sample/EchoMultiFixedServer
Server running. port->8888
Fri May 17 23:01:03 JST 2019 [main] :Accepted!
Fri May 17 23:01:03 JST 2019 [main] :Accepted!
Fri May 17 23:01:03 JST 2019 [main] :Accepted!
Fri May 17 23:01:03 JST 2019 [main] :Accepted!
Fri May 17 23:01:03 JST 2019 [main] :Accepted!
Fri May 17 23:01:03 JST 2019 [main] :Accepted!
Fri May 17 23:01:03 JST 2019 [main] :Accepted!
Fri May 17 23:01:03 JST 2019 [main] :Accepted!
Fri May 17 23:01:03 JST 2019 [main] :Accepted!
Fri May 17 23:01:03 JST 2019 [main] :Accepted!
Fri May 17 23:01:04 JST 2019 [pool-1-thread-1] :Process finished.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-4] :Process finished.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-3] :Process finished.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-2] :Process finished.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-3] :Socket[addr=/127.0.0.1,port=59595,localport=8888] closed.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-10] :Process finished.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-9] :Process finished.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-4] :Socket[addr=/127.0.0.1,port=59599,localport=8888] closed.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-8] :Process finished.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-7] :Process finished.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-1] :Socket[addr=/127.0.0.1,port=59598,localport=8888] closed.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-5] :Process finished.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-6] :Process finished.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-5] :Socket[addr=/127.0.0.1,port=59596,localport=8888] closed.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-7] :Socket[addr=/127.0.0.1,port=59601,localport=8888] closed.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-8] :Socket[addr=/127.0.0.1,port=59602,localport=8888] closed.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-9] :Socket[addr=/127.0.0.1,port=59603,localport=8888] closed.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-10] :Socket[addr=/127.0.0.1,port=59604,localport=8888] closed.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-2] :Socket[addr=/127.0.0.1,port=59600,localport=8888] closed.
Fri May 17 23:01:04 JST 2019 [pool-1-thread-6] :Socket[addr=/127.0.0.1,port=59597,localport=8888] closed.
Fri May 17 23:01:05 JST 2019 [main] :Accepted!
Fri May 17 23:01:05 JST 2019 [main] :Accepted!
Fri May 17 23:01:05 JST 2019 [main] :Accepted!
Fri May 17 23:01:05 JST 2019 [main] :Accepted!
Fri May 17 23:01:05 JST 2019 [main] :Accepted!
Fri May 17 23:01:05 JST 2019 [main] :Accepted!
Fri May 17 23:01:05 JST 2019 [main] :Accepted!
Fri May 17 23:01:05 JST 2019 [main] :Accepted!
Fri May 17 23:01:05 JST 2019 [main] :Accepted!
Fri May 17 23:01:05 JST 2019 [main] :Accepted!
Fri May 17 23:01:06 JST 2019 [pool-1-thread-12] :Process finished.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-14] :Process finished.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-13] :Process finished.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-11] :Process finished.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-13] :Socket[addr=/127.0.0.1,port=59611,localport=8888] closed.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-20] :Process finished.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-14] :Socket[addr=/127.0.0.1,port=59610,localport=8888] closed.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-19] :Process finished.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-18] :Process finished.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-17] :Process finished.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-16] :Process finished.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-12] :Socket[addr=/127.0.0.1,port=59609,localport=8888] closed.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-15] :Process finished.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-16] :Socket[addr=/127.0.0.1,port=59612,localport=8888] closed.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-17] :Socket[addr=/127.0.0.1,port=59614,localport=8888] closed.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-18] :Socket[addr=/127.0.0.1,port=59615,localport=8888] closed.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-19] :Socket[addr=/127.0.0.1,port=59616,localport=8888] closed.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-20] :Socket[addr=/127.0.0.1,port=59613,localport=8888] closed.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-11] :Socket[addr=/127.0.0.1,port=59607,localport=8888] closed.
Fri May 17 23:01:06 JST 2019 [pool-1-thread-15] :Socket[addr=/127.0.0.1,port=59608,localport=8888] closed.
Fri May 17 23:01:07 JST 2019 [main] :Accepted!
Fri May 17 23:01:07 JST 2019 [main] :Accepted!
Fri May 17 23:01:07 JST 2019 [main] :Accepted!
Fri May 17 23:01:07 JST 2019 [main] :Accepted!
Fri May 17 23:01:07 JST 2019 [main] :Accepted!
Fri May 17 23:01:07 JST 2019 [main] :Accepted!
Fri May 17 23:01:07 JST 2019 [main] :Accepted!
Fri May 17 23:01:07 JST 2019 [main] :Accepted!
Fri May 17 23:01:07 JST 2019 [main] :Accepted!
Fri May 17 23:01:07 JST 2019 [main] :Accepted!
Fri May 17 23:01:08 JST 2019 [pool-1-thread-3] :Process finished.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-5] :Process finished.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-7] :Process finished.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-1] :Process finished.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-4] :Process finished.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-1] :Socket[addr=/127.0.0.1,port=59622,localport=8888] closed.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-7] :Socket[addr=/127.0.0.1,port=59625,localport=8888] closed.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-5] :Socket[addr=/127.0.0.1,port=59620,localport=8888] closed.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-6] :Process finished.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-10] :Process finished.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-2] :Process finished.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-9] :Process finished.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-3] :Socket[addr=/127.0.0.1,port=59619,localport=8888] closed.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-8] :Process finished.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-9] :Socket[addr=/127.0.0.1,port=59626,localport=8888] closed.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-2] :Socket[addr=/127.0.0.1,port=59627,localport=8888] closed.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-10] :Socket[addr=/127.0.0.1,port=59624,localport=8888] closed.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-6] :Socket[addr=/127.0.0.1,port=59628,localport=8888] closed.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-4] :Socket[addr=/127.0.0.1,port=59621,localport=8888] closed.
Fri May 17 23:01:08 JST 2019 [pool-1-thread-8] :Socket[addr=/127.0.0.1,port=59623,localport=8888] closed.

たしかにスレッド数が20に抑えられていることが分かります。良さそうです。

ノンブロッキングI/O+シングルスレッド

ノンブロッキングI/Oとは

ブロッキングI/OとノンブロッキングI/Oのイメージについては以下の図がわかりやすいです。

takezoe.hatenablog.com

定義についてはこちらの定義で理解しておきます。

3.245 Non-Blocking A property of an open file description that causes function calls involving it to return without delay when it is detected that the requested action associated with the function call cannot be completed without unknown delay.

つまりI/Oの呼び出しについて、I/Oが遅延なしに完了しなければ、呼び出しそのものは遅延なしに呼び出しからリターンするという性質です。

サーバ側

java.nio パッケージを使ってノンブロッキングI/Oを実現することができます。java.nioについてはこちらにまとめました。

さて、ノンブロッキングI/Oのサーバ側実装サンプルですが、以下はほぼまんま techscore さんのコードではあります...

public class NonBlockingChannelEchoServer {

    private static final int ECHO_PORT = 8888;
    private static final int BUF_SIZE = 1000;

    private Selector selector;

    public static void main(String[] args) {
        new NonBlockingChannelEchoServer().run();
    }

    public void run() {
        ServerSocketChannel serverChannel = null;
        try {
            selector = Selector.open();
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(new InetSocketAddress(ECHO_PORT));
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.printf("NonBlockingChannelEchoServer is running... port->%d\n", serverChannel.socket().getLocalPort());
            while (selector.select() > 0) {
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey selectionKey = it.next();
                    if (selectionKey.isAcceptable()) {
                        doAccept((ServerSocketChannel) selectionKey.channel());
                    } else if (selectionKey.isReadable()) {
                        doRead((SocketChannel) selectionKey.channel());
                    }
                    it.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (serverChannel != null && serverChannel.isOpen()) {
                try {
                    System.out.printf("NonBlockingChannelEchoServer is stopping.\n");
                    serverChannel.close();
                } catch (IOException e) {
                }
            }
        }
    }

    private void doAccept(ServerSocketChannel serverChannel) {
        try {
            SocketChannel channel = serverChannel.accept();
            String remoteAddress = channel.socket().getRemoteSocketAddress().toString();
            System.out.printf("%s [%s] :Accepted! remoteAddress -> %s\n", new Date(), Thread.currentThread().getName(), remoteAddress);
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void doRead(SocketChannel channel) {
        ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
        Charset charset = Charset.forName("UTF-8");
        String remoteAddress = channel.socket().getRemoteSocketAddress().toString();
        try {
            if (channel.read(buf) < 0) {
                return;
            }
            buf.flip();
            System.out.print(remoteAddress + ":" + charset.decode(buf).toString());
            buf.flip();
            channel.write(buf);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            System.out.printf("%s [%s] :Closed. remoteAddress -> %s\n", new Date(), Thread.currentThread().getName(), remoteAddress);
            try {
                channel.close();
            } catch (IOException e) {
            }
        }
    }
}

クライアント側

testとChannelにwriteするクライアントを作成しました。

public class WriteClient {

    final String HOSTNAME = "localhost";
    final int PORT_NUMBER = 8888;

    public void connect() throws UnknownHostException, IOException {

        try (
            Socket echoSocket = new Socket(HOSTNAME, PORT_NUMBER);
            PrintWriter out = new PrintWriter(echoSocket.getOutputStream(), true);
            BufferedReader in = new BufferedReader(new InputStreamReader(echoSocket.getInputStream()));
            BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))
        ) {
            String userInput = "test";
            out.println(userInput);
        }
    }
}
public class Sample {

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                try {
                    new WriteClient().connect();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            thread.start();
        }
    }
}

SelectorとChannelの生成

SelectorとChannelの生成は以下の内容です。

   selector = Selector.open();
    serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);
    serverChannel.socket().bind(new InetSocketAddress(ECHO_PORT));
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);

Selector#open はSelectorオブジェクトを生成するためのファクトリメソッドで、このメソッドを用いてSelectorオブジェクトを生成します。

ServerSocketChannelオブジェクトを生成して、ノンブロッキングやバインドの設定をし、SelectorにServerSocketChannelオブジェクトを登録します。

SelectionKeyの取得とChannel取得

以下で操作可能な SelectionKey のイテレータを取得してacceptやreadの処理をしています。Selector/SelectionKeyの扱いについてはこちらにまとめました。

   while (selector.select() > 0) {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        Iterator<SelectionKey> it = selectedKeys.iterator();
        while (it.hasNext()) {
            SelectionKey selectionKey = it.next();
            if (selectionKey.isAcceptable()) {
                doAccept((ServerSocketChannel) selectionKey.channel());
            } else if (selectionKey.isReadable()) {
                doRead((SocketChannel) selectionKey.channel());
            }
            it.remove();
        }
    }

動作確認

クライアント側

$ java sample/Sample

サーバ側

$ java nio/NonBlockingChannelEchoServer
NonBlockingChannelEchoServer is running... port->8888
Fri May 17 23:58:37 JST 2019 [main] :Accepted! remoteAddress -> /127.0.0.1:58507
Fri May 17 23:58:37 JST 2019 [main] :Accepted! remoteAddress -> /127.0.0.1:58510
/127.0.0.1:58507:test
Fri May 17 23:58:37 JST 2019 [main] :Closed. remoteAddress -> /127.0.0.1:58507
/127.0.0.1:58510:test
Fri May 17 23:58:37 JST 2019 [main] :Closed. remoteAddress -> /127.0.0.1:58510
Fri May 17 23:58:37 JST 2019 [main] :Accepted! remoteAddress -> /127.0.0.1:58509
Fri May 17 23:58:37 JST 2019 [main] :Accepted! remoteAddress -> /127.0.0.1:58508
/127.0.0.1:58509:test
Fri May 17 23:58:37 JST 2019 [main] :Closed. remoteAddress -> /127.0.0.1:58509
Fri May 17 23:58:37 JST 2019 [main] :Accepted! remoteAddress -> /127.0.0.1:58512
/127.0.0.1:58508:test
Fri May 17 23:58:37 JST 2019 [main] :Closed. remoteAddress -> /127.0.0.1:58508
/127.0.0.1:58512:test
Fri May 17 23:58:37 JST 2019 [main] :Closed. remoteAddress -> /127.0.0.1:58512
Fri May 17 23:58:37 JST 2019 [main] :Accepted! remoteAddress -> /127.0.0.1:58514
/127.0.0.1:58514:test
Fri May 17 23:58:37 JST 2019 [main] :Closed. remoteAddress -> /127.0.0.1:58514
Fri May 17 23:58:37 JST 2019 [main] :Accepted! remoteAddress -> /127.0.0.1:58511
/127.0.0.1:58511:test
Fri May 17 23:58:37 JST 2019 [main] :Closed. remoteAddress -> /127.0.0.1:58511
Fri May 17 23:58:37 JST 2019 [main] :Accepted! remoteAddress -> /127.0.0.1:58513
Fri May 17 23:58:37 JST 2019 [main] :Accepted! remoteAddress -> /127.0.0.1:58515
/127.0.0.1:58513:test
Fri May 17 23:58:37 JST 2019 [main] :Closed. remoteAddress -> /127.0.0.1:58513
Fri May 17 23:58:37 JST 2019 [main] :Accepted! remoteAddress -> /127.0.0.1:58516
/127.0.0.1:58515:test
Fri May 17 23:58:37 JST 2019 [main] :Closed. remoteAddress -> /127.0.0.1:58515
/127.0.0.1:58516:test
Fri May 17 23:58:37 JST 2019 [main] :Closed. remoteAddress -> /127.0.0.1:58516

見かけ上は並列に処理しているように見えます。良さそうです。

まとめ

シングルスレッド+ブロッキングI/O、マルチスレッド+ブロッキングI/O、シングルスレッド+ノンブロッキングI/Oの3種類のEcho Server/Clientを実装しました。はじめはSocket通信を試してみるテーマでいろいろ調べていましたが、そこから派生して勉強になりました。

参考