Java 中的智能批處理
1. 概述
在本教程中,我們將了解智能批處理模式。我們將首先了解微批處理及其優缺點,然後我們將了解智能批處理如何緩解其問題。我們還將使用簡單的 Java 數據結構來查看這兩種模式的一些示例。
2. 微配料
我們可以考慮將微批處理作為智能批處理模式的基礎。儘管較差,但它是我們構建智能批處理的基礎。
2.1.什麼是微批處理?
微批處理是一種優化技術,適用於工作負載由突發小任務組成的系統。儘管它們的計算開銷很小,但它們具有某種支持每秒少量請求的操作,例如對 I/O 設備的寫入。
當我們採用微批處理模式時,我們避免單獨處理傳入的任務。相反,我們將它們聚合成一批,一旦足夠大,我們就一起處理它們。
通過這種分組技術,我們可以優化資源利用率,特別是在 I/O 設備方面。這種方法可以幫助我們減少逐一處理突發小任務所帶來的延遲。
2.2.它是如何工作的?
實現微批處理的最簡單方法是將傳入任務緩存在集合(例如Queue
中。一旦集合超過特定大小(由目標系統的屬性決定),我們就會收集達到該限制的所有任務並一起處理它們。
讓我們創建一個最小的MicroBatcher
類:
class MicroBatcher {
Queue<String> tasksQueue = new ConcurrentLinkedQueue<>();
Thread batchThread;
int executionThreshold;
int timeoutThreshold;
MicroBatcher(int executionThreshold, int timeoutThreshold, Consumer<List<String>> executionLogic) {
batchThread = new Thread(batchHandling(executionLogic));
batchThread.setDaemon(true);
batchThread.start();
this.executionThreshold = executionThreshold;
this.timeoutThreshold = timeoutThreshold;
}
void submit(String task) {
tasksQueue.add(task);
}
Runnable batchHandling(Consumer<List<String>> executionLogic) {
return () -> {
while (!batchThread.isInterrupted()) {
long startTime = System.currentTimeMillis();
while (tasksQueue.size() < executionThreshold && (System.currentTimeMillis() - startTime) < timeoutThreshold) {
Thread.sleep(100);
}
List<String> tasks = new ArrayList<>(executionThreshold);
while (tasksQueue.size() > 0 && tasks.size() < executionThreshold) {
tasks.add(tasksQueue.poll());
}
executionLogic.accept(tasks);
}
};
}
}
我們的批處理程序類有兩個重要的字段: tasksQueue,
和batchThread
。
作為我們的Queue
實現,我們選擇ConcurrentLinkedQueue
,因為它提供並發訪問並且可以根據需要增長。這是所有提交的任務所在的位置。在我們的例子中,我們將它們表示為簡單的String
對象,我們將其作為參數提供給我們在外部定義的executionLogic
。
此外,我們的MicroBatcher
有一個用於批處理的專用Thread
。需要注意的是,任務的提交和處理必須在不同的線程中完成。這種解耦是延遲最小化最重要的部分。這是因為我們只讓一個線程發出慢速請求,而其餘線程可以根據需要盡快提交任務,因為它們不會被操作阻塞。
最後,我們定義executionThreshold
和timeoutThreshold
。第一個確定在執行任務之前必須緩衝的任務數量。其值取決於目標操作。例如,如果我們正在寫入網絡設備,則閾值應等於最大數據包大小。第二個是我們在處理任務之前等待緩衝的最長時間,即使尚未達到executionThreshold
。
2.3.優點和缺點
通過使用微批量模式,我們獲得了很多好處。首先,它提高了我們的吞吐量,因為無論執行狀態如何都會提交任務,這意味著我們的系統響應速度更快。
此外,通過調整微批量處理程序,我們可以正確利用底層資源(例如磁盤存儲)並將其飽和到最佳水平。
最後,它很好地符合現實世界的流量,現實世界的流量很少是均勻的,並且通常是突發的。
然而,這種實現的最重要的缺點之一是,當系統沒有負載時,例如在晚上,即使是單個請求在處理之前也被迫等待timeoutThreshold
。這會導致資源利用不足,最重要的是,導致糟糕的用戶體驗。
3. 智能配料
輸入智能批處理,這是微批處理的修改版本。不同之處在於,我們省略了timeoutThreshold
,而不是等待隊列填滿任務,而是立即執行任意數量的任務,直到executionThreshold
。
通過這個簡單的更改,我們避免了上述的低流量延遲問題,同時仍然保留微批處理的所有優點。原因是,通常,處理一批任務所需的時間足以讓隊列填滿下一批任務。因此,我們可以實現最佳的資源使用,並避免阻塞單個任務的執行,以防萬一這就是所有待處理的任務。
讓我們將MicroBatcher
轉換為SmartBatcher
:
class SmartBatcher {
BlockingQueue<String> tasksQueue = new LinkedBlockingQueue<>();
Thread batchThread;
int executionThreshold;
boolean working = false;
SmartBatcher(int executionThreshold, Consumer<List<String>> executionLogic) {
batchThread = new Thread(batchHandling(executionLogic));
batchThread.setDaemon(true);
batchThread.start();
this.executionThreshold = executionThreshold;
}
Runnable batchHandling(Consumer<List<String>> executionLogic) {
return () -> {
while (!batchThread.isInterrupted()) {
List<String> tasks = new ArrayList<>(executionThreshold);
while(tasksQueue.drainTo(tasks, executionThreshold) == 0) {
Thread.sleep(100);
}
working = true;
executionLogic.accept(tasks);
working = false;
}
};
}
}
我們在新的實施中改變了三件事。首先,我們刪除了timeoutThreshold
。其次,我們將Queue
實現更改為BlockingQueue
。它們支持drainTo()
方法,該方法非常適合我們的需求。最後,我們利用這個方法來簡化我們的batchHandling()
邏輯。
5. 無批處理與批處理比較
讓我們創建一個具有簡單場景的應用程序類,以針對批處理方法測試簡單方法:
class BatchingApp {
public static void main(String[] args) throws Exception {
final Path testPath = Paths.get("./test.txt");
testPath.toFile().createNewFile();
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(100);
Set<Future> futures = new HashSet<>();
for (int i = 0; i < 50000; i++) {
futures.add(executorService.submit(() -> {
Files.write(testPath, Collections.singleton(Thread.currentThread().getName()), StandardOpenOption.APPEND);
}));
}
long start = System.currentTimeMillis();
for (Future future : futures) {
future.get();
}
System.out.println("Time: " + (System.currentTimeMillis() - start));
executorService.shutdown();
}
}
我們為 I/O 操作選擇了簡單的文件寫入。我們創建一個test.txt
文件並使用100
線程向其中寫入50000
行。儘管控制台中顯示的時間取決於目標硬件,但這裡有一個示例:
Time (ms): 4968
即使嘗試使用不同的線程數,時間仍然在4500
毫秒左右。看來我們已經達到了硬件的極限。
現在讓我們切換到SmartBatcher
:
class BatchingApp {
public static void main(String[] args) throws Exception {
final Path testPath = Paths.get("./testio.txt");
testPath.toFile().createNewFile();
SmartBatcher batcher = new SmartBatcher(10, strings -> {
List<String> content = new ArrayList<>(strings);
content.add("-----Batch Operation-----");
Files.write(testPath, content, StandardOpenOption.APPEND);
});
for (int i = 0; i < 50000; i++) {
batcher.submit(Thread.currentThread().getName() + "-1");
}
long start = System.currentTimeMillis();
while (!batcher.finished());
System.out.println("Time: " + (System.currentTimeMillis() - start));
}
}
我們向SmartBatcher
添加了finished()
方法來檢查所有任務何時完成:
boolean finished() {
return tasksQueue.isEmpty() && !working;
}
這是顯示的新時間:
Time (ms): 1053
即使executionThreshold
10,我們也實現了五倍的改進。將閾值增加到100
可將時間減少到約 150 毫秒,幾乎比簡單方法快 50 倍。
正如我們所看到的,採用一種利用底層硬件特性的簡單技術可以極大地提高我們的應用程序性能。我們應該始終牢記我們的系統正在做什麼以及它正在處理的流量。
5. 結論
在本文中,我們概述了任務批處理技術,特別是微批處理和智能批處理。我們看到了潛在的用例、微批處理的優缺點,以及智能批處理如何減輕其缺點。最後,我們比較了簡單任務執行和批量執行之間的比較。
與往常一樣,本文的源代碼可在 GitHub 上獲取。