技術メモ

技術メモ

ラフなメモ

java.util.concurrentを試す

いくつか java.lang.Thread の使い方を見てきました。今回は java.util.concurrent にあるいくつかの仕組みを試してみようと思います。

全体像(の一部)

f:id:tutuz:20190504072153p:plain

動かす

まずは簡単に試してみます。

public class MyTask implements Runnable  {

    @Override
    public void run() {
        // doSomething.
        System.out.printf("[%s] :%s\n", Thread.currentThread().getName(), "doSomething.");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class Main {

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            executor.execute(new MyTask());
            executor.execute(new MyTask());
            executor.execute(new MyTask());
            Thread.sleep(500);
        } finally {
            executor.shutdown();
        }
    }
}

結果

[pool-1-thread-1] :start.
[pool-1-thread-3] :start.
[pool-1-thread-2] :start.

java.lang.Thread と同じように実行したいタスクを定義でき、実行することができます。java.util.concurrent にはExecutor Frameworkを含んでおり、柔軟なインターフェースに基づくタスク実行機構です。

Executor

送信されたRunnableタスクを実行するオブジェクトです。このインタフェースは、タスク送信を各タスクの実行方式(スレッドの使用やスケジューリングの詳細などを含む)から分離する方法を提供します。通常、Executorは、明示的にスレッドを作成するかわりに使用されます。

戻り値がないタスクを実行します。戻り値がある場合は Callable を実装するのが良さそうです。

ExecutorServices

終了を管理するメソッド、および1つ以上の非同期タスクの進行状況を追跡するFutureを生成できるメソッドを提供するExecutorです。

シャットダウンするためには shutdown()submit() のメソッドが用意されています。

ScheduledExecutorServices

指定された遅延時間後または定期的にコマンドを実行するようにスケジュールできるExecutorServiceです。

ExecutorServicesを拡張したクラスになっています。

Executors

このパッケージで定義されたExecutor、ExecutorService、ScheduledExecutorService、ThreadFactory、およびCallableクラス用のファクトリおよびユーティリティ・メソッドです。

ExecutorServiceを作るファクトリクラスです(他のファクトリクラスにもなっていますが)。主に以下の戦略でスレッドプールを生成することができます。

メソッド 説明
newCachedThreadPool 必要に応じ、新規スレッドを作成するが、利用可能な場合には以前に構築されたスレッドを再利用するスレッド・プール
newFixedThreadPool 固定数のスレッドを再利用するスレッド・プール
newScheduledThreadPool 指定された遅延時間後、または周期的にコマンドの実行をスケジュールできる、スレッド・プール
newWorkStealingPool 指定された並列性レベルをサポートするのに十分な数のスレッドを保持するスレッド・プールを作成し、場合によっては競合を減らすために複数のキューを使用するスレッドプール
newSingleThreadScheduledExecutor 指定された遅延時間後、または周期的にコマンドの実行をスケジュールできる、単一スレッドのexecutorを作成

newCachedThreadPool

  • 短期の非同期タスクを多数実行するプログラムのパフォーマンスを改善
  • executeを呼び出すと、以前に構築されたスレッドが利用可能であれば、それを再利用
  • 既存のスレッドが使用できない場合、新しい接続が作成され、プールに追加
  • 60秒間使用されなかったスレッドは、終了して、キャッシュから削除

スレッドが生成される場合

public class Main {

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor.execute(new MyTask());
            executor.execute(new MyTask());
            executor.execute(new MyTask());
            executor.execute(new MyTask());
            executor.execute(new MyTask());
            Thread.sleep(500);
        } finally {
            executor.shutdown();
            System.out.printf("[%s] :shutdowned = %s\n", Thread.currentThread().getName(), executor.isShutdown());
        }
    }
}

結果
5スレッドで並行して実行されていることが分かります。

[pool-1-thread-1] :start.
[pool-1-thread-5] :start.
[pool-1-thread-4] :start.
[pool-1-thread-3] :start.
[pool-1-thread-2] :start.
[main] :shutdowned = true

次にスレッドが再利用されている場合

public class Main {

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor.execute(new MyTask());
            Thread.sleep(500);
            executor.execute(new MyTask());
            Thread.sleep(500);
            executor.execute(new MyTask());
            Thread.sleep(500);
            executor.execute(new MyTask());
            Thread.sleep(500);
            executor.execute(new MyTask());
            Thread.sleep(500);
        } finally {
            executor.shutdown();
            System.out.printf("[%s] :shutdowned = %s\n", Thread.currentThread().getName(), executor.isShutdown());
        }
    }
}

結果
キャッシュされたスレッドが再利用された結果、合計で3スレッドしか生成されていません。

[pool-1-thread-1] :start.
[pool-1-thread-2] :start.
[pool-1-thread-3] :start.
[pool-1-thread-2] :start.
[pool-1-thread-3] :start.
[main] :shutdowned = true

newFixedThreadPool

  • 固定数のスレッド・プール
  • スレッドがアクティブな場合に、追加のタスクが送信されると、それらのタスクはスレッドが使用可能になるまでキューで待機
  • 実行中に発生した障害のために、いずれかのスレッドがシャットダウン前に終了した場合は、必要に応じて新規スレッドが引き継いで後続のタスクを実行
  • 明示的なshutdownが行われるまでは、スレッドはプール内に存在
  • 高負荷な環境で有用

[動かす]で見た例の実装になります。

newWorkStealingPool

  • work-stealingプールは、送信されたタスクの実行順序に関して非保証
public class Main {

    public static void main(String[] args) throws InterruptedException {

        System.out.printf("[%s] :CPU %s\n", Thread.currentThread().getName(), Runtime.getRuntime().availableProcessors());

        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            for (int i = 0; i < 30; i++) {
                executor.execute(new MyTask());
            }
            Thread.sleep(3000);
        } finally {
            executor.shutdown();
            System.out.printf("[%s] :shutdowned = %s\n", Thread.currentThread().getName(), executor.isShutdown());
        }
    }
}

結果
検証したPCの使用可能なプロセッサ数が8であることから、8つのタスクまで並行して実行でき、実際にそのようにタスクが実行されています。

[main] :CPU 8
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-2] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-3] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-4] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-1] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-5] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-6] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-7] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-0] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-3] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-7] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-1] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-5] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-6] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-0] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-2] :start.
Sat May 04 11:55:23 JST 2019 [ForkJoinPool-1-worker-4] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-7] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-1] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-3] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-5] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-6] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-0] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-4] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-2] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-7] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-1] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-5] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-3] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-6] :start.
Sat May 04 11:55:24 JST 2019 [ForkJoinPool-1-worker-0] :start.
Sat May 04 11:55:26 JST 2019 [main] :shutdowned = true

newScheduledThreadPool

定期的なタスクを生成したスレッドプール内で実行します。

スケジュール用のタスククラスです。scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) を実装する必要があります。

public class MyScheduledTask {

    private final ScheduledExecutorService scheduler;

    public MyScheduledTask(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    public void doSomething() {
        scheduler.scheduleAtFixedRate(() -> {
            System.out.printf("%s [%s] :%s\n", new Date(), Thread.currentThread().getName(), "something");
        }, 2, 1, TimeUnit.SECONDS);
    }

}
public class Main {

    public static void main(String[] args) throws InterruptedException {

        ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
        MyScheduledTask myScheduledTask = new MyScheduledTask(service);
        try {
            System.out.printf("%s [%s] :%s\n", new Date(), Thread.currentThread().getName(), "start.");
            myScheduledTask.doSomething();
            myScheduledTask.doSomething();
            myScheduledTask.doSomething();
            while (true) {
                Thread.sleep(3000);
            }
        } finally {
            service.shutdown();
        }
    }
}

結果
引数で指定された数のスレッドが再利用されていることが分かります。

Sat May 04 09:06:57 JST 2019 [main] :start.
Sat May 04 09:06:59 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:06:59 JST 2019 [pool-1-thread-2] :something
Sat May 04 09:06:59 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:07:00 JST 2019 [pool-1-thread-2] :something
Sat May 04 09:07:00 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:07:00 JST 2019 [pool-1-thread-2] :something
Sat May 04 09:07:01 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:07:01 JST 2019 [pool-1-thread-2] :something
Sat May 04 09:07:01 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:07:02 JST 2019 [pool-1-thread-2] :something
Sat May 04 09:07:02 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:07:02 JST 2019 [pool-1-thread-2] :something
Sat May 04 09:07:03 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:07:03 JST 2019 [pool-1-thread-2] :something
Sat May 04 09:07:03 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:07:04 JST 2019 [pool-1-thread-2] :something
Sat May 04 09:07:04 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:07:04 JST 2019 [pool-1-thread-2] :something

newSingleThreadScheduledExecutor

  • タスクが順番に実行されること、およびどの時点においても複数のタスクがアクティブになることはないことを保証
public class Main {

    public static void main(String[] args) throws InterruptedException {

        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        MyScheduledTask myScheduledTask = new MyScheduledTask(service);
        try {
            System.out.printf("%s [%s] :%s\n", new Date(), Thread.currentThread().getName(), "start.");
            myScheduledTask.doSomething();
            myScheduledTask.doSomething();
            myScheduledTask.doSomething();
            while (true) {
                Thread.sleep(3000);
            }
        } finally {
            service.shutdown();
        }
    }
}

結果
すべて1つのスレッドでScheduledExecutorServiceが実行されていることが分かります。

Sat May 04 09:09:23 JST 2019 [main] :start.
Sat May 04 09:09:25 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:25 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:25 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:26 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:26 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:26 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:27 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:27 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:27 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:28 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:28 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:28 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:29 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:29 JST 2019 [pool-1-thread-1] :something
Sat May 04 09:09:29 JST 2019 [pool-1-thread-1] :something

参考