用於批次的DataLoader
1.概述
在編寫循環內重複呼叫資料庫或 API 的應用程式時,經常會遇到效能問題。這是一種常見的模式,但效率非常低;每次呼叫都會增加延遲,並為系統帶來不必要的負載。
為了解決這個問題,一個簡單而優雅的方法是使用 DataLoader ,
它可以批次處理和快取我們的請求。
在本教程中,我們將透過一個完整的工作範例介紹 DataLoader 是什麼、它如何幫助優化資料擷取任務,以及如何在 Java 中使用它。
2.什麼是DataLoader?
DataLoader 是 Facebook 開發的實用程序,用於最佳化 GraphQL API 呼叫。它無需多次呼叫資料庫,而是將多個單獨的資料請求合併為一個請求。
雖然 DataLoader 最初是為 GraphQL 創建的,但它可以與 REST API、微服務中的服務到服務呼叫或任何可以透過批次優化重複資料提取的情況一起使用。
要將其嵌入到我們的專案中,我們需要包含java-dataloader
庫。對於 Maven,我們將依賴項新增到pom.xml
中:
<dependency>
<groupId>com.graphql-java</groupId>
<artifactId>java-dataloader</artifactId>
<version>3.2.0</version>
</dependency>
如前所述,它完全通用,適用於任何需要多次重複呼叫後端的 Java 應用程式。它只需修改資料檢索方式,即可在不改變業務邏輯的情況下提高效能。
我們已經意識到了問題所在。如果沒有 DataLoader,循環逐一載入使用者資料會導致多次單獨的資料庫訪問,從而犧牲效能:
userRepository.findById(1);
userRepository.findById(2);
userRepository.findById(3);
每一行都會觸發自己的 SQL 查詢,從而產生三次資料庫呼叫。
使用 DataLoader,我們可以用load()
和dispatch()
呼叫來取代它們:
dataLoader.load(1);
dataLoader.load(2);
dataLoader.load(3);
dataLoader.dispatch();
dataLoader
的dispatch()
方法將我們的load()
呼叫合併為單一資料庫查詢。接下來,我們將了解設定和使用 DataLoader 的具體細節。
3. 使用DataLoader進行批次處理
現在我們了解了 DataLoader 存在的原因,讓我們透過建立一個從真實資料庫載入使用者的 Java 應用程式來看它的實際作用。我們將使用H2
(記憶體 SQL 資料庫),並使用 JPA 建立一個簡單的 Spring Boot 專案。
3.1. 建立一個簡單的User
實體類
首先,我們需要一個簡單、輕量的User
類別來表示我們的資料。在本例中,我們載入使用者的 ID 和名稱:
@Entity
@Table(name = "users")
@Getter
@AllArgsConstructor
public class User {
private final String id;
private final String name;
}
我們在這裡使用了 Lombok。 @Getter
註解會自動為 ID 和 name 產生 getter 方法,而@AllArgsConstructor
則會建立一個接受這兩個欄位的建構子。由於這兩個欄位都被標記為 final,因此它們只能在物件建立期間設定一次。這確保了不變性,使我們的資料模型執行緒安全,非常適合併發或非同步場景,例如使用 DataLoader 進行批次載入。
3.2. 建立UserRepository
類
UserRepository
是一個簡單的 JPA 儲存庫,它擴充了JpaRepository<User, String>
。它繼承了findAllById()
方法,該方法允許我們在單一查詢中有效地檢索多個使用者實體:
public interface UserRepository extends JpaRepository<User, String> {
}
UserRepository
負責直接資料庫訪問,而UserService
負責管理業務邏輯和非同步執行。透過使用 Spring Data JPA 內建的findAllById()
方法,我們可以有效率地批次處理資料庫查詢。
3.3. 建立UserService
類
現在,我們需要一種方法來一次獲取所有用戶,所以我們建立了一個UserService
類別。它接受一個使用者 ID 列表,並使用 Java 的CompletableFuture
非同步取得對應的使用者記錄:
@Service
public class UserService {
private final UserRepository userRepository;
public UserService(UserRepository userRepository) {
this.userRepository = userRepository;
}
public CompletableFuture<List<User>> getUsersByIds(List<String> ids) {
return CompletableFuture.supplyAsync(() -> userRepository.findAllById(ids));
}
}
getUsersByIds()
方法確保資料庫呼叫不會阻塞主執行緒。它還能使服務做好非阻塞批量操作的準備,這在使用 DataLoader 時至關重要。
在底層,該服務將實際的資料庫互動委託給UserRepository
。
3.4. DataLoader配置
最後,我們可以使用我們的服務來載入資料了。這個配置類別將UserService
包裝在BatchLoader
中,這使得DataLoader
能夠有效率地批次處理和解析使用者請求。
現在,讓我們更詳細地了解BatchLoader
是 DataLoader 函式庫中的核心介面。它定義了在一次呼叫中獲取多個鍵的資料。我們的函數接收一個鍵列表,並且必須傳回一個 CompletableFuture<ListBatchLoader
CompletableFuture<List<V>>
的項與傳入鍵的順序相同;這對於 DataLoader 正確匹配結果至關重要:
@Component
public class UserDataLoader {
private final UserService userService;
public UserDataLoader(UserService userService) {
this.userService = userService;
}
public DataLoader<String, User> createUserLoader() {
BatchLoader<String, User> userBatchLoader = ids -> {
return userService.getUsersByIds(ids)
.thenApply(users -> {
Map<String, User> userMap = users.stream()
.collect(Collectors.toMap(User::getId, user -> user));
return ids.stream().map(userMap::get).collect(Collectors.toList());
});
};
return DataLoaderFactory.newDataLoader(userBatchLoader);
}
}
UserDataLoader
類別接受UserService
作為相依性。它公開createUserLoader()
方法,該方法建構並傳回一個DataLoader
物件。然後,我們定義一個BatchLoader<String, User>
。這是批次邏輯的核心。
每當DataLoader
收到多個load(userId)
調用時,它都會收集這些調用,等待當前進程完成,然後在此批次載入器上使用ids
List
調用load()
。我們利用UserService.getUsersByIds()
和完整的 ID 清單一次批次處理所有呼叫。然後,我們將User
物件清單按 ID 轉換為Map
,以便能夠按照傳入 ID 清單的順序傳回使用者清單。
最後,我們將這個批次載入器傳遞給DataLoaderFactory.newDataLoader()
,這將傳回一個功能齊全的DataLoader<String, User>
實例。這樣一來,只需很少的樣板程式碼,就能在後台實現批次處理、快取和高效的用戶資料取得。
3.5. 其他類型的批次載入器
除了標準的BatchLoader<K, V>
之外, DataLoader
也提供:
-
MappedBatchLoader<K, V> –
傳回Map<K, V>
而不是列表,這樣我們就不需要手動排序了 -
BatchLoaderWithContext<K, V, C>
– 為我們提供了一個額外的上下文參數,如果我們的批次函數需要請求範圍的信息,則該參數非常有用 -
MappedBatchLoaderWithContext<K, V, C>
– 結合了映射和上下文功能
簡而言之, BatchLoader
是支援 DataLoader 批次魔法的引擎,而選擇正確的變體取決於我們希望如何建立批次函數輸出。
4. 證明資料庫只會被呼叫一次
現在,有了使用DataLoader
執行批次的所有程式碼,我們最終可以使用它來載入一批使用者。
我們將把UserService
當作偵察 bean 注入:
@SpyBean
private UserService userService;
這使我們能夠連接到即時資料庫,同時也能夠對我們服務的函數呼叫進行斷言。
接下來,讓我們將三個User
實體預先載入到資料庫中並建立我們的DataLoader
:
@BeforeEach
void setUp() {
userRepository.deleteAll();
User user1 = new User("101", "User_101");
User user2 = new User("102", "User_102");
User user3 = new User("103", "User_103");
userRepository.saveAll(Arrays.asList(user1, user2, user3));
userDataLoader = new DataLoader<>(userService::getUsersByIds);
DataLoaderRegistry registry = new DataLoaderRegistry();
registry.register("userDataLoader", userDataLoader);
}
最後我們實作DataLoader
邏輯:
@Test
void whenLoadingUsers_thenBatchLoaderIsInvokedAndResultsReturned() {
CompletableFuture<User> userFuture1 = userDataLoader.load("101");
CompletableFuture<User> userFuture2 = userDataLoader.load("102");
CompletableFuture<User> userFuture3 = userDataLoader.load("103");
userDataLoader.dispatchAndJoin();
verify(userService, times(1)).getUsersByIds(anyList());
assertThat(userFuture1.join().getName()).isEqualTo("User_101");
assertThat(userFuture2.join().getName()).isEqualTo("User_102");
assertThat(userFuture3.join().getName()).isEqualTo("User_103");
}
我們將三個load()
呼叫排入佇列,這些呼叫不會立即執行。一旦我們呼叫dispatchAndJoin()
, DataLoader
就會將這些呼叫批次處理,並使用我們所有的 ID 觸發底層的批次函數。
然後我們使用future.get()
來檢索每個用戶,它會阻塞直到結果準備好。
為了驗證DataLoader
的批次能力,我們結合使用verify()
和times()
來斷言getUsersByIds()
只被呼叫一次。這證明即使我們請求了三個不同的用戶,服務方法也只被呼叫了一次,這意味著所有請求都被一起批次處理了。
最後,我們斷言傳回的使用者與預先載入的實體匹配,並按照預期的順序接收。
5. 結論
在本文中,我們看到重複的資料庫或服務呼叫很快就會成為瓶頸。而這正是DataLoader
閃光點。它能夠有效率地批次處理和快取調用,從而減少負載、縮短回應時間並簡化程式碼。無論我們建構的是 GraphQL API、REST 端點或微服務,引入DataLoader
都能帶來顯著的提升。
與往常一樣,本文中提供的程式碼可在 GitHub 上找到。