Java 中的流分區
1. 概述
在本教程中,我們將探索基於固定最大大小對 Java 8 Stream
進行分割的各種技術。
我們將首先回顧如何使用List
來完成此任務。隨後,我們將透過合併Stream
特定的功能(例如惰性求值和線程安全)來增強我們的方法。
2. List
分區
Java 中有多種對List
進行分割的方法。一個簡單的方法是先根據所需的批次大小和來源清單的大小來確定所需的批次數量:
static <T> Stream<List<T>> partitionList(List<T> source, int batchSize) {
int nrOfFullBatches = (source.size() - 1) / batchSize;
// ...
}
為了將來源列表劃分為較小的子列表,我們的初始步驟涉及計算劃分每個批次的起點和終點的索引。在執行此計算時,我們應該記住,與其他批次相比,最後一批的大小可能較小:
int startIndex = batch * batchSize;
int endIndex = (batch == nrOfFullBatches) ? source.size() : (batch + 1) * batchSize;
最後,我們可以添加一些驗證並涵蓋所有極端情況。例如,當來源清單為空或batchSize
為負數時:
static <T> Stream<List<T>> partitionList(List<T> source, int batchSize) {
if (batchSize <= 0) {
throw new IllegalArgumentException(String.format("The batchSize cannot be smaller than 0. Actual value: %s", batchSize));
}
if (source.isEmpty()) {
return Stream.empty();
}
int nrOfFullBatches = (source.size() - 1) / batchSize;
return IntStream.rangeClosed(0, nrOfFullBatches)
.mapToObj(batch -> {
int startIndex = batch * batchSize;
int endIndex = (batch == nrOfFullBatches) ? source.size() : (batch + 1) * batchSize;
return source.subList(startIndex, endIndex);
});
}
最後,讓我們測試一下該解決方案。對於從1
到8
的數字輸入清單和批次大小3
,我們期望三個子清單:
@Test
void whenPartitionList_thenReturnThreeSubLists() {
List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7, 8);
Stream<List<Integer>> result = partitionList(source, 3);
assertThat(result).containsExactlyInAnyOrder(
List.of(1, 2, 3),
List.of(4, 5, 6),
List.of(7, 8)
);
}
3. 並行Stream
Stream
具有獨特的特徵,例如惰性求值和並行處理的能力。可以透過建立自訂Collector.
此外,考慮到所需的回傳類型是子列表的列表,我們還將利用Collectors.toList()
已經定義的某些函數,我們稱之為downstream
收集器:
static <T> List<List<T>> partitionStream(Stream<T> source, int batchSize) {
return source.collect(partitionBySize(batchSize, Collectors.toList()));
}
static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
return Collector.of( ... );
}
我們可以使用static
工廠方法Collector.of()
來建立一個Collector
。讓我們查閱文檔,看看每個參數代表什麼:
-
supplier
- 新收集器的供應商功能 -
accumulator
– 新收集器的累加器功能 -
combiner
– 新收集器的組合器功能 -
finisher
– 新收集器的 Finisher 功能 -
characteristics
– 新收集器的收集器特徵
現在,讓我們有系統地瀏覽它們中的每一個,一一創建並理解它們的功能。
3.1. Supplier
我們將使用一個臨時物件來累積資料並將其分成批次。此累加器通常作為實作細節被隱藏。
收集操作完成後,我們呼叫 finisher 函數,該函數將該累加器轉換為收集器傳回的最終結果。工廠方法Collector.of()
的第一個參數將是一個提供自訂Accumulator.
此臨時累加器封裝了值清單和固定批量大小。此外,它還為呼叫者提供了指定偵聽器的靈活性,該偵聽器會在批次達到其容量時收到通知。此外,它還包括一個通用字段來容納下游收集器:
static class Accumulator<T, A> {
private final List<T> values = new ArrayList<>();
private final int batchSize;
private A downstreamAccumulator;
private final BiConsumer<A, List<T>> batchFullListener;
// constructor
}
不用說,蓄電池仍然是完全封裝的。因此,我們將其創建為static
內部類,並且我們將支援包保護的存取修飾符。
現在,讓我們寫一個接受新值的方法。將其新增至清單後,如果清單的大小達到batchSize
,我們將通知偵聽器,然後清除值:
void add(T value) {
values.add(value);
if (values.size() == batchSize) {
batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values));
values.clear();
}
}
讓我們建立實例化此Accumulator
的Supplier
。當一批已滿時,我們將委託給下游累加器,在我們的例子中,來自Collectors.toList():
(acc, values) -> downstream.accumulator().accept(acc, values)
最後,我們可以使用方法引用重寫此BiConsumer
並建立我們的Supplier
:
Supplier<Accumulator> supplier = () -> new Accumulator<>(
batchSize,
downstream.supplier().get(),
downstream.accumulator()::accept
);
3.2. Accumulator
建立自訂Collector
時的第二個參數將是接受Accumulator
和新值的BiConsumer
。在我們的例子中,我們將簡單地委託給Accumulator
並允許它將值添加到當前批次:
BiConsumer<Accumulator<T, A>, T> accumulator = (acc, value) -> acc.add(value);
3.3. Combiner
combiner
是一個接受兩個Accumulator
並提供將它們合併在一起的方法的函數。首先,我們需要使用下游的組合器合併它們的downstreamAccumulator
。之後,我們可以串流其中一個累加器累加的所有值,並將它們add
另一個累加器中:
BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> {
acc1.downstreamAccumulator = downstream.combiner()
.apply(acc1.downstreamAccumulator, acc2.downstreamAccumulator);
acc2.values.forEach(acc1::add);
return acc1;
};
讓我們重構程式碼並將此邏輯封裝在Accumulator
類別本身中:
static class Accumulator<T, A> {
private final List<T> values = new ArrayList<>();
private final int batchSize;
private A downstreamAccumulator;
private final BiConsumer<A, List<T>> batchFullListener;
// constructor
void add(T value) {
// ...
}
Accumulator<T, A> combine(Accumulator<T, A> other, BinaryOperator<A> accumulatorCombiner) {
this.downstreamAccumulator = accumulatorCombiner.apply(downstreamAccumulator, other.downstreamAccumulator);
other.values.forEach(this::add);
return this;
}
}
這將我們的組合器簡化為一行:
BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> acc1.combine(acc2, downstream.combiner());
3.4. Finisher
如前所述,我們必須建立一種方法將這個自訂Accumulator
轉換為最終結果: List
of List
。這是我們可以依靠downstream
收集器將所有批次聚合到單一清單中的另一個地方。
此外,如果累加器不為空,則表示存在來自最後一個不完整批次的值,我們需要確保在呼叫下游整理器之前合併這些值:
Function<Accumulator<T, A>, R> finisher = acc -> {
if (!acc.values.isEmpty()) {
downstream.accumulator().accept(acc.downstreamAccumulator, acc.values);
}
return downstream.finisher().apply(acc.downstreamAccumulator);
};
3.5. Collector Characteristics
我們的收集器被設計為線程安全的,適合與並行流一起使用。這意味著最終的歸約過程在多個執行緒中同時發生。這種並行處理的自然結果是無法保證元素的順序。
Collector
Characteristics
可用於優化縮減實現。根據我們概述的注意事項,我們將配置 features 參數以利用Collector.Characteristics.UNORDERED:
static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List, A, R> downstream) {
// ...
return Collector.of(
supplier,
accumulator,
combiner,
finisher,
Collector.Characteristics.UNORDERED
);
}
3.6.完整的解決方案
我們現在了解了收集器創建中使用的每個函數所扮演的角色。在繼續測試之前,讓我們回顧一下整個方法:
static <T> List<List<T>> partitionStream(Stream<T> source, int batchSize) {
return source.collect(partitionBySize(batchSize, Collectors.toList()));
}
static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
Supplier<Accumulator<T, A>> supplier = () -> new Accumulator<>(
batchSize,
downstream.supplier().get(),
downstream.accumulator()::accept
);
BiConsumer<Accumulator<T, A>, T> accumulator = (acc, value) -> acc.add(value);
BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> acc1.combine(acc2, downstream.combiner());
Function<Accumulator<T, A>, R> finisher = acc -> {
if (!acc.values.isEmpty()) {
downstream.accumulator().accept(acc.downstreamAccumulator, acc.values);
}
return downstream.finisher().apply(acc.downstreamAccumulator);
};
return Collector.of(supplier, accumulator, combiner, finisher, Collector.Characteristics.UNORDERED);
}
在測試過程中,我們將無法再斷言每個批次中的值。因此,我們的斷言將僅集中於驗證批次的數量和大小。例如,當對包含1
到8
之間的整數、 batchSize
為3
的平行流進行分區時,我們將產生兩個完整的批次,每個批次包含三個元素,以及一個批次包含兩個元素:
@Test
void whenPartitionStream_thenReturnThreeSubLists() {
Stream<Integer> source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel();
List<List<Integer>> result = partitionStream(source, 3);
assertThat(result)
.hasSize(3)
.satisfies(batch -> assertThat(batch).hasSize(3), atIndex(0))
.satisfies(batch -> assertThat(batch).hasSize(3), atIndex(1))
.satisfies(batch -> assertThat(batch).hasSize(2), atIndex(2));
}
4. 使用 Guava Stream
進行分區
為了避免潛在的錯誤,我們可以選擇使用經過驗證的第三方函式庫,而不是從頭開始建立線程安全的Collector
。例如,Google 的 Guava 提供了一種優雅而簡潔的方法,用於將Stream
劃分為包含相同資料類型的List
的Iterable
。
首先,讓我們將依賴項新增到pom.xml
中:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.2-jre</version>
</dependency>
現在,我們可以簡單地使用static
方法Iterables.partition().
此函數接受Iterable
和所需的批次大小作為其參數:
static <T> Iterable<List<T>> partitionUsingGuava(Stream<T> source, int batchSize) {
return Iterables.partition(source::iterator, batchSize);
}
我們的測試方法中的唯一區別在於更改的返回類型,現在是Iterable
。為了斷言批量大小,我們將Iterable
的所有元素收集到ArrayList
。除本次調整外,測試流程不變:
@Test
void whenPartitionParallelStreamWithGuava_thenReturnThreeSubLists() {
Stream<Integer> source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel();
Iterable<List<Integer>> result = partitionUsingGuava(source, 3);
assertThat(result)
.map(ArrayList::new)
.hasSize(3)
.satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(0))
.satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(1))
.satisfies(batch -> assertThat(batch).asList().hasSize(2), atIndex(2));
}
5. 結論
在本文中,我們探討了在 Java 中對Stream
進行分割的各種方法。我們首先回顧如何將List
拆分為固定大小的固定值的較小子清單。接下來,我們討論了Stream
和並行Stream
的優點,
並為它們創建了自己的自訂Collector
。
最後,我們嘗試了 Guava 的 API,它使我們能夠使用static
方法Iterables.partition().
與往常一樣,範例的完整原始程式碼可在 GitHub 上取得。