TL;DR
ForkJoinPool
是实现了work stealing的线程池,其中所有线程都是daemon thread。
几个栗子🌰
1 | // Java 7+ |
1 | // Java 8+ |
1 | // Scala |
三个例子分别是:
- 使用
CompletableFuture
组合异步计算:其中每个以Async结尾的方法,都再次将任务提交给了线程池,大概率会在另外一个线程中执行; - 使用
.parallel()
并行处理Stream:.sum()
对并行Stream有优化,可以提升效率; - 使用
.par
将Vector
变成并行的进行fold
:Monoid满足结合律,所以可以并行fold
。
承载这些异步、并行计算的线程池,默认会使用一个JVM为我们生成的ForkJoinPool
,可以用ForkJoinPool.commonPool()
得到实例。
除此之外,Scala中的scala.concurrent.Future
一般会使用到scala.concurrent.ExecutionContext.Implicits.global
,而后者就是包装了这个common pool。
ForkJoinPool是个线程池?
没错,ForkJoinPool
实现了ExecutorService
接口,是个线程池。
对于common pool,我们可以看见它的默认大小:
1 | // CPU: i7-7920HQ(4 Cores, 8 Threads @3.10 GHz): |
相比其他线程池,ForkJoinPool
有两个显著的特点:
- 所有线程都是daemon thread,这意味着当JVM停止时,所有未完成的任务会直接终中断;
- work stealing策略:每个线程会有自己的deque来维护任务列表,当自己的deque空了,会从别的thread队列(或者是pool级别的deque)末尾偷任务过来执行。换句话说,每个线程都会尽可能地不让自己空闲下来。
比较好的实践
- 不要提交可能阻塞的任务,这样可能阻塞所有线程,使整个线程池无法响应;
- 如果一定要这么做(
ForkJoinPool
会先增大并发度(加线程),再处理):- Java用
managedBlock(ForkJoinPool.ManagedBlocker blocker)
来替代submit
、execute
等 - Scala的
Future
使用scala.concurrent.blocking
- Java用
- 如果要sync,使用
CompletableFuture
的join()
或者CountDownLatch
。
另一种用法
上面描述的用法,主要在利用了work stealing以及common pool。
仔细浏览一下ForkJoinPool
的API,会发现有一个长得很像的类ForkJoinTask
。
1 | public final ForkJoinTask<V> fork() |
ForkJoinTask
有这么几个特点:
- 可以对分治递归进行建模,拆分出来的子任务让线程池去处理;
- 子任务可以依赖父任务,任务间依赖构成DAG;
- 可以当成
Future
使用。
具体实践的例子可以参考这里Guide to the Fork/Join Framework in Java。
不过这种并发模型挺诡异的,可维护性也比较差。如果能够使用别的模型,还是尽量不要用这个。