使用並行收集器和虛擬執行緒進行平行收集處理
一、簡介
在上一篇文章中,我們介紹了parallel-collectors ,這是一個小型零依賴函式庫,它支援自訂執行緒池上的Stream
API 的平行處理。
Loom 專案是向 JVM 引入輕量級虛擬執行緒(以前稱為 Fibers)的有組織努力的代號,該專案在 JDK21 中最終確定。
讓我們看看如何在平行收集器中利用這一點。
2.Maven依賴
如果我們想開始使用該庫,我們需要在 Maven 的pom.xml
檔案中新增一個條目:
<dependency>
<groupId>com.pivovarit</groupId>
<artifactId>parallel-collectors</artifactId>
<version>3.0.0</version>
</dependency>
或 Gradle 建置檔中的一行:
compile 'com.pivovarit:parallel-collectors:3.0.0'
3. 作業系統執行緒與虛擬執行緒的並行處理
3.1.作業系統線程並行度
讓我們看看為什麼虛擬線程並行處理很重要。
我們將從創建一個簡單的範例開始。我們需要一個並行化操作,這將是一個人為延遲的String
連接:
private static String fetchById(int id) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore shamelessly
}
return "user-" + id;
}
我們還將使用自訂程式碼來測量執行時間:
private static <T> T timed(Supplier<T> supplier) {
var before = Instant.now();
T result = supplier.get();
var after = Instant.now();
log.info("Execution time: {} ms", Duration.between(before, after).toMillis());
return result;
}
現在,讓我們建立一個簡單的平行Stream
處理範例,其中我們建立n
元素,然後在n
並行度為n
的執行緒上處理它們:
@Test
public void processInParallelOnOSThreads() {
int parallelProcesses = 5_000;
var e = Executors.newFixedThreadPool(parallelProcesses);
var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
.collect(ParallelCollectors.parallel(i -> fetchById(i), toList(), e, parallelProcesses))
.join());
log.info("{}", result);
}
當我們運行它時,我們可以觀察到它顯然完成了工作,因為我們不需要等待 5000 秒才能得到結果:
Execution time: 1321 ms
[user-0, user-1, user-2, ...]
但讓我們看看如果我們嘗試將並行處理的元素數量增加到20_000
會發生什麼:
[2.795s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (...)
[2.795s][warning][os,thread] Failed to start the native thread for java.lang.Thread "pool-1-thread-16111"
基於作業系統執行緒的方法無法擴展,因為建立執行緒的成本很高,而且我們很快就會達到資源限制。
讓我們看看如果切換到虛擬線程會發生什麼。
3.2.虛擬線程並行性
在 Java 21 之前,為執行緒池配置提供合理的預設值並不容易。幸運的是,虛擬線程不需要任何線程——我們可以創建任意數量的線程,並且它們在共享 ForkJoinPool 實例上進行內部調度,使它們非常適合運行阻塞操作!
如果我們運行 Parallel Collectors 3.x,我們可以毫不費力地利用虛擬線程:
@Test
public void processInParallelOnVirtualThreads() {
int parallelProcesses = 5_000;
var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
.collect(ParallelCollectors.parallel(i -> fetchById(i), toList()))
.join());
}
正如我們所看到的,這就像省略executor
和parallelism
參數一樣簡單,因為虛擬執行緒是預設的執行實用程式。
如果我們嘗試運行它,我們可以看到它實際上比原始範例完成得更快:
Execution time: 1101 ms
[user-0, user-1, user-2, ...]
這是因為我們創建了 5000 個虛擬線程,這些線程是使用高度有限的作業系統線程集進行調度的。
讓我們嘗試將並行度增加到20_000
,這對於經典的Executor
來說是不可能的:
Execution time: 1219 ms
[user-0, user-1, user-2, ...]
這不僅成功執行,而且比作業系統執行緒上小 4 倍的作業完成得更快!
讓我們將並行度增加到 100_000 看看會發生什麼:
Execution time: 1587 ms
[user-0, user-1, user-2, ...]
儘管觀察到顯著的開銷,但工作得很好。
如果我們將並行等級增加到 1_000_000 會怎麼樣?
Execution time: 6416 ms
[user-0, user-1, user-2, ...]
2_000_000?
Execution time: 12906 ms
[user-0, user-1, user-2, ...]
5_000_000?
Execution time: 25952 ms
[user-0, user-1, user-2, ...]
正如我們所看到的,我們可以輕鬆擴展到作業系統執行緒無法實現的高水準並行性。除了較小並行工作負載的效能改進之外,這是利用虛擬執行緒並行處理阻塞操作的主要好處。
3.3.虛擬線程和舊版本的平行收集器
利用虛擬執行緒的最簡單方法是升級到該函式庫的最新版本,但如果不可能,我們也可以在 JDK21 上執行時使用 2.xy 版本來實現此目的。
技巧是手動提供Executors.newVirtualThreadPerTaskExecutor()
作為執行程序,並提供Integer.MAX_VALUE
作為最大並行度等級:
@Test
public void processInParallelOnVirtualThreadsParallelCollectors2() {
int parallelProcesses = 100_000;
var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
.collect(ParallelCollectors.parallel(
i -> fetchById(i), toList(),
Executors.newVirtualThreadPerTaskExecutor(), Integer.MAX_VALUE))
.join());
log.info("{}", result);
}
5. 結論
在本文中,我們有機會了解如何透過平行收集器庫輕鬆利用虛擬線程,事實證明,該庫的擴展性比傳統的基於作業系統線程的解決方案要好得多。我們的測試機器最終達到了約 16,000 個執行緒的資源限制,而很容易擴展到數百萬個虛擬執行緒。
與往常一樣,可以在 GitHub 上找到程式碼範例。