Java 8並行流中的自定義線程池
1.概述
Java 8引入了S treams
的概念, treams
是對數據執行批量操作的有效方法。並且可以在支持並發的環境中獲得併行Streams
。
這些流可以帶來更高的性能-以多線程開銷為代價。
在本快速教程中,我們將研究**Stream
API的最大局限之一,**並了解如何使並行流與自定義ThreadPool
實例一起使用-或者有一個庫來處理這個問題。
2.並行Stream
讓我們從一個簡單的示例開始-在任何Collection
類型上調用parallelStream
方法-這將返回一個可能的並行Stream
:
@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
List<Long> aList = new ArrayList<>();
Stream<Long> parallelStream = aList.parallelStream();
assertTrue(parallelStream.isParallel());
}
在此類Stream
中發生的默認處理使用ForkJoinPool.commonPool(),
這是整個應用程序共享的Thread Pool
。
3.自定義Thread Pool
實際上,在處理stream
時,我們可以傳遞自定義ThreadPool
。
下面的示例讓並行Stream
使用自定義Thread Pool
來計算1到1,000,000(含)之間的長值之和:
@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
throws InterruptedException, ExecutionException {
long firstNum = 1;
long lastNum = 1_000_000;
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());
ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}
我們使用了並行度為4的ForkJoinPool
構造函數。需要進行一些實驗才能確定不同環境的最佳值,但是一個很好的經驗法則是根據CPU的核心數量來選擇數字。
接下來,我們處理了並行Stream
的內容,並在reduce
調用中將其匯總。
這個簡單的示例可能無法展示使用自定義Thread Pool
的全部用處,但是在我們不希望將通用Thread Pool
與長期運行的任務捆綁在一起的情況下(例如處理來自網絡源的數據),其好處顯而易見。 ,或者應用程序中的其他組件正在使用公共Thread Pool
。
4。結論
我們已經簡要介紹瞭如何使用自定義Thread Pool
運行並行Stream
。在正確的環境中,並通過適當使用並行度,在某些情況下可以提高性能。
可以在Github上找到本文引用的完整代碼示例。
0 條評論,你可以發表評論,我們會進行改進
