Java 流中聚合運行時異常
1. 概述
在本教程中,我們將學習如何在流管道中聚合異常。
Stream API 本身不提供任何聲明性方式來處理異常。它在管道中只有一個通道來處理數據,並且沒有單獨的通道來處理異常。這意味著它不提供在遇到異常時調用函數的方法。因此,我們必須回退到使用 try-catch 塊來捕獲異常。
因此,在流管道中聚合異常並處理它們可能具有挑戰性。
2. 在流管道中使用 Try Catch 塊聚合異常
通常情況下,只需要調用一個方法即可發揮作用,例如,一個簡單的數據庫更新可能會因連接失敗而引發異常。考慮到這一點,讓我們考慮一個在管道中調用processThrowsExAndNoOutput()
的簡單示例:
@Test
public void givenTryCatchInPipeline_whenFoundEx_thenSuppressExIntoRuntimeEx() {
String[] strings = {"1", "2", "3", "a", "b", "c"};
RuntimeException runEx = Arrays.stream(strings)
.map(str -> {
try {
processThrowsExAndNoOutput(str);
return null;
} catch (RuntimeException e) {
return e;
}
})
.filter(Objects::nonNull)
.collect(Collectors.collectingAndThen(Collectors.toList(), list -> {
RuntimeException runtimeException = new RuntimeException("Errors Occurred");
list.forEach(runtimeException::addSuppressed);
return runtimeException;
}));
processExceptions(runEx);
assertEquals("Errors Occurred", runEx.getMessage());
assertEquals(3, runEx.getSuppressed().length);
}
在上面的程序中,我們將捕獲的異常視為流中的數據。 map()
方法返回 null 或異常。使用filter()
,下游只允許例外。最後,我們使用addSuppressed()
將其簡化為RuntimeException
。然後我們可以調用processExceptions()
來處理聚合異常。
這樣可行!但它可以更具聲明性嗎?讓我們在接下來的部分中努力實現這一目標。
3. 通過將 Try Catch 塊提取到方法中來聚合異常
讓我們使實現更加可讀和簡潔。為此,我們將 try-catch 塊移動到一個方法中:
static Throwable callProcessThrowsExAndNoOutput(String input) {
try {
processThrowsExAndNoOutput(input);
return null;
} catch (RuntimeException e) {
return e;
}
}
現在,我們可以從管道內部調用上述方法:
@Test
public void givenExtractedMethod_whenFoundEx_thenSuppressExIntoRuntimeEx() {
String[] strings = {"1", "2", "3", "a", "b", "c"};
RuntimeException runEx = Arrays.stream(strings)
.map(str -> callProcessThrowsExAndNoOutput(str))
.filter(Objects::nonNull)
.reduce(new RuntimeException("Errors Occurred"), (o1, o2) -> {
o1.addSuppressed(o2);
return o1;
});
// handle the aggregate exception as before
}
上面的方法看起來更乾淨。然而,仍有改進的空間和更多的用例可供討論。
4. 使用反射聚合流管道中的異常和輸出
大多數程序必須處理異常和預期輸出。讓我們舉一個可以返回異常或某些輸出的方法的示例:
static Object processReturnsExAndOutput(String input) {
try {
return Integer.parseInt(input);
} catch (Exception e) {
return new RuntimeException("Exception in processReturnsExAndOutput for " + input, e);
}
}
現在,讓我們看看管道處理:
@Test
public void givenProcessMethod_whenStreamResultHasExAndOutput_thenHandleExceptionListAndOutputList() {
List<String> strings = List.of("1", "2", "3", "a", "b", "c");
Map map = strings.stream()
.map(s -> processReturnsExAndOutput(s))
.collect(Collectors.partitioningBy(o -> o instanceof RuntimeException, Collectors.toList()));
List<RuntimeException> exceptions = (List<RuntimeException>) map.getOrDefault(Boolean.TRUE, List.of());
List<Integer> results = (List<Integer>) map.getOrDefault(Boolean.FALSE, List.of());
handleExceptionsAndOutputs(exceptions, results);
}
上面的流管道在終端collect()中使用了partitioningBy()
collect().
它利用反射將結果劃分為異常和整數列表。再往下,程序調用handleExceptionsAndOutputs()
來處理異常和進一步處理的輸出。
這次,我們沒有將異常減少為聚合的RuntimeException.
相反,我們傳遞了異常列表以供進一步處理。這是聚合異常的另一種方法。
正如我們所看到的,它絕對不是最乾淨的方法,需要原始類型和轉換。因此,接下來的部分將探索更通用的解決方案來解決當前的問題。
4. 使用自定義映射器聚合異常和輸出
展望未來,我們將更多地關注函數式編程。
我們將創建一個包裝另一個map()
流函數的自定義映射器函數。它返回一個Result
對象,該對象封裝了結果和異常。
首先,我們看一下Result
類:
public class Result<R, E extends Throwable> {
private Optional<R> result;
private Optional<E> exception;
public Result(R result) {
this.result = Optional.of(result);
this.exception = Optional.empty();
}
public Result(E exception) {
this.exception = Optional.of(exception);
this.result = Optional.empty();
}
public Optional<R> getResult() {
return result;
}
public Optional<E> getException() {
return exception;
}
}
Result
類使用泛型和Optional
。由於result
和exception
可以保存空值或非空值,因此我們使用了Optional
。隨著我們繼續前進,它的用法將會變得更加清晰。
我們在本節開頭討論了自定義映射器,現在讓我們看看它的實現:
public class CustomMapper {
public static <T, R> Function<T, Result<R, Throwable>> mapper(Function<T, R> func) {
return arg -> {
try {
return new Result(func.apply(arg));
} catch (Exception e) {
return new Result(e);
}
};
}
}
現在是時候看看mapper()
運行情況了:
@Test
public void givenCustomMapper_whenStreamResultHasExAndSuccess_thenHandleExceptionListAndOutputList() {
List<String> strings = List.of("1", "2", "3", "a", "b", "c");
strings.stream()
.map(CustomMapper.mapper(Integer::parseInt))
.collect(Collectors.collectingAndThen(Collectors.toList(),
list -> handleErrorsAndOutputForResult(list)));
}
這次,我們使用[Collectors.CollectingAndThen()](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collectors.html#collectingAndThen-java.util.stream.Collector-java.util.function.Function-)
在管道末尾使用Result<Integer, Throwable>
對象列表調用handleErrorsAndOutputForResult()
。我們看一下handleErrorsAndOutputForResult():
static String handleErrorsAndOutputForResult(List<Result<Integer, Throwable>> successAndErrors) {
logger.info("handle errors and output");
successAndErrors.forEach(result -> {
if (result.getException().isPresent()) {
logger.error("Process Exception " + result.getException().get());
} else {
logger.info("Process Result" + result.getResult().get());
}
});
return "Errors and Output Handled";
}
如上所示,我們只需迭代Result
列表,並藉助Optional.isPresent()方法分叉成成功或失敗流程。當必須明確處理成功和錯誤情況(例如向單獨的用戶發送通知)時,這可能是一種有用的方法。
當Stream.map()
內部使用的函數無法修改時,例如,因為它來自外部庫,我們可以使用自定義的mapper()
函數來包裝它並以更通用的方式處理結果。
4. 使用自定義收集器聚合異常和輸出
聚合管道的異常和輸出是一種收集活動。因此,實現一個專門為此目的而設計的收集器是有意義的。
讓我們看看如何做到這一點:
public class CustomCollector<T, R> {
private final List<R> results = new ArrayList<>();
private final List<Throwable> exceptions = new ArrayList<>();
public static <T, R> Collector<T, ?, CustomCollector<T, R>> of(Function<T, R> mapper) {
return Collector.of(
CustomCollector::new,
(collector, item) -> {
try {
R result = mapper.apply(item);
collector.results.add(result);
} catch (Exception e) {
collector.exceptions.add(e);
}
},
(left, right) -> {
left.results.addAll(right.results);
left.exceptions.addAll(right.exceptions);
return left;
}
);
}
// standard getters...
}
最後,我們來看看收集器到底是如何工作的:
@Test
public void givenCustomCollector_whenStreamResultHasExAndSuccess_thenHandleAggrExceptionAndResults() {
String[] strings = {"1", "2", "3", "a", "b", "c"};
Arrays.stream(strings)
.collect(Collectors.collectingAndThen(CustomCollector.of(Integer::parseInt),
col -> handleExAndResults(col.getExceptions(), col.getResults())));
}
5. 使用 Vavr 庫中的 Try 和 Either 聚合異常和輸出
Try
是一個容器,用於保存未捕獲的異常或成功時的實際輸出。就像前面討論的自定義映射器一樣, Try
也可以包裝函數。
而**Either
是一個更通用的容器,它保存錯誤類型或預期輸出類型**。
讓我們看看如何一起利用這些功能:
@Test
public void givenVavrEitherAndTry_whenStreamResultHasExAndSuccess_thenHandleExceptionListAndOutputList() {
List<String> strings = List.of("1", "2", "3", "a", "b", "c");
strings.stream()
.map(str -> Try.of(() -> Integer.parseInt(str)).toEither())
.collect(Collectors.collectingAndThen(Collectors.partitioningBy(Either::isLeft, Collectors.toList()),
map -> handleErrorsAndOutputForEither(map)));
}
正如我們所看到的,程序將Try
對象轉換為Either
,然後將其收集到一個映射中以調用handleErrorsAndOutputForEither():
static void handleErrorsAndOutputForEither(Map<Boolean, List<Either<Throwable, Integer>>> map) {
logger.info("handle errors and output");
map.getOrDefault(Boolean.TRUE, List.of())
.forEach(either -> logger.error("Process Exception " + either.getLeft()));
map.getOrDefault(Boolean.FALSE, List.of())
.forEach(either -> logger.info("Process Result " + either.get()));
}
此外,如上所示,可以通過在Either
對像上向左或向右滑動來處理異常和輸出。正如我們所看到的, Try
並Either
方法為我們提供了當今所見過的最簡潔的解決方案。
六,結論
在本教程中,我們探索了在處理流時聚合運行時異常的幾種方法。儘管可以採用多種方法,但保持流處理的本質非常重要,包括簡潔性、不變性和聲明性語法。
與往常一樣,代碼可以在 GitHub 上獲取。