使用 Stream API 處理 JDBC 結果集
1. 概述
迭代ResultSet是從 JDBC 查詢中檢索資料的常用方法。然而,在某些情況下,我們可能更喜歡使用記錄流。
在本文中,我們將探討使用 Stream API 處理ResultSet幾種方法。
2. 使用 Spliterator
我們將從純 JDK 方法開始,使用 Spliterators 建立流。
首先,讓我們為我們的實體定義一個模型:
public record CityRecord(String city, String country) {
}
在我們的CityRecord中,我們儲存有關城市及其國家/地區的資訊。
接下來,讓我們建立一個與資料庫互動並傳回CityRecord實例流的儲存庫:
public class JDBCStreamAPIRepository {
private static final String QUERY = "SELECT name, country FROM cities";
private final Logger logger = LoggerFactory.getLogger(JDBCStreamAPIRepository.class);
public Stream<CityRecord> getCitiesStreamUsingSpliterator(Connection connection)
throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
connection.setAutoCommit(false);
preparedStatement.setFetchSize(5000);
ResultSet resultSet = preparedStatement.executeQuery();
return StreamSupport.stream(new Spliterators.AbstractSpliterator<CityRecord>(
Long.MAX_VALUE, Spliterator.ORDERED) {
@Override
public boolean tryAdvance(Consumer<? super CityRecord> action) {
try {
if(!resultSet.next()) return false;
action.accept(createCityRecord(resultSet));
return true;
} catch(SQLException ex) {
throw new RuntimeException(ex);
}
}
}, false);
}
private CityRecord createCityRecord(ResultSet resultSet) throws SQLException {
return new CityRecord(resultSet.getString(1), resultSet.getString(2));
}
}
我們建立了一個PreparedStatement來檢索cities表中的所有項目,並指定獲取大小以控制記憶體消耗。我們使用AbstractSpliterator產生一個流,只要ResultSet有更多值,就會產生新記錄。此外,我們使用createCityRecord方法將每一行對應到CityRecord 。
最後,讓我們為我們的儲存庫編寫一個測試:
public class JDBCResultSetWithStreamAPIUnitTest {
private static Connection connection = null;
private static final String JDBC_URL = "jdbc:h2:mem:testDatabase";
private static final String USERNAME = "dbUser";
private static final String PASSWORD = "dbPassword";
JDBCStreamAPIRepository jdbcStreamAPIRepository = new JDBCStreamAPIRepository();
@BeforeEach
void setup() throws Exception {
connection = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD);
initialDataSetup();
}
private void initialDataSetup() throws SQLException {
Statement statement = connection.createStatement();
String ddlQuery = "CREATE TABLE cities (name VARCHAR(50), country VARCHAR(50))";
statement.execute(ddlQuery);
List<String> sqlQueryList = Arrays.asList(
"INSERT INTO cities VALUES ('London', 'United Kingdom')",
"INSERT INTO cities VALUES ('Sydney', 'Australia')",
"INSERT INTO cities VALUES ('Bucharest', 'Romania')"
);
for (String query : sqlQueryList) {
statement.execute(query);
}
}
@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingSpliterator_thenExpectedRecordsShouldBeReturned() throws SQLException {
Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
.getCitiesStreamUsingSpliterator(connection);
List<CityRecord> cities = cityRecords.toList();
assertThat(cities)
.containsExactly(
new CityRecord("London", "United Kingdom"),
new CityRecord("Sydney", "Australia"),
new CityRecord("Bucharest", "Romania"));
}
我們建立與 H2 資料庫的連接,並在測試之前準備好包含一些條目的cities表。最後,我們驗證我們的儲存庫是否以流的形式從表中傳回所有預期的項目。
3.使用JOOQ
JOOQ 是一個流行的關係資料庫庫。它已經提供了從ResultSet檢索記錄流的方法。
讓我們先加入必要的依賴項:
<dependency>
<groupId>org.jooq</groupId>
<artifactId>jooq</artifactId>
<version>3.19.11</version>
</dependency>
接下來,讓我們為JDBCStreamAPIRepository新增一個方法:
public Stream<CityRecord> getCitiesStreamUsingJOOQ(Connection connection)
throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
connection.setAutoCommit(false);
preparedStatement.setFetchSize(5000);
ResultSet resultSet = preparedStatement.executeQuery();
return DSL.using(connection)
.fetchStream(resultSet)
.map(r -> new CityRecord(r.get("NAME", String.class),
r.get("COUNTRY", String.class)))];
}
我們使用ResultQuery類別中的[fetchStream()](https://www.jooq.org/javadoc/latest/org.jooq/org/jooq/ResultQuery.html#fetchStream())方法從ResultSet記錄流。此外,我們在從方法傳回 JOOQ 記錄之前將它們對應到CityRecord實例。
讓我們呼叫我們的新方法並驗證它的行為是否正確:
@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJOOQ_thenExpectedRecordsShouldBeReturned() throws SQLException {
Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
.getCitiesStreamUsingJOOQ(connection);
List<CityRecord> cities = cityRecords.toList();
assertThat(cities)
.containsExactly(
new CityRecord("London", "United Kingdom"),
new CityRecord("Sydney", "Australia"),
new CityRecord("Bucharest", "Romania"));
}
正如預期的那樣,我們從流程中的資料庫中檢索了所有城市記錄。
4.使用jdbc-stream
或者,我們可以使用名為jdbc-stream 的輕量級程式庫從ResultSet建立流。
讓我們加入它的依賴項:
<dependency>
<groupId>com.github.juliomarcopineda</groupId>
<artifactId>jdbc-stream</artifactId>
<version>0.1.1</version>
</dependency>
現在,讓我們為JDBCStreamAPIRepository新增一個方法:
public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection)
throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
connection.setAutoCommit(false);
preparedStatement.setFetchSize(5000);
ResultSet resultSet = preparedStatement.executeQuery();
return JdbcStream.stream(resultSet)
.map(r -> {
try {
return createCityRecord(resultSet);
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
}
我們使用JdbcStream從ResultSet建置流。在底層,它使用 Spliterator 並使用與我們自己的實作相同的邏輯來建立流。
現在,我們將檢查新的儲存庫方法是如何運作的:
@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException {
Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
.getCitiesStreamUsingJdbcStream(connection);
List<CityRecord> cities = cityRecords.toList();
assertThat(cities)
.containsExactly(
new CityRecord("London", "United Kingdom"),
new CityRecord("Sydney", "Australia"),
new CityRecord("Bucharest", "Romania"));
}
我們已經使用 jdbc-stream 函式庫獲得了所有預期的項目。
5. 關閉資源
使用 JDBC 時,我們必須關閉所有使用的資源以避免連線洩漏。常見的做法是在Connection 、 PreparedStatement和ResultSet周圍使用 try-with-resources 語法。然而,當使用流時,這種方法不適合。如果我們從儲存庫方法返回串流,則所有資源都將關閉,並且流上的任何操作都將無法存取它們。
為了避免這個問題,我們需要使用串流的onClose()方法來關閉所有資源。此外,我們必須確保串流在完成處理後關閉。
讓我們修改我們的儲存庫方法以包含資源關閉邏輯:
public Stream<CityRecord> getCitiesStreamUsingJdbcStream(Connection connection)
throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(QUERY);
connection.setAutoCommit(false);
preparedStatement.setFetchSize(5000);
ResultSet resultSet = preparedStatement.executeQuery();
return JdbcStream.stream(resultSet)
.map(r -> {
try {
return createCityRecord(resultSet);
} catch (SQLException e) {
throw new RuntimeException(e);
}
})
.onClose(() -> closeResources(connection, resultSet, preparedStatement));
}
private void closeResources(Connection connection, ResultSet resultSet, PreparedStatement preparedStatement) {
try {
resultSet.close();
preparedStatement.close();
connection.close();
logger.info("Resources closed");
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
我們新增了closeResources()方法並將其附加到onClose()流程處理程序。
現在,讓我們修改客戶端程式碼以確保流在使用後關閉:
@Test
void givenJDBCStreamAPIRepository_whenGetCitiesStreamUsingJdbcStream_thenExpectedRecordsShouldBeReturned() throws SQLException {
Stream<CityRecord> cityRecords = jdbcStreamAPIRepository
.getCitiesStreamUsingJdbcStream(connection);
List<CityRecord> cities = cityRecords.toList();
cityRecords.close();
assertThat(cities)
.containsExactly(
new CityRecord("London", "United Kingdom"),
new CityRecord("Sydney", "Australia"),
new CityRecord("Bucharest", "Romania"));
}
在這裡,我們在處理完所有項目後關閉流。此外,我們可以觀察到一條日誌訊息,表明所有資源已關閉:
[main] INFO com.baeldung.resultset.streams.JDBCStreamAPIRepository -- Resources closed
六、結論
在本文中,我們探討了使用 Stream API 操作ResultSet的幾種選項。當處理無法一次全部載入到記憶體中的大型資料集時,此方法特別有用。此外,如果我們在應用程式中遵循函數式風格,串流儲存庫將與我們的邏輯很好地保持一致。
像往常一樣,完整的源代碼可以在 GitHub 上找到。