Semaphore(セマフォ)でスレッドの流量を制御する
概要
Semaphore(セマフォ)とはリソースへの同時アクセススレッド数を制御することができる機構です。Semaphoreには計量Semaphoreと二値Semaphoreがあります。java.util.concurrent.Semaphore
は計量Semaphoreです。
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/Semaphore.html
- 並行処理のスレッド数を制限したい
- 外部リソースへの同時アクセスを制限したい
といった場面で役に立つ概念です。
試してみる
java.util.concurrent.Semaphore
を試してみます。ポイントは以下の3つです。
- Semaphoreの作成
- Semaphoreのpermitの取得
- Semaphoreのpermitの解放
Semaphoreの作成
Semaphoreは以下のようにインスタンスを生成できます。インスタンスの引数でpermitsの初期値を設定します。これは初期値であって、semaphore.release
の呼び出し方によっては初期値以上のpermitsを保持することは可能です。
final int MAX_PERMITS = 3; Semaphore semaphore = new Semaphore(MAX_PERMITS);
Semaphoreのpermitの取得
acquire
や tryAcquire()
を使ってSemaphoreからpermitを取得します。
acquire()
の場合はpermitが利用可能またはスレッドが割込みされるまでブロックします。permitを得ることができなければ(割込みされるまで)無限時間待機することになります。
// acquiring the lock
semaphore.acquire();
tryAcquire
では、
- permitが利用可能になる場合
- スレッドに割込みがある
- 指定の待機時間が経過する
場合にブロックから開放されます。
例えば以下の実装例を考えてみます。この場合は5秒以内にNUMBER_NEED_PERMITのpermitsを取得できなければtryAcquireの結果がfalseになり、処理がスキップされます。実践的にはこのように実装されることが多いのではないでしょうか。
// acquiring the lock if (!semaphore.tryAcquire(NUMBER_NEED_PERMIT, 5, TimeUnit.SECONDS)) { log.debug("Task is not executed."); return; };
Semaphoreのpermitの解放
release(int permits)
を用いてpermitを解放することができます。指定された数のpermitを解放すると、利用可能なpermitの数がその分増えます。いくつかのスレッドがpermitを取得しようと試みている場合、その中の1つのスレッドが選択されて、解放されたばかりのpermitが与えられます。
こんな感じです。
@Slf4j public class Sample1 { public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); log.debug("semaphore.availablePermits() -> {}", semaphore.availablePermits()); try { semaphore.tryAcquire(2, 1, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("semaphore.availablePermits() -> {}", semaphore.availablePermits()); semaphore.release(1); log.debug("semaphore.availablePermits() -> {}", semaphore.availablePermits()); } }
結果
22:45:42.992 [main] DEBUG semaphore.Sample1 - semaphore.availablePermits() -> 3 22:45:42.996 [main] DEBUG semaphore.Sample1 - semaphore.availablePermits() -> 1 22:45:42.996 [main] DEBUG semaphore.Sample1 - semaphore.availablePermits() -> 2
実践的な例
実践的なProduer/Consumerモデルの実装例を上げておきます。Consumerはある処理をするタスクを別スレッドで呼び出すことにします。
QueueはLinkedBlockingQueueのFIFOキューとし、Producerが最初に10つのメッセージをキューにaddします。Consumerがキューからメッセージを取得して、後続タスクを非同期で実行します。同時に非同期処理できる数は3とし、これがSemaphoreのpermit数になります。
Producer.java
Consumer.java
Task.java
Test.java
結果
最大Permitsである3スレッドで同時にタスクが非同期で処理されていることがわかります。
23:50:46.470 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - Producer process start. 23:50:46.511 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 1 23:50:46.512 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 2 23:50:46.512 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 3 23:50:46.512 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 4 23:50:46.512 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 5 23:50:46.512 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 6 23:50:46.512 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 7 23:50:46.513 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 8 23:50:46.513 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 9 23:50:46.513 [pool-1-thread-1] DEBUG semaphore.consumer.producer.Producer - add message -> 10 23:50:47.469 [pool-1-thread-2] DEBUG semaphore.consumer.producer.Consumer - Consumer process start. 23:50:47.472 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 1 started. wait time is 1683 23:50:47.473 [ForkJoinPool.commonPool-worker-2] DEBUG semaphore.consumer.producer.Task - task 2 started. wait time is 5185 23:50:47.473 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Task - task 3 started. wait time is 3426 23:50:49.155 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 1 completed. 23:50:49.156 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 1 23:50:49.156 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 4 started. wait time is 1374 23:50:50.530 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 4 completed. 23:50:50.530 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 4 23:50:50.530 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 5 started. wait time is 1082 23:50:50.899 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Task - task 3 completed. 23:50:50.900 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 3 23:50:50.900 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Task - task 6 started. wait time is 3495 23:50:51.613 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 5 completed. 23:50:51.613 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 5 23:50:51.614 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 7 started. wait time is 4999 23:50:52.658 [ForkJoinPool.commonPool-worker-2] DEBUG semaphore.consumer.producer.Task - task 2 completed. 23:50:52.658 [ForkJoinPool.commonPool-worker-2] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 2 23:50:52.659 [ForkJoinPool.commonPool-worker-2] DEBUG semaphore.consumer.producer.Task - task 8 started. wait time is 4865 23:50:54.396 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Task - task 6 completed. 23:50:54.396 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 6 23:50:54.396 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Task - task 9 started. wait time is 2400 23:50:56.613 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 7 completed. 23:50:56.613 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 7 23:50:56.613 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 10 started. wait time is 1173 23:50:56.796 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Task - task 9 completed. 23:50:56.796 [ForkJoinPool.commonPool-worker-3] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 9 23:50:57.525 [ForkJoinPool.commonPool-worker-2] DEBUG semaphore.consumer.producer.Task - task 8 completed. 23:50:57.525 [ForkJoinPool.commonPool-worker-2] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 8 23:50:57.786 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Task - task 10 completed. 23:50:57.787 [ForkJoinPool.commonPool-worker-1] DEBUG semaphore.consumer.producer.Consumer - task completed. message is 10 23:51:01.798 [pool-1-thread-2] DEBUG semaphore.consumer.producer.Consumer - no message. 23:51:06.798 [pool-1-thread-2] DEBUG semaphore.consumer.producer.Consumer - no message. 23:51:11.799 [pool-1-thread-2] DEBUG semaphore.consumer.producer.Consumer - no message. 23:51:16.800 [pool-1-thread-2] DEBUG semaphore.consumer.producer.Consumer - no message.
まとめ
Semaphoreを用いて、スレッド数を制限することができました。並行処理の流量を制限するためには強力な機能です。