使用 ConcurrentHashMap 讀寫
一、簡介
在本教程中,我們將學習如何使用ConcurrentHashMap
類以線程安全的方式讀取和寫入哈希表數據結構。
2.概述
ConcurrentHashMap
是ConcurrentMap
接口的一種實現,它是 Java 提供的線程安全集合之一。它由常規映射支持,並且與Hashtable
的工作方式類似,我們將在以下部分中介紹一些細微差別。
2.2.有用的方法
ConcurrentHashMap
API 規範提供了使用集合的實用方法.
在本教程中,我們將主要研究其中兩個:
-
get(K key)
:檢索給定key
處的元素。這就是我們的閱讀方法。 -
computeIfPresent(K key, BiFunction<K, V, V> remappingFunction)
:如果key
存在,則將remappingFunction
應用於給定key
處的值。
我們將在第 3 節中看到這些方法的實際應用。
2.2.為什麼使用ConcurrentHashMap
ConcurrentHashMap
和常規HashMap
之間的主要區別在於,第一個實現讀取的總並發和寫入的高並發。
讀取操作保證不會被阻塞或阻塞一個key。寫入操作被阻塞並阻塞映射Entry
級別的其他寫入。這兩個想法在我們想要實現高吞吐量和最終一致性的環境中很重要。
HashTable
和Collections.synchronizedMap
集合也實現讀寫並發。但是,它們的效率較低,因為它們鎖定了整個集合,而不是僅鎖定線程正在寫入的Entry
。
另一方面, ConcurrentHashMap
類鎖定在映射入口級別。因此,不會阻止其他線程寫入其他映射鍵。因此,要實現高吞吐量,與HashTable
和synchronizedMap
集合相比,多線程環境中的ConcurrentHashMap
是更好的選擇。
3. 線程安全操作
ConcurrentHashMap
實現了代碼需要被視為線程安全的大部分保證。這有助於避免 Java 中一些常見的並發陷阱。
為了說明ConcurrentHashMap
如何在多線程環境中工作,我們將使用一個 Java 測試來檢索和更新給定數字的頻率。讓我們首先定義測試的基本結構:
public class ConcurrentHashMapUnitTest {
private Map<Integer, Integer> frequencyMap;
@BeforeEach
public void setup() {
frequencyMap = new ConcurrentHashMap<>();
frequencyMap.put(0, 0);
frequencyMap.put(1, 0);
frequencyMap.put(2, 0);
}
@AfterEach
public void teardown() {
frequencyMap.clear();
}
private static void sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
上面的類定義了數字的頻率圖,一個用初始值填充它的setup
方法,一個清除其內容的teardown
方法,以及一個處理InterruptedException
的輔助方法sleep
。
3.1.讀
ConcurrentHashMap
允許完全並發讀取,這意味著任何給定數量的線程都可以同時讀取相同的鍵。這也意味著讀取不會阻塞,也不會被寫入操作阻塞。因此,從地圖中讀取可能會得到“舊的”或不一致的值。
讓我們看一個例子,一個線程寫入一個鍵,第二個線程在寫入完成前讀取,第三個線程在寫入完成後讀取:
@Test
public void givenOneThreadIsWriting_whenAnotherThreadReads_thenGetCorrectValue() throws Exception {
ExecutorService threadExecutor = Executors.newFixedThreadPool(3);
Runnable writeAfter1Sec = () -> frequencyMap.computeIfPresent(1, (k, v) -> {
sleep(1);
return frequencyMap.get(k) + 1;
});
Callable<Integer> readNow = () -> frequencyMap.get(1);
Callable<Integer> readAfter1001Ms = () -> {
TimeUnit.MILLISECONDS.sleep(1001);
return frequencyMap.get(1);
};
threadExecutor.submit(writeAfter1Sec);
List<Future<Integer>> results = threadExecutor.invokeAll(asList(readNow, readAfter1001Ms));
assertEquals(0, results.get(0).get());
assertEquals(1, results.get(1).get());
if (threadExecutor.awaitTermination(2, TimeUnit.SECONDS)) {
threadExecutor.shutdown();
}
}
讓我們仔細看看上面的代碼中發生了什麼:
- 我們首先定義一個具有一個寫入線程和兩個讀取線程的
ExecutorService
。寫入操作需要一秒鐘才能完成。因此,在此之前的任何讀取都應該得到舊結果。之後的任何讀取(在本例中恰好是一毫秒之後)都應該獲得更新後的值。 - 然後,我們使用
invokeAll
調用所有讀取線程,並按順序將結果收集到列表中。因此,列表的位置零指的是第一次讀取,位置一指的是第二次讀取。 - 最後,我們使用
assertEquals
驗證已完成任務的結果並關閉ExecutorService
。
從該代碼中,我們得出結論,即使其他線程同時寫入同一資源,讀取也不會被阻塞。如果我們將讀取和寫入想像為事務,則ConcurrentHashMap
實現讀取的最終一致性。這意味著我們不會總是讀取一致的值(最新的值),但是一旦映射停止接收寫入,讀取就會再次變得一致。查看此事務簡介以獲取有關最終一致性的更多詳細信息。
提示:如果您還想使讀取阻塞並被其他讀取阻塞,請不要使用get()
方法。相反,您可以實現一個標識BiFunction
,它返回給定鍵上未修改的值,並將該函數傳遞給computeIfPresent
方法。使用它,我們將犧牲讀取速度來防止讀取舊值或不一致值時出現問題。
3.2.寫作
如前所述, ConcurrentHashMap
實現了寫入的部分並發,它會阻止其他對同一 map key 的寫入,並允許寫入不同的 key。這對於在多線程環境中實現高吞吐量和寫入一致性至關重要。為了說明一致性,讓我們定義一個測試,其中兩個線程寫入同一資源並檢查地圖如何處理:
@Test
public void givenOneThreadIsWriting_whenAnotherThreadWritesAtSameKey_thenWaitAndGetCorrectValue() throws Exception {
ExecutorService threadExecutor = Executors.newFixedThreadPool(2);
Callable<Integer> writeAfter5Sec = () -> frequencyMap.computeIfPresent(1, (k, v) -> {
sleep(5);
return frequencyMap.get(k) + 1;
});
Callable<Integer> writeAfter1Sec = () -> frequencyMap.computeIfPresent(1, (k, v) -> {
sleep(1);
return frequencyMap.get(k) + 1;
});
List<Future<Integer>> results = threadExecutor.invokeAll(asList(writeAfter5Sec, writeAfter1Sec));
assertEquals(1, results.get(0).get());
assertEquals(2, results.get(1).get());
if (threadExecutor.awaitTermination(2, TimeUnit.SECONDS)) {
threadExecutor.shutdown();
}
}
上面的測試顯示了兩個寫入線程被提交給ExecutorService.
第一個線程需要五秒鐘寫入,第二個線程需要一秒鐘寫入。第一個線程獲取鎖並阻止映射鍵 1 處的任何其他寫入活動。因此,第二個線程必須等待五秒鐘,直到第一個線程釋放鎖。第一次寫入完成後,第二個線程獲取最新的值並在一秒鐘內更新它。
ExecutorService
的結果列表按任務提交順序排列,因此第一個元素應返回 1,第二個元素應返回 2。
ConcurrentHashMap
的另一個用例是實現不同映射鍵中寫入的高吞吐量。讓我們用另一個單元測試來說明這一點,該單元測試使用兩個寫入線程來更新映射中的不同鍵:
@Test
public void givenOneThreadIsWriting_whenAnotherThreadWritesAtDifferentKey_thenNotWaitAndGetCorrectValue() throws Exception {
ExecutorService threadExecutor = Executors.newFixedThreadPool(2);
Callable<Integer> writeAfter5Sec = () -> frequencyMap.computeIfPresent(1, (k, v) -> {
sleep(5);
return frequencyMap.get(k) + 1;
});
AtomicLong time = new AtomicLong(System.currentTimeMillis());
Callable<Integer> writeAfter1Sec = () -> frequencyMap.computeIfPresent(2, (k, v) -> {
sleep(1);
time.set((System.currentTimeMillis() - time.get()) / 1000);
return frequencyMap.get(k) + 1;
});
threadExecutor.invokeAll(asList(writeAfter5Sec, writeAfter1Sec));
assertEquals(1, time.get());
if (threadExecutor.awaitTermination(2, TimeUnit.SECONDS)) {
threadExecutor.shutdown();
}
}
該測試驗證第二個線程不需要等待第一個線程完成,因為寫入發生在不同的映射鍵上。因此,第二次寫入只需一秒鐘即可完成。在ConcurrentHashMap,
線程可以同時工作在不同的 map entry 中,並發寫操作比其他線程安全結構更快。
4。結論
在本文中,我們了解瞭如何寫入和讀取ConcurrentHashMap
以實現寫入和讀取的高吞吐量以及讀取的最終一致性。與往常一樣,源代碼可在 GitHub 上獲得。