Java 中 parallelStream() 和 stream().parallel() 的區別
一、簡介
在本教程中,我們將探索 Java Streams API 的Collections.parallelStream()
和stream().parallel()
產品。 Java 在 Java 8 中將parallelStream()
方法引入了Collection
接口,將parallel()
方法引入了BaseStream
接口。
2. Java中的並行流與並行
並行流允許我們通過跨多個 CPU 內核並行執行流操作來使用多核處理。數據被拆分成多個子流,並行執行預期的操作,最後將結果聚合回來形成最終輸出。
除非另有說明,否則在 Java 中創建的流在默認情況下始終是串行的。我們可以通過兩種方式將流轉換為並行流:
- 我們可以調用
Collections.parallelStream()
- 我們可以調用
BaseStream.parallel()
如果流操作未指定,則 Java 編譯器和運行時會在執行並行流操作時決定處理順序以獲得最佳並行計算優勢。
例如,我們有一個很長的Book
對象列表。我們必須確定在指定年份出版的書籍數量:
public class Book {
private String name;
private String author;
private int yearPublished;
// getters and setters
}
我們可以在這裡利用並行流並找到比串行計算更有效的計數。我們示例中的執行順序不會以任何方式影響最終結果,使其成為並行 Stream 操作的完美候選者。
3. Collections.parallelStream()
的用法
在我們的應用程序中使用並行流的方法之一是在數據源上調用parallelStream()
。此操作返回一個可能並行的 Stream,其中提供的集合作為其源。我們可以將其應用於我們的示例並查找特定年份出版的書籍數量:
long usingCollectionsParallel(Collection<Book> listOfbooks, int year) {
AtomicLong countOfBooks = new AtomicLong();
listOfbooks.parallelStream()
.forEach(book -> {
if (book.getYearPublished() == year) {
countOfBooks.getAndIncrement();
}
});
return countOfBooks.get();
}
parallelStream()
方法的默認實現從Collection
的Spliterator<T>
接口創建一個並行Stream
。 Spliterator
是一個對象,用於遍歷和劃分其源的元素。 Spliterator
可以使用其trySplit()
方法將其源的某些元素分區,使其符合可能的並行操作條件。
Spliterator
API 類似於Iterator,
允許遍歷其源的元素,旨在支持高效的並行遍歷。 Collection
的默認Spliterator
將在**parallelStream()
調用中使用。**
4. 在 Stream 上使用parallel()
我們可以通過首先將集合轉換為 Stream 來獲得相同的結果。我們可以通過調用parallel()
將作為結果生成的順序流轉換為並行流。一旦我們有了一個並行流,我們就可以用與上面相同的方式找到我們的結果:
long usingStreamParallel(Collection<Book> listOfBooks, int year) {
AtomicLong countOfBooks = new AtomicLong();
listOfBooks.stream().parallel()
.forEach(book -> {
if (book.getYearPublished() == year) {
countOfBooks.getAndIncrement();
}
});
return countOfBooks.get();
}
Streams
API 的BaseStream
接口將在源collection's
默認Spliterator
允許的範圍內拆分底層數據,然後使用 Fork-Join 框架將其轉換為並行 Stream。
兩種方法的結果相同。
5. parallelStream()
和stream().parallel()
的區別
Collections.parallelStream()
使用源集合的默認 S pliterator
來拆分數據源以啟用並行執行。均勻地拆分數據源對於啟用正確的並行執行很重要。不均勻分割的數據源在並行執行方面比其順序對應的數據源造成的危害更大。
在所有這些示例中,我們一直在使用List<Book>
來保存我們的書籍列表。現在讓我們嘗試通過重寫Collection<T>
接口為我們的Books
創建一個自定義Collection
。
我們應該記住,覆蓋接口意味著我們必須提供我們對基接口中定義的抽象方法的實現。但是,我們看到諸如spliterator()
、 stream()
和parallelStream()
之類的方法作為接口中的默認方法存在。這些方法有一個默認提供的實現。然而,我們仍然可以用我們的版本覆蓋這些實現。
我們將調用我們的Books,
自定義Collection
MyBookContainer
,我們還將定義我們自己的Spliterator
:
public class BookSpliterator<T> implements Spliterator<T> {
private final Object[] books;
private int startIndex;
public BookSpliterator(Object[] books, int startIndex) {
this.books = books;
this.startIndex = startIndex;
}
@Override
public Spliterator<T> trySplit() {
// Always Assuming that the source is too small to split, returning null
return null;
}
// Other overridden methods such as tryAdvance(), estimateSize() etc
}
在上面的代碼片段中,我們看到我們的Spliterator
版本接受一個Object
數組(在我們的例子中是Book)
進行拆分,並且在trySplit()
方法中,它總是返回 null。
我們應該注意,這種Spliterator<T>
接口的實現容易出錯,並且不會將數據分成相等的部分;而是返回null
,導致數據不平衡。這僅用於表示目的。
我們將在我們的自定義Collection
類MyBookContainer
中使用這個 Spliterator:
public class MyBookContainer<T> implements Collection<T> {
private static final long serialVersionUID = 1L;
private T[] elements;
public MyBookContainer(T[] elements) {
this.elements = elements;
}
@Override
public Spliterator<T> spliterator() {
return new BookSpliterator(elements, 0);
}
@Override
public Stream<T> parallelStream() {
return StreamSupport.stream(spliterator(), false);
}
// standard overridden methods of Collection Interface
}
我們將嘗試將數據存儲在我們的自定義容器類中並對其執行並行Stream
操作:
long usingWithCustomSpliterator(MyBookContainer<Book> listOfBooks, int year) {
AtomicLong countOfBooks = new AtomicLong();
listOfBooks.parallelStream()
.forEach(book -> {
if (book.getYearPublished() == year) {
countOfBooks.getAndIncrement();
}
});
return countOfBooks.get();
}
此示例中的數據源是MyBookContainer
類型的一個實例。此代碼在內部使用我們自定義的Spliterator
來拆分數據源。生成的並行 Stream 充其量是一個順序流。
我們只是利用parallelStream()
方法返回一個順序流,儘管名字暗示了parallelStream
。這是該方法不同於stream().parallel()
的地方,後者總是嘗試返回提供給它的流的並行版本。 Java 在其文檔中記錄了這種行為,如下所示:
@implSpec
* The default implementation creates a parallel {@code Stream} from the
* collection's {@code Spliterator}.
*
* @return a possibly parallel {@code Stream} over the elements in this
* collection
* @since 1.8
*/
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
六,結論
在本文中,我們介紹了從Collection
數據源創建並行流的方法。我們還嘗試通過實現自定義版本的Collection
和Spliterator.
來找出parallelStream()
和stream().parallel()
之間的區別。
與往常一樣,所有代碼示例都可以在 GitHub 上找到.