Java 中的並行化 for 循環
1. 概述
有時,我們可能需要在for
循環中處理大量元素。按順序執行此操作可能會花費大量時間,並使系統無法得到充分利用。
在本教程中,我們將學習在 Java 中並行化for
循環的不同方法,以提高此類情況下應用程序的性能。
2. 順序處理
讓我們首先看看如何在for
循環中順序處理元素並測量處理元素所需的時間。
2.1.使用for
循環進行順序處理
首先,我們將創建一個運行100
次的for
循環,並在每次迭代中執行大量操作。
繁重操作的常見示例是數據庫調用、網絡調用或 CPU 密集型操作。為了模擬繁重操作所花費的時間,讓我們在每次迭代中調用Thread.sleep()
方法:
`public class Processor {
public void processSerially() throws InterruptedException {
for (int i = 0; i < 100; i++) {
Thread.sleep(10);
}
}
}`
在上面的代碼中,我們在每次迭代中調用Thread.sleep()
方法。這會導致執行暫停10
毫秒。當我們運行processSerially()
方法時,需要花費大量時間來順序處理元素。
我們將在接下來的部分中通過並行化for
循環來優化此方法。最後,我們將比較順序處理和並行處理所花費的時間。
3. 使用ExecutorService
進行並行處理
ExecutorService
是一個代表異步執行機制的接口。它允許我們提交要執行的任務並提供管理它們的方法。
讓我們看看如何使用ExecutorService
接口來並行化for
循環:
void processParallelyWithExecutorService() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 100; i++) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executorService);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
executorService.shutdown();
}
上面的代碼中有幾點需要注意:
- 我們使用
newFixedThreadPool()
方法創建一個包含10
線程的線程池。 - 接下來,我們使用
CompletableFuture.runAsync()
方法將任務提交到線程池。runAsync()
方法確保提供給它的任務在單獨的線程中異步運行。 - 該方法採用
Callable
或Runnable
對像作為參數。在本例中,我們使用 lambda 表達式創建一個Runnable
對象。 -
runAsync
()
方法返回一個CompletableFuture
對象。我們將其添加到**CompletableFuture
對象列表中,以便稍後使用executorService
實例中的線程池執行**。 - 接下來,我們使用
CompletableFuture.allOf()
方法組合CompletableFuture
對象,並對它們調用join()
操作。執行join()
時,進程會等待所有CompletableFuture
任務並行完成。 - 最後,我們使用
shutdown()
方法關閉執行器服務。該方法釋放線程池中的所有線程。
4. 使用流進行並行處理
Java 8 引入了 Stream API,它支持並行處理。讓我們探討一下 Stream API 如何並行化for
循環。
4.1.使用並行流
讓我們看看如何使用 Stream API 的parallel()
方法來並行化 for 循環:
void processParallelyWithStream() { IntStream.range(0, 100) .parallel() .forEach(i -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); }
在上面的代碼中,我們使用IntStream.range()
方法創建一個整數流。接下來,我們調用parallel()
方法來並行化流。
最後,我們調用forEach()
方法來處理流的元素。對於每個元素,我們調用Thread.sleep()
方法來模擬繁重的操作。
4.2.使用StreamSupport
並行化for
循環的另一種方法是使用StreamSupport
類。讓我們看一下相同的代碼:
void processParallelyWithStreamSupport() { Iterable<Integer> iterable = () -> IntStream.range(0, 100).iterator(); Stream<Integer> stream = StreamSupport.stream(iterable.spliterator(), true); stream.forEach(i -> { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); }
StreamSupport
類提供了一個stream()
方法,該方法採用Iterable
對像作為參數。此外,它還需要一個boolean
參數來指示流是否應該並行。
在這裡,我們使用IntStream.range()
方法創建一個Iterable
對象。接下來,我們調用stream()
方法來創建整數流。最後,我們調用forEach()
方法來處理流的元素。
parallel()
方法和StreamSupport
類的工作方式類似。它們在內部創建線程來處理流的元素。創建的線程數量取決於系統中可用的核心數量。
5. 性能比較
現在我們已經了解了並行化for
循環的不同方法,讓我們比較一下每種方法的性能。為此,我們使用 Java Microbenchmark Harness (JMH)。首先,我們需要將 JMH 依賴項添加到我們的項目中。
接下來,讓我們將BenchmarkMode
註釋添加到我們的方法中,並使它們能夠對平均時間進行基準測試:
@Benchmark
@BenchmarkMode(Mode.AverageTime)
public void processSerially() throws InterruptedException {
for (int i = 0; i < 100; i++) {
Thread.sleep(10);
}
}
同樣,讓我們對所有並行處理方法執行相同的操作。
為了運行基準測試,我們創建一個main()
方法並設置 JMH:
class Benchmark {
public static void main(String[] args) {
try {
org.openjdk.jmh.Main.main(new String[] { "com.baeldung.concurrent.parallel.Processor" });
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
從main()
方法中,我們調用 JMH 的main()
方法並將路徑作為參數傳遞給Processor
類。這告訴 JMH 對Processor
類的方法運行基準測試。
當我們運行main()
方法時,我們會看到以下結果:
從上面的結果可以看出,並行處理元素所花費的時間比順序處理它們所花費的時間要少得多。
值得注意的是,處理元素所花費的時間可能因係統而異。這取決於系統中可用的核心數量。
此外,每種並行方法每次運行所花費的時間可能會有所不同,並且這些數字並不是這些方法之間的精確比較。
六,結論
在本文中,我們研究了在 Java 中並行化for
循環的不同方法。我們探索瞭如何使用ExecutorService
接口、Stream API 和StreamSupport
實用程序來並行化for
循環。最後,我們使用 JMH 比較了每種方法的性能。
與往常一樣,示例的代碼可以在 GitHub 上獲取。