技術メモ

技術メモ

ラフなメモ

Semaphore(セマフォ)でスレッドの流量を制御する

概要

Semaphore(セマフォ)とはリソースへの同時アクセススレッド数を制御することができる機構です。Semaphoreには計量Semaphoreと二値Semaphoreがあります。java.util.concurrent.Semaphore は計量Semaphoreです。

https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/Semaphore.html

  • 並行処理のスレッド数を制限したい
  • 外部リソースへの同時アクセスを制限したい

といった場面で役に立つ概念です。

試してみる

java.util.concurrent.Semaphore を試してみます。ポイントは以下の3つです。

  1. Semaphoreの作成
  2. Semaphoreのpermitの取得
  3. Semaphoreのpermitの解放

Semaphoreの作成

Semaphoreは以下のようにインスタンスを生成できます。インスタンスの引数でpermitsの初期値を設定します。これは初期値であって、semaphore.release の呼び出し方によっては初期値以上のpermitsを保持することは可能です。

final int MAX_PERMITS = 3;
Semaphore semaphore = new Semaphore(MAX_PERMITS);

Semaphoreのpermitの取得

acquiretryAcquire() を使ってSemaphoreからpermitを取得します。

acquire() の場合はpermitが利用可能またはスレッドが割込みされるまでブロックします。permitを得ることができなければ(割込みされるまで)無限時間待機することになります。

    // acquiring the lock
    semaphore.acquire();

tryAcquire では、

  • permitが利用可能になる場合
  • スレッドに割込みがある
  • 指定の待機時間が経過する

場合にブロックから開放されます。

例えば以下の実装例を考えてみます。この場合は5秒以内にNUMBER_NEED_PERMITのpermitsを取得できなければtryAcquireの結果がfalseになり、処理がスキップされます。実践的にはこのように実装されることが多いのではないでしょうか。

    // acquiring the lock
    if (!semaphore.tryAcquire(NUMBER_NEED_PERMIT, 5, TimeUnit.SECONDS)) {
      log.debug("Task is not executed.");
      return;
    };

Semaphoreのpermitの解放

release(int permits) を用いてpermitを解放することができます。指定された数のpermitを解放すると、利用可能なpermitの数がその分増えます。いくつかのスレッドがpermitを取得しようと試みている場合、その中の1つのスレッドが選択されて、解放されたばかりのpermitが与えられます。

こんな感じです。

@Slf4j
public class Sample1 {
  public static void main(String[] args) {

    Semaphore semaphore = new Semaphore(3);

     log.debug("semaphore.availablePermits() -> {}", semaphore.availablePermits());

     try {
      semaphore.tryAcquire(2, 1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    log.debug("semaphore.availablePermits() -> {}", semaphore.availablePermits());

    semaphore.release(1);

    log.debug("semaphore.availablePermits() -> {}", semaphore.availablePermits());
  }
}

結果

22:45:42.992 [main] DEBUG semaphore.Sample1 - semaphore.availablePermits() -> 3
22:45:42.996 [main] DEBUG semaphore.Sample1 - semaphore.availablePermits() -> 1
22:45:42.996 [main] DEBUG semaphore.Sample1 - semaphore.availablePermits() -> 2

実践的な例

実践的なProduer/Consumerモデルの実装例を上げておきます。Consumerはある処理をするタスクを別スレッドで呼び出すことにします。

QueueはLinkedBlockingQueueのFIFOキューとし、Producerが最初に10つのメッセージをキューにaddします。Consumerがキューからメッセージを取得して、後続タスクを非同期で実行します。同時に非同期処理できる数は3とし、これがSemaphoreのpermit数になります。

Producer.java

Consumer.java

Task.java

Test.java

結果
最大Permitsである3スレッドで同時にタスクが非同期で処理されていることがわかります。

23:50:46.470 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - Producer process start.
23:50:46.511 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 1
23:50:46.512 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 2
23:50:46.512 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 3
23:50:46.512 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 4
23:50:46.512 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 5
23:50:46.512 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 6
23:50:46.512 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 7
23:50:46.513 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 8
23:50:46.513 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 9
23:50:46.513 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 10
23:50:47.469 [pool-1-thread-2] DEBUG semaphore.consumer.producer.Consumer - Consumer process start.
23:50:47.472 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 1 started. wait time is 1683
23:50:47.473 [ForkJoinPool.commonPool-worker-2] DEBUG semaphore.consumer.producer.Task - task 2 started. wait time is 5185
23:50:47.473 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Task - task 3 started. wait time is 3426
23:50:49.155 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 1 completed.
23:50:49.156 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 1
23:50:49.156 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 4 started. wait time is 1374
23:50:50.530 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 4 completed.
23:50:50.530 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 4
23:50:50.530 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 5 started. wait time is 1082
23:50:50.899 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Task - task 3 completed.
23:50:50.900 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 3
23:50:50.900 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Task - task 6 started. wait time is 3495
23:50:51.613 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 5 completed.
23:50:51.613 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 5
23:50:51.614 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 7 started. wait time is 4999
23:50:52.658 [ForkJoinPool.commonPool-worker-2] DEBUG semaphore.consumer.producer.Task - task 2 completed.
23:50:52.658 [ForkJoinPool.commonPool-worker-2] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 2
23:50:52.659 [ForkJoinPool.commonPool-worker-2] DEBUG semaphore.consumer.producer.Task - task 8 started. wait time is 4865
23:50:54.396 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Task - task 6 completed.
23:50:54.396 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 6
23:50:54.396 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Task - task 9 started. wait time is 2400
23:50:56.613 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 7 completed.
23:50:56.613 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 7
23:50:56.613 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 10 started. wait time is 1173
23:50:56.796 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Task - task 9 completed.
23:50:56.796 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 9
23:50:57.525 [ForkJoinPool.commonPool-worker-2] DEBUG semaphore.consumer.producer.Task - task 8 completed.
23:50:57.525 [ForkJoinPool.commonPool-worker-2] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 8
23:50:57.786 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 10 completed.
23:50:57.787 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 10
23:51:01.798 [pool-1-thread-2] DEBUG semaphore.consumer.producer.Consumer - no message.
23:51:06.798 [pool-1-thread-2] DEBUG semaphore.consumer.producer.Consumer - no message.
23:51:11.799 [pool-1-thread-2] DEBUG semaphore.consumer.producer.Consumer - no message.
23:51:16.800 [pool-1-thread-2] DEBUG semaphore.consumer.producer.Consumer - no message.

まとめ

Semaphoreを用いて、スレッド数を制限することができました。並行処理の流量を制限するためには強力な機能です。