Java并行流的使用及速度比较
前言
当我第一次听到流这个概念时,对他模糊不清,后来逐步使用后,才知晓流的含义,而当再次听说还有并行流的时候,那就更是一头雾水,所以今天来介绍一下并行流,首先还需要知道什么是流,用一段话概括就是:依次处理集合中的数据,这里的依次不是说依次遍历集合中的元素,然后处理,而是依次安不同的方式处理集合中的数据,比如操作某个集合,首先找出大于5的数,对其构建出新的集合,并且打印出来,那么使用流就可以这样做:
fun main() {
val randoms = Stream
.generate { Random().nextInt(10) }
.limit(10)
.collect(Collectors.toList())
randoms.stream()
.filter { it>5 }
.collect(Collectors.toList())
.forEach (System.out::println)
}
这是两段代码,第一段用来生成10个0-9的随机数,第二个将完成上述我们的需求。
而并行流,可以理解为多线程处理流中的数据。
基本用法
我们还拿上面的例子来说,转换为并行流也比较简单,调用parallel即可。
randoms.stream()
.parallel()
.filter { it>5 }
.collect(Collectors.toList())
.forEach (System.out::println)
但是貌似这样做,体会不到有什么不一样的地方呀,我们可以打印出他的线程名称,
randoms.stream()
.parallel()
.filter {
println(Thread.currentThread().name);
it > 5
}
.collect(Collectors.toList())
.forEach(System.out::println)
输出结果如下:
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-6
6
7
7
8
9
可以发现在通过filter过滤的时候,会有很多线程来处理,而且也包括main线程。
而并行流是使用ForkJoin来管理线程的,默认线程数是处理器的数量,可以通过下面方式获取,在我电脑上,他是8。
println(Runtime.getRuntime().availableProcessors())
除此之外,还可以通过下面参数更改,但他是全局性的,我们无法对不同流设置设置不同线程数量。
-D java.util.concurrent.ForkJoinPool.common.parallelism=4
速度测试
ForkJoin的思想是将一个大任务拆分成若干个小任务,直到不可再拆时,再将一个个小的任务运算的结果进行汇总,这也是ForkJoin名称的由来,Fork是分支的意思,Join则是汇总。
下面我们看下他们的速度,由于10个太小了,换为1百万个。
fun main() {
val randoms = Stream.generate { Random().nextInt(10) }.limit(1000000).collect(Collectors.toList())
runTest {
randoms.stream().filter { it > 5 }.collect(Collectors.toList())
}
runTest {
var mutableListOf = mutableListOf<Int>()
randoms.forEach {
if (it > 5) {
mutableListOf.add(it)
}
}
}
runTest {
randoms.stream().parallel().filter { it > 5 }.collect(Collectors.toList())
}
}
fun runTest(function: () -> Unit) {
val start = System.currentTimeMillis()
function()
println(System.currentTimeMillis() - start)
}
普通流 | 并行流 | 普通方法 |
---|---|---|
20 | 52 | 25 |
这都是运行好几次取他们最大的,可以发现,普通流最快,并行流最慢,并行流慢的原因也很简单,原因是并行流解决问题总是要比顺序流的工作多。并行流开销涉到将工作分配到多个线程,并且进行结果和并。
另一方面使用多个线程处理时,还需要初始化线程池。此外,如果有竞争,并行处理的性能可能会持续下降。
但如果再把数据量扩大几倍,结果就会反过来,比如当随机数是1亿时,结果如下。
普通流 | 并行流 | 普通方法 |
---|---|---|
957 | 789 | 1082 |
另外,当我们操作网络时候,这有是另一回事了,比如下面这样。
object TestStream {
@JvmStatic
fun download(url: String) {
var bytes = URL(StringBuffer(url).append("?id="+Random().nextInt(10000)).toString()).openStream().readBytes()
Files.write(Paths.get("/home/HouXinLin/pic", UUID.randomUUID().toString()), bytes)
}
}
fun main() {
val randoms = mutableListOf(
"https://desk-fd.zol-img.com.cn/t_s1920x1080c5/g7/M00/01/00/ChMkLGJGYbyIbjIrABQ1DysNhHMAACEDwLHNV8AFDUn770.jpg",
"https://desk-fd.zol-img.com.cn/t_s1920x1080c5/g7/M00/01/00/ChMkK2JGYcOIPDZqAB868hHvfqcAACEEAB3iKYAHzsK046.jpg",
"https://desk-fd.zol-img.com.cn/t_s1920x1080c5/g5/M00/0C/05/ChMkJ14dLNeIfJBuAAZKuwc_TagAAwWUAJPWVAABkrT442.jpg",
"https://desk-fd.zol-img.com.cn/t_s1920x1080c5/g5/M00/0C/05/ChMkJl4dNbmIS-1MAAYHKJ-tub0AAwWXACrfHkABgdA472.jpg",
"https://desk-fd.zol-img.com.cn/t_s1920x1080c5/g7/M00/01/00/ChMkLGJGYbyIbjIrABQ1DysNhHMAACEDwLHNV8AFDUn770.jpg",
"https://desk-fd.zol-img.com.cn/t_s1920x1080c5/g7/M00/01/00/ChMkK2JGYcOIPDZqAB868hHvfqcAACEEAB3iKYAHzsK046.jpg",
)
var threadPoolExecutor = ThreadPoolExecutor(4, 4, 1, TimeUnit.HOURS, LinkedBlockingQueue())
runTest {
randoms.stream().parallel().forEach(TestStream::download)
}
runTest {
randoms.stream().parallel().forEach {
threadPoolExecutor.execute { TestStream.download(it) }
}
threadPoolExecutor.shutdown()
threadPoolExecutor.awaitTermination(1, TimeUnit.HOURS)
}
runTest {
randoms.stream().forEach(TestStream::download)
}
}
fun runTest(function: () -> Unit) {
val start = System.currentTimeMillis()
function()
println(System.currentTimeMillis() - start)
}
这其实就是比那种方法管理的线程池最合适了,肯定最后一种方式最慢,但这个测试还需要网络原因,家里网不稳定,一会快一会慢,没办法给出并行流快还是自定义线程池快,另外这也是需要看情况的,用不用并行流或者还是使用自定义线程池也说不准,最好办法还是多测试。
这也与任务类型有关,比如是CPU密集型还是IO密集型,总之,并行编程,难!
- END -
原文始发于微信公众号(十四个字节):Java并行流的使用及速度比较