在Spring中實作Bulk和Batch API
1. 概述
實作標準 REST API 涵蓋了大多數典型用例。然而,基於 REST 的架構風格在處理任何批次或批次操作時存在一些限制。
在本教程中,我們將學習如何在微服務中應用批次和批次操作。此外,我們將實作一些自訂的寫入導向的批次和批次 API。
2. 批量API介紹
術語“批量”和“批次”操作通常可以互換使用。然而,兩者之間存在著嚴格的區別。
通常,批次操作意味著對相同類型的多個條目執行相同的操作。一種簡單的方法是透過為每個請求呼叫相同的 API 來應用批次操作。它可能太慢並且浪費資源。相反,我們可以在一次往返中處理多個條目。
我們可以透過在一次呼叫中對相同類型的多個條目應用相同的操作來實現批次操作。這種對專案集合進行操作的方式減少了整體延遲並提高了應用程式效能。為了實現,我們可以重複使用單一條目上使用的現有端點,或為批次方法建立單獨的路由。
批次操作通常意味著對多種資源類型執行不同的操作。批次 API 是在一次呼叫中對資源執行的各種操作的集合。這些資源操作可能沒有任何一致性。潛在地,每個請求路由可以彼此獨立。
簡而言之,「批次」一詞意味著批次不同的請求。
我們沒有很多明確的標準或規範來實施批量或批量操作。此外,許多流行的框架(例如 Spring)沒有對批量操作的內建支援。
儘管如此,在本教程中,我們將了解使用常見 REST 構造的批次和批次操作的自訂實作。
3.Spring中的範例應用
假設我們需要建立一個支援批量和批次操作的簡單微服務。
3.1. Maven 依賴項
首先,讓我們包含spring-boot-starter-web和[spring-boot-starter-validation](https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-validation)依賴項:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.1.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<version>3.1.5</version>
</dependency>
透過上述spring-boot-starter-validation依賴項,我們在應用程式中啟用了輸入資料驗證。我們需要它來驗證批次和批次請求的大小。
3.2.實施第一個 Spring 服務
我們將實作一個在儲存庫上建立、更新和刪除資料的服務。
首先,我們對Customer類別進行建模:
public class Customer implements Serializable {
private String id;
private String name;
private String email;
private String address;
// standard getters and setters
}
接下來,讓我們使用createCustomers()方法實作CustomerService類,以在記憶體儲存庫中儲存多個Customer物件:
@Service
public class CustomerService {
private final Map<String, Customer> customerRepoMap = new HashMap<>();
public List<Customer> createCustomers(List<Customers> customers) {
return customers.stream()
.map(this::createCustomer)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toList());
}
}
然後,我們將實作createCustomer()方法來建立單一Customer物件:
public Optional<Customer> createCustomer(Customer customer) {
if (!customerRepoMap.containsKey(customer.getEmail()) && customer.getId() == 0) {
Customer customerToCreate = new Customer(customerRepoMap.size() + 1,
customer.getName(), customer.getEmail());
customerToCreate.setAddress(customer.getAddress());
customerRepoMap.put(customerToCreate.getEmail(), customerToCreate);
return Optional.of(customerToCreate);
}
return Optional.empty();
}
在上面的方法中,我們僅在儲存庫中不存在客戶時才建立客戶,否則我們將傳回一個空物件。
同樣,我們將實作一個方法來更新現有的Customer詳細資料:
private Optional<Customer> updateCustomer(Customer customer) {
Customer customerToUpdate = customerRepoMap.get(customer.getEmail());
if (customerToUpdate != null && customerToUpdate.getId() == customer.getId()) {
customerToUpdate.setName(customer.getName());
customerToUpdate.setAddress(customer.getAddress());
}
return Optional.ofNullable(customerToUpdate);
}
最後,我們將實作deleteCustomer()方法來從儲存庫中刪除現有的Customer :
public Optional<Customer> deleteCustomer(Customer customer) {
Customer customerToDelete = customerRepoMap.get(customer.getEmail());
if (customerToDelete != null && customerToDelete.getId() == customer.getId()) {
customerRepoMap.remove(customer.getEmail());
}
return Optional.ofNullable(customerToDelete);
}
3.3.實施第二次Spring服務
我們還實現另一個在儲存庫中獲取和創建地址資料的服務。
首先,我們定義Address class :
public class Address implements Serializable {
private int id;
private String street;
private String city;
//standard getters and setters
}
然後,讓我們使用createAddress()方法來實作AddressService class :
public Address createAddress(Address address) {
Address createdAddress = null;
String addressUniqueKey = address.getStreet().concat(address.getCity());
if (!addressRepoMap.containsKey(addressUniqueKey)) {
createdAddress = new Address(addressRepoMap.size() + 1,
address.getStreet(), address.getCity());
addressRepoMap.put(addressUniqueKey, createdAddress);
}
return createdAddress;
}
4. 使用現有端點實作批量 API
現在讓我們建立一個 API 來支援批次和單一項目建立操作。
4.1.實施批次控制器
我們將實作一個帶有端點的BulkController類,以便在一次呼叫中建立單一或多個客戶。
首先,我們將以 JSON 格式定義批次請求:
[
{
"name": "<name>",
"email": "<email>",
"address": "<address>"
}
]
透過這種方法,我們可以使用自訂HTTP標頭( X-ActionType )來處理批次操作,以區分批次操作或單項操作。
然後,我們將在BulkController類別中實作*bulkCreateCustomers()*方法並使用上面的CustomerService's方法:
@PostMapping(path = "/customers/bulk")
public ResponseEntity<List<Customer>> bulkCreateCustomers(
@RequestHeader(value="X-ActionType") String actionType,
@RequestBody @Valid @Size(min = 1, max = 20) List<Customer> customers) {
List<Customer> customerList = actionType.equals("bulk") ?
customerService.createCustomers(customers) :
singletonList(customerService.createCustomer(customers.get(0)).orElse(null));
return new ResponseEntity<>(customerList, HttpStatus.CREATED);
}
在上面的程式碼中,我們使用X-ActionType標頭來接受任何批次請求。此外,我們也使用Size註解新增了輸入請求大小驗證。程式碼決定是將整個清單傳遞給createCustomers()還是僅將元素0傳遞給createCustomer() 。
不同的建立函數傳回一個清單或單一Optional ,因此我們將後者轉換為List ,以使HTTP回應在兩種情況下都相同。
4.2.驗證批量 API
我們將運行應用程式並透過執行上述端點來驗證批次操作:
$ curl -i --request POST 'http://localhost:8080/api/customers/bulk' \
--header 'X-ActionType: bulk' \
--header 'Content-Type: application/json' \
--data-raw '[
{
"name": "test1",
"email": "[email protected]",
"address": "address1"
},
{
"name": "test2",
"email": "[email protected]",
"address": "address2"
}
]'
對於創建的客戶,我們將得到以下成功回應:
HTTP/1.1 201
[{"id":1,"name":"test1","email":"[email protected]","address":"address1"},
{"id":1,"name":"test2","email":"[email protected]","address":"address2"},
...
接下來,我們將實作另一種批次操作方法。
5. 使用不同端點實作批次 API
在批次 API 中對相同資源執行不同的操作並不常見。不過,讓我們看看最靈活的方法,看看如何實現。
我們可以實作原子批量操作,其中整個請求在單一事務中成功或失敗。或者,我們可以允許成功的更新獨立於失敗的更新而發生,並提供指示更新是完全成功還是部分成功的回應。我們將實施其中的第二個。
5.1.定義請求和回應模型
讓我們考慮在一次呼叫中建立、更新和刪除多個客戶的用例。
我們將批次請求定義為 JSON 格式:
[
{
"bulkActionType": "<CREATE OR UPDATE OR DELETE>",
"customers": [
{
"name": "<name>",
"email": "<email>",
"address": "<address>"
}
]
}
]
首先,我們將上述 JSON 格式建模到CustomerBulkRequest class :
public class CustomerBulkRequest {
private BulkActionType bulkActionType;
private List<Customer> customers;
//standard getters and setters
}
接下來,我們將實作BulkActionType enum :
public enum BulkActionType {
CREATE, UPDATE, DELETE
}
然後,我們將CustomerBulkResponse class定義為 HTTP 回應物件:
public class CustomerBulkResponse {
private BulkActionType bulkActionType;
private List<Customer> customers;
private BulkStatus status;
//standard getters and setters
}
最後,我們將定義BulkStatus枚舉來指定每個操作的傳回狀態:
public enum BulkStatus {
PROCESSED, PARTIALLY_PROCESSED, NOT_PROCESSED
}
5.2.實施批次控制器
我們將實作一個批次 API,該 API 接受批次請求並基於bulkActionType enum進行處理,然後一起傳回批次狀態和客戶資料。
首先,我們將在BulkController類別中建立一個EnumMap並將BulkActionType enum對應到自己的CustomerService's Function :
@RestController
@RequestMapping("/api")
@Validated
public class BulkController {
private final CustomerService customerService;
private final EnumMap<BulkActionType, Function<Customer, Optional<Customer>>> bulkActionFuncMap =
new EnumMap<>(BulkActionType.class);
public BulkController(CustomerService customerService) {
this.customerService = customerService;
bulkActionFuncMap.put(BulkActionType.CREATE, customerService::createCustomer);
bulkActionFuncMap.put(BulkActionType.UPDATE, customerService::updateCustomer);
bulkActionFuncMap.put(BulkActionType.DELETE, customerService::deleteCustomer);
}
}
這個EnumMap提供了請求類型和我們需要滿足它的CustomerService上的方法之間的綁定。它幫助我們避免冗長的switch或if語句。
我們可以將針對操作類型從EnumMap傳回的Function傳遞給Customer物件流上的map()方法:
List<Customer> customers = customerBulkRequest.getCustomers().stream()
.map(bulkActionFuncMap.get(customerBulkRequest.getBulkActionType()))
...
由於我們所有的Function物件都從Customer映射到Optional<Customer> ,這本質上是使用流中的map()操作來執行批量請求,將生成的Customer留在流中(如果可用)。
讓我們將其放在完整的控制器方法中:
@PostMapping(path = "/customers/bulk")
public ResponseEntity<List<CustomerBulkResponse>> bulkProcessCustomers(
@RequestBody @Valid @Size(min = 1, max = 20)
List<CustomerBulkRequest> customerBulkRequests) {
List<CustomerBulkResponse> customerBulkResponseList = new ArrayList<>();
customerBulkRequests.forEach(customerBulkRequest -> {
List<Customer> customers = customerBulkRequest.getCustomers().stream()
.map(bulkActionFuncMap.get(customerBulkRequest.getBulkActionType()))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toList());
BulkStatus bulkStatus = getBulkStatus(customerBulkRequest.getCustomers(),
customers);
customerBulkResponseList.add(CustomerBulkResponse.getCustomerBulkResponse(customers,
customerbulkRequest.getBulkActionType(), bulkStatus));
});
return new ResponseEntity<>(customerBulkResponseList, HttpStatus.Multi_Status);
}
此外,我們將完成getBulkStatus方法,以根據建立的客戶數量傳回特定的bulkStatus enum :
private BulkStatus getBulkStatus(List<Customer> customersInRequest,
List<Customer> customersProcessed) {
if (!customersProcessed.isEmpty()) {
return customersProcessed.size() == customersInRequest.size() ?
BulkStatus.PROCESSED :
BulkStatus.PARTIALLY_PROCESSED;
}
return BulkStatus.NOT_PROCESSED;
}
我們應該注意,也可以考慮為每個操作之間的**任何衝突新增輸入驗證**。
5.3.驗證批量 API
我們將運行該應用程式並呼叫上述端點,即 /customers/bulk:
$ curl -i --request POST 'http://localhost:8080/api/customers/bulk' \
--header 'Content-Type: application/json' \
--data-raw '[
{
"bulkActionType": "CREATE",
"customers": [
{
"name": "test4",
"email": "[email protected]",
...
}
]
},
{
"bulkActionType": "UPDATE",
"customers": [
...
]
},
{
"bulkActionType": "DELETE",
"customers": [
...
]
}
]'
現在讓我們驗證一下成功的回應:
HTTP/1.1 207
[{"customers":[{"id":4,"name":"test4","email":"[email protected]","address":"address4"}],"status":"PROCESSED","bulkType":"CREATE"},
...
接下來,我們將實作一個批次 API,用於處理客戶和地址,並將它們捆綁在一個批次呼叫中。
6. 實作批量 API
通常,批次 API 請求是具有自己的方法、資源 URL 和負載的子請求的集合。
我們將實作一個批次 API,用於建立和更新兩種資源類型。當然,我們可以包含其他操作,例如刪除操作。但是,為了簡單起見,我們將只考慮POST和PATCH方法。
6.1.實施批次請求模型
首先,我們將以 JSON 格式定義混合資料請求模型:
[
{
"method": "POST",
"relativeUrl": "/address",
"data": {
"street": "<street>",
"city": "<city>"
}
},
{
"method": "PATCH",
"relativeUrl": "/customer",
"data": {
"id": "<id>",
"name": "<name>",
"email": "<email>",
"address": "<address>"
}
}
]
我們將以上 JSON 結構實作為BatchRequest class :
public class BatchRequest {
private HttpMethod method;
private String relativeUrl;
private JsonNode data;
//standard getters and setters
}
6.2.實施批次控制器
我們將實作批量 API 來建立地址並在單一請求中更新客戶的地址。為了簡單起見,我們將在同一個微服務中編寫此 API。在另一種架構模式中,我們可能會選擇在並行呼叫各個端點的不同微服務中實現它。
對於上面的BatchRequest類,我們在將JsonNode反序列化為特定類型class時會遇到問題。我們可以透過使用ObjectMapper's convertValue方法將JsonNode轉換為強型別物件來輕鬆解決這個問題。
對於批次 API,我們將根據 BatchRequest 類別中請求的HttpMethod和relativeUrl參數呼叫AddressService或CustomerService方法BatchRequest class.
我們將實作批次端點 在BatchController class中:
@PostMapping(path = "/batch")
public String batchUpdateCustomerWithAddress(
@RequestBody @Valid @Size(min = 1, max = 20) List<BatchRequest> batchRequests) {
batchRequests.forEach(batchRequest -> {
if (batchRequest.getMethod().equals(HttpMethod.POST) &&
batchRequest.getRelativeUrl().equals("/address")) {
addressService.createAddress(objectMapper.convertValue(batchRequest.getData(),
Address.class));
} else if (batchRequest.getMethod().equals(HttpMethod.PATCH) &&
batchRequest.getRelativeUrl().equals("/customer")) {
customerService.updateCustomer(objectMapper.convertValue(batchRequest.getData(),
Customer.class));
}
});
return "Batch update is processed";
}
6.3.驗證批量 API
我們將執行上面的/batch端點:
$ curl -i --request POST 'http://localhost:8080/api/batch' \
--header 'Content-Type: application/json' \
--data-raw '[
{
"method": "POST",
"relativeUrl": "/address",
"data": {
"street": "test1",
"city": "test"
}
},
{
"method": "PATCH",
"relativeUrl": "/customer",
"data": {
"id": "1",
"name": "test1",
"email": "[email protected]",
"address": "address2"
}
}
]'
我們將驗證以下回應:
HTTP/1.1 200
Batch update is processed
七、結論
在本文中,我們學習如何在 Spring 應用程式中應用批次和批次操作。我們也了解了它們的功能以及差異。
對於批次操作,我們在兩個不同的 API 中實作它,一個重複使用現有的POST端點來建立多個資源,另一種方法會建立一個單獨的端點以允許對相同類型的多個資源進行多個操作。
我們還實作了批次 API,允許我們對不同的資源應用不同的操作。批次 API 使用HttpMethod和relativeUrl以及有效負載組合不同的子請求。
與往常一樣,範例程式碼可以在 GitHub 上找到。