如何將一個流拆分為多個流
一、概述
Java 的 Streams API 是一種功能強大且用途廣泛的數據處理工具。根據定義,流式操作是對一組數據的單次迭代。
但是,有時我們希望以不同的方式處理流的某些部分並獲得不止一組結果。
在本教程中,我們將學習如何將流拆分為多個組並獨立處理它們。
2. 使用收集器
一個 Stream 應該被操作一次並且有一個終端操作。它可以有多個中間操作,但數據只能在關閉之前收集一次。
這意味著 Streams API 規范明確禁止分叉流並為每個分叉提供不同的中間操作。這將導致多個終端操作。但是,我們可以在終端操作裡面拆分流。這會創建一個分為兩個或更多組的結果。
2.1。使用partitioningBy
進行二進制拆分
如果我們想將一個流一分為二,我們可以使用Collectors
類中的partitioningBy
。它接受一個Predicate
並返回一個Map
,它將滿足Boolean
true
鍵下的謂詞和false
下的其餘元素分組。
假設我們有一個文章列表,其中包含有關它們應該發佈到的目標站點以及它們是否應該被推薦的信息。
List<Article> articles = Lists.newArrayList(
new Article("Baeldung", true),
new Article("Baeldung", false),
new Article("Programming Daily", false),
new Article("The Code", false));
我們將它分為兩組,一組僅包含 Baeldung 文章,第二組包含其余文章:
Map<Boolean, List<Article>> groupedArticles = articles.stream()
.collect(Collectors.partitioningBy(a -> a.target.equals("Baeldung")));
我們看看地圖中的true
鍵下分別歸檔false
哪些文章:
assertThat(groupedArticles.get(true)).containsExactly(
new Article("Baeldung", true),
new Article("Baeldung", false));
assertThat(groupedArticles.get(false)).containsExactly(
new Article("Programming Daily", false),
new Article("The Code", false));
2.2.使用groupingBy
拆分
如果我們想要更多的類別,那麼我們需要使用groupingBy
方法。它需要一個函數,將每個元素分類到一個組中。然後它返回一個將每個組分類器鏈接到其元素集合的Map
。
假設我們要按目標站點對文章進行分組。返回的Map
將具有包含站點名稱的鍵和包含與給定站點關聯的文章集合的值:
Map<String, List<Article>> groupedArticles = articles.stream()
.collect(Collectors.groupingBy(a -> a.target));
assertThat(groupedArticles.get("Baeldung")).containsExactly(
new Article("Baeldung", true),
new Article("Baeldung", false));
assertThat(groupedArticles.get("Programming Daily")).containsExactly(new Article("Programming Daily", false));
assertThat(groupedArticles.get("The Code")).containsExactly(new Article("The Code", false));
3. 使用teeing
從 Java 12 開始,我們為二進制拆分提供了另一種選擇。我們可以使用teeing
收集器。 teeing
將兩個收集器組合成一個複合材料。每個元素都由它們處理,然後使用提供的合併函數合併為單個返回值。
3.1。與Predicate
teeing
teeing
收集器與Collectors
類中的另一個收集器很好地配對,稱為filtering
。它接受一個謂詞並使用它來過濾處理過的元素,然後將它們傳遞給另一個收集器。
讓我們將文章分為 Baeldung 和非 Baeldung 的組併計算它們。我們還將使用List
構造函數作為合併函數:
List<Long> countedArticles = articles.stream().collect(Collectors.teeing(
Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.counting()),
Collectors.filtering(article -> !article.target.equals("Baeldung"), Collectors.counting()),
List::of));
assertThat(countedArticles.get(0)).isEqualTo(2);
assertThat(countedArticles.get(1)).isEqualTo(2);
3.2.處理eeing
結果
此解決方案與以前的解決方案之間存在一個重要區別。我們之前創建的組沒有重疊,源流中的每個元素最多屬於一個組。使用teeing,
我們不再受此限制的約束,因為每個收集器都可能處理整個流。讓我們看看如何利用它。
我們可能希望將文章分為兩組,一組僅包含特色文章,第二組僅包含 Baeldung 文章。生成的文章集可能會重疊,因為一篇文章可以同時成為 Baeldung 的特色和目標。
這次我們將它們收集到列表中,而不是計數:
List<List<Article>> groupedArticles = articles.stream().collect(Collectors.teeing(
Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.toList()),
Collectors.filtering(article -> article.featured, Collectors.toList()),
List::of));
assertThat(groupedArticles.get(0)).hasSize(2);
assertThat(groupedArticles.get(1)).hasSize(1);
assertThat(groupedArticles.get(0)).containsExactly(
new Article("Baeldung", true),
new Article("Baeldung", false));
assertThat(groupedArticles.get(1)).containsExactly(new Article("Baeldung", true));
4. 使用 RxJava
雖然 Java 的 Streams API 是一個有用的工具,但有時它還不夠。其他解決方案,例如 RxJava 提供的響應式流,可能能夠幫助我們。讓我們看一個簡短的示例,說明如何使用Observable
和多個Subscribers
來實現與Stream
示例相同的結果。
4.1。創建一個Observable
首先,我們需要從文章列表中創建一個Observable
實例。我們可以使用Observable
類的from
factory 方法:
Observable<Article> observableArticles = Observable.from(articles);
4.2.過濾Observables
接下來,我們需要創建將過濾文章的Observables
。為此,我們將使用Observable
類中的filter
方法:
Observable<Article> baeldungObservable = observableArticles.filter(
article -> article.target.equals("Baeldung"));
Observable<Article> featuredObservable = observableArticles.filter(
article -> article.featured);
4.3.創建多個Subscribers
最後,我們需要訂閱Observables
並提供一個Action
來描述我們想要對文章做什麼。一個真實的例子是將它們保存在數據庫中或將它們發送給客戶端,但我們將滿足於將它們添加到列表中:
List<Article> baeldungArticles = new ArrayList<>();
List<Article> featuredArticles = new ArrayList<>();
baeldungObservable.subscribe(baeldungArticles::add);
featuredObservable.subscribe(featuredArticles::add);
5. 結論
在本教程中,我們學習瞭如何將流拆分為組並分別處理它們。首先,我們查看了較舊的 Streams API 方法: groupingBy
和partitionBy
。接下來,我們使用了一種更新的方法,利用了 Java 12 中引入的teeing
方法。最後,我們研究瞭如何使用 RxJava 來獲得具有更大彈性的類似結果。
與往常一樣,源代碼可在 GitHub 上獲得。