[转]并行任务,线程池,工作窃取算法
记得在上个月,微博上有一则热议的新闻:小学数学老师布置作业,要求“数一亿粒米”
网友大多数是以吐槽的态度去看待这件事,也有人指出能用估算的方法,这是一道考察发散思维的题目。
一开始我也觉得这个题目很荒唐,似乎是不可能完成的任务。但这个细细想来值得玩味,我在思考一个问题:如果从计算机的角度去看,如何才能最快速地数一亿粒米呢?
首先我们先将问题简单地抽象一下:
1. 抽象过程
作为有煮饭经验的我来说,米中是存在一些杂质的,所以数米应该不仅仅是单纯的数数,其中还有一个判断是米还是杂质的过程。
那么可以将其视作一个长度为L的数组(L大于一亿),这个数组是随机生成的,但是满足数组的每个元素是一个整型类型的数字(0或1)。约定:元素如果为1,则视作有效的“米”;如果为0,则视作无效的“杂质”。
为了更快地完成计算,并行的效率应该是比串行来得高。
那么我们将一个人视作一个工作线程,全家一起数米的情景可以视作并发情况。
有了以上的铺垫,接下来就是最核心的问题,如何才能最快地数一亿粒米。我不妨假设以下的几种情景:
2. 情景一:串行
今天刚上小学四年级的小季放学回家,妈妈正在做饭,爸爸正在沙发上刷手机。
小季说:“妈妈,今天老师布置了一项作业,要数一亿粒米。”
妈妈:“找你爸去。”
爸爸:“?”
于是爸爸一个人开始数米,开启一个循环,遍历整个数组进行计算。
以下是单线程执行的代码。
首先定义一个计算接口:
1 2 3 |
public interface Counter { long count(int[] riceArray); } |
爸爸循环数米:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public class FatherCounter implements Counter { @Override public long count(int[] riceArray) { long total = 0; for (int i : riceArray){ if (i == 1) total += 1; if (total >= 1e8) break } return total; } } |
主函数:
1 2 3 4 5 6 7 8 9 10 11 |
public class FatherCounterMain { @Override public static void main(String[] args) { long length = (long) 1.2e8; int[] riceArray = createArray(length); Counter counter = new FatherCounter(); long startTime = System.currentTimeMillis(); long value = counter.count(riceArray); long endTime = System.currentTimeMillis(); System.out.println("消耗时间(毫秒):" + (endTime - startTime)); } |
最后的运算结果:
1 |
消耗时间(毫秒):190 |
我运行了多次,最后的消耗时间都在190ms左右。这个单线程循环计算平平无奇,没有什么值得深究的地方。由于大量的计算机资源都在闲置,我猜测,这肯定不是最优的解法。
3. 情景二:并行
3-1-1. 线程池ExecutorService
爸爸一个人数了一会,觉得自己一个人数米实在是太慢了,家里有这么多人,为什么不大家一起分摊一点任务呢?每个人数一部分,最后再合并。
于是小季全家总动员,一起来完成作业。
除去三大姑八大姨,现在到场的有爸爸、妈妈、哥哥、姐姐、爷爷、奶奶、外公、外婆八位主要家庭成员(8个CPU的计算机)。
小季说:既然要数1亿粒米,那么就你们每人数12500000粒米,然后再合并一起吧!
爸爸说:崽子,别想偷懒,我刚刚数过了,现在换你去,我来给你们分配任务。(主线程)
大家说干就干,各自埋头工作起来。
以下是使用ExecutorService方式的代码:
还是同一个接口:
1 2 3 |
public interface Counter { long count(int[] riceArray); } |
创建一个新的实现类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
public class FamilyCounter implements Counter{ private int familyMember; private ExecutorService pool; public FamilyCounter() { this.familyMember = 8; this.pool = Executors.newFixedThreadPool(this.familyMember); } private static class CounterRiceTask implements Callable<Long>{ private int[] riceArray; private int from; private int to; public CounterRiceTask(int[] riceArray, int from, int to) { this.riceArray = riceArray; this.from = from; this.to = to; } @Override public Long call() throws Exception { long total = 0; for (int i = from; i<= to; i++){ if (riceArray[i] == 1) total += 1; if (total >= 0.125e8) break; } return total; } } @Override public long count(int[] riceArray) { long total = 0; List<Future<Long>> results = new ArrayList<>(); int part = riceArray.length / familyMember; for (int i = 0; i < familyMember; i++){ results.add(pool.submit(new CounterRiceTask(riceArray, i * part, (i + 1) * part))); } for (Future<Long> j : results){ try { total += j.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException ignore) { } } return total; } } |
最终输出:
1 |
消耗时间(毫秒):46 |
我运行了多次,结果都在46ms左右,说明这个结果具有一般性。那么有一个问题来了,既然一个人数米花费了190ms,那么照理来说8个人同时工作,最终应该只需要190/8=23ms呀,为什么结果是46ms?
因为线程池、线程的创建以及结果的合并计算都是需要消耗时间的(因为我的计算机是8核,所以这里应该不存在线程切换带来的消耗)
假如小季请来更多的亲戚,能够以更快的速度数完一亿粒米吗?我猜不可以,反而会适得其反。我将线程池的核心线程数调至16,再次运行,输出结果为:
1 |
消耗时间(毫秒):62 |
可见线程之前的切换消耗了一定的资源,所以很多情况下并非“人多好办事”,人多所带来的团队协调等问题,可能会降低整个团队的工作效率。
到这里,小季已经颇为满意,毕竟计算时间从一开始的190ms,优化到现在的46ms,效率提升了四倍之多。但是爸爸眉头一锁,发现事情并没有这么简单,以他丰富的经验来看,此事还有蹊跷。
3-1-2. 线程池ForkJoinPool
在之前大家埋头数米的过程中,爸爸作为任务的分配者,也在观察着大家。
他发现,爷爷奶奶由于年纪大了,数米速度完全比不上眼疾手快的哥哥姐姐。哥哥姐姐完成自己的任务就出去玩了,最后只剩爷爷奶奶还在工作。年轻人居然不为老人分忧,成何体统!
小季(内心OS):爸爸,好像只有你一直在玩。
于是,爸爸在想能不能有一个算法,当线程池中的某个线程完成自己工作队列中的任务后,并不是直接挂起,而是能帮助其他线程。
有了,这不就是work-stealing算法吗?爸爸决定试试ForkJoinPool。
什么是工作窃取算法(work-stealing)呢?当我们需要完成一个很庞大的任务时(比如这里的数一亿粒米),我们可以将这个大任务分割为一些互不相关的子任务,为了减少线程间的竞争,将其放在线程的独立工作队列中。当某个线程完成自己工作队列中的任务时,可以从头部窃取其他线程的工作队列中的任务(双端队列,线程本身是从队列尾部获取任务处理,这样进一步避免了线程的竞争)就像下图:
如何划分子任务呢?Fork/Pool采用递归的形式,先将整个数组一分为二,分为left和right,然后对left和right进行相同的操作,直到数组的长度到达一个我们设定的阈值(这个阈值十分重要,可以影响程序的效率,假设为1000),然后对这个长度的数组进行计算,返回计算结果。上层的任务收到下层任务完成的消息后,开始执行,以此传递,直到任务全部完成。
以下是使用ForkJoinPool方式的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
public class TogetherCounter implements Counter { private int familyMember; private ForkJoinPool pool; private static final int THRESHOLD = 3000; public TogetherCounter() { this.familyMember = 8; this.pool = new ForkJoinPool(this.familyMember); } private static class CounterRiceTask extends RecursiveTask<Long> { private int[] riceArray; private int from; private int to; public CounterRiceTask(int[] riceArray, int from, int to) { this.riceArray = riceArray; this.from = from; this.to = to; } @Override protected Long compute() { long total = 0; if (to - from <= THRESHOLD){ for(int i = from; i < to; i++){ if (riceArray[1] == 1) total += 1; } return total; }else { int mid = (from + to) /2; CounterRiceTask left = new CounterRiceTask(riceArray, from, mid); left.fork(); CounterRiceTask right = new CounterRiceTask(riceArray, mid + 1, to); right.fork(); return left.join() + right.join(); } } } @Override public long count(int[] riceArray) { return pool.invoke(new CounterRiceTask(riceArray, 0, riceArray.length - 1)); } } |
当我把阈值设置在7000-8000的时候,计算时间缩短到了惊人的15ms,效率又提升了3倍之多!
1 |
消耗时间(毫秒):15 |
得到这个结果,爸爸十分满意。此时小季却疑惑了,同样是并行,为什么效率相差这么大呢?
爸爸摸着小季的头,说道:这个还是需要看具体的场景。并不是所有情况下,ForkJoinPool都比ExecutorService出色。
ForkJoinPool主要使用了分治法的思想。
它有两个最大的特点:
- 能够将一个大型任务分割成小任务,并以先进后出的规则(LIFO)来执行,在有些并发中,当任务需要按照一定的顺序来执行时,ForkJoin将发挥其能力。ExecutorService是无法做到的,因为ExecutorService不能决定任务的执行顺序。
- ForkJoinPool的偷窃算法,能够在应对任务量不均衡的情况下,或者任务完成存在快慢的情况下,使闲置的线程去帮助正在工作的线程,保证资源的利用率,并且减少线程间的竞争。
爸爸喝了口Java,继续说道:在这次测试中,由于分配给各个线程的任务数是相同的,而且每个任务都是非常简单的计算。按照我的理解,应该是线程启动先后的微小时间差异,导致有的线程留下了短暂的闲置时间,而这极短的时间也被利用起来了,所以才会看到最终的优化结果。若是更复杂的任务,效果将会更加明显。
此外,在JDK8中,ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。这也是为什么Arrays.sort()快排速度非常快的原因,因为引入了自动并行化(Automatic Parallelization)。
[resource]只有程序员才能完成的小学数学作业