使用 ConcurrentHashMap 讀寫
一、簡介
在本教程中,我們將學習如何使用ConcurrentHashMap類以線程安全的方式讀取和寫入哈希表數據結構。
2.概述
ConcurrentHashMap是ConcurrentMap接口的一種實現,它是 Java 提供的線程安全集合之一。它由常規映射支持,並且與Hashtable的工作方式類似,我們將在以下部分中介紹一些細微差別。
2.2.有用的方法
ConcurrentHashMap API 規範提供了使用集合的實用方法.在本教程中,我們將主要研究其中兩個:
-   
get(K key):檢索給定key處的元素。這就是我們的閱讀方法。 -   
computeIfPresent(K key, BiFunction<K, V, V> remappingFunction):如果key存在,則將remappingFunction應用於給定key處的值。 
我們將在第 3 節中看到這些方法的實際應用。
2.2.為什麼使用ConcurrentHashMap
ConcurrentHashMap和常規HashMap之間的主要區別在於,第一個實現讀取的總並發和寫入的高並發。
讀取操作保證不會被阻塞或阻塞一個key。寫入操作被阻塞並阻塞映射Entry級別的其他寫入。這兩個想法在我們想要實現高吞吐量和最終一致性的環境中很重要。
HashTable和Collections.synchronizedMap集合也實現讀寫並發。但是,它們的效率較低,因為它們鎖定了整個集合,而不是僅鎖定線程正在寫入的Entry 。
另一方面, ConcurrentHashMap類鎖定在映射入口級別。因此,不會阻止其他線程寫入其他映射鍵。因此,要實現高吞吐量,與HashTable和synchronizedMap集合相比,多線程環境中的ConcurrentHashMap是更好的選擇。
3. 線程安全操作
ConcurrentHashMap實現了代碼需要被視為線程安全的大部分保證。這有助於避免 Java 中一些常見的並發陷阱。
為了說明ConcurrentHashMap如何在多線程環境中工作,我們將使用一個 Java 測試來檢索和更新給定數字的頻率。讓我們首先定義測試的基本結構:
public class ConcurrentHashMapUnitTest {
 private Map<Integer, Integer> frequencyMap;
 @BeforeEach
 public void setup() {
 frequencyMap = new ConcurrentHashMap<>();
 frequencyMap.put(0, 0);
 frequencyMap.put(1, 0);
 frequencyMap.put(2, 0);
 }
 @AfterEach
 public void teardown() {
 frequencyMap.clear();
 }
 private static void sleep(int timeout) {
 try {
 TimeUnit.SECONDS.sleep(timeout);
 } catch (InterruptedException e) {
 throw new RuntimeException(e);
 }
 }
 }
上面的類定義了數字的頻率圖,一個用初始值填充它的setup方法,一個清除其內容的teardown方法,以及一個處理InterruptedException的輔助方法sleep 。
3.1.讀
ConcurrentHashMap允許完全並發讀取,這意味著任何給定數量的線程都可以同時讀取相同的鍵。這也意味著讀取不會阻塞,也不會被寫入操作阻塞。因此,從地圖中讀取可能會得到“舊的”或不一致的值。
讓我們看一個例子,一個線程寫入一個鍵,第二個線程在寫入完成前讀取,第三個線程在寫入完成後讀取:
@Test
 public void givenOneThreadIsWriting_whenAnotherThreadReads_thenGetCorrectValue() throws Exception {
 ExecutorService threadExecutor = Executors.newFixedThreadPool(3);
 Runnable writeAfter1Sec = () -> frequencyMap.computeIfPresent(1, (k, v) -> {
 sleep(1);
 return frequencyMap.get(k) + 1;
 });
 Callable<Integer> readNow = () -> frequencyMap.get(1);
 Callable<Integer> readAfter1001Ms = () -> {
 TimeUnit.MILLISECONDS.sleep(1001);
 return frequencyMap.get(1);
 };
 threadExecutor.submit(writeAfter1Sec);
 List<Future<Integer>> results = threadExecutor.invokeAll(asList(readNow, readAfter1001Ms));
 assertEquals(0, results.get(0).get());
 assertEquals(1, results.get(1).get());
 if (threadExecutor.awaitTermination(2, TimeUnit.SECONDS)) {
 threadExecutor.shutdown();
 }
 }
讓我們仔細看看上面的代碼中發生了什麼:
-  我們首先定義一個具有一個寫入線程和兩個讀取線程的
ExecutorService。寫入操作需要一秒鐘才能完成。因此,在此之前的任何讀取都應該得到舊結果。之後的任何讀取(在本例中恰好是一毫秒之後)都應該獲得更新後的值。 -  然後,我們使用
invokeAll調用所有讀取線程,並按順序將結果收集到列表中。因此,列表的位置零指的是第一次讀取,位置一指的是第二次讀取。 -  最後,我們使用
assertEquals驗證已完成任務的結果並關閉ExecutorService。 
從該代碼中,我們得出結論,即使其他線程同時寫入同一資源,讀取也不會被阻塞。如果我們將讀取和寫入想像為事務,則ConcurrentHashMap實現讀取的最終一致性。這意味著我們不會總是讀取一致的值(最新的值),但是一旦映射停止接收寫入,讀取就會再次變得一致。查看此事務簡介以獲取有關最終一致性的更多詳細信息。
提示:如果您還想使讀取阻塞並被其他讀取阻塞,請不要使用get()方法。相反,您可以實現一個標識BiFunction ,它返回給定鍵上未修改的值,並將該函數傳遞給computeIfPresent方法。使用它,我們將犧牲讀取速度來防止讀取舊值或不一致值時出現問題。
3.2.寫作
如前所述, ConcurrentHashMap實現了寫入的部分並發,它會阻止其他對同一 map key 的寫入,並允許寫入不同的 key。這對於在多線程環境中實現高吞吐量和寫入一致性至關重要。為了說明一致性,讓我們定義一個測試,其中兩個線程寫入同一資源並檢查地圖如何處理:
@Test
 public void givenOneThreadIsWriting_whenAnotherThreadWritesAtSameKey_thenWaitAndGetCorrectValue() throws Exception {
 ExecutorService threadExecutor = Executors.newFixedThreadPool(2);
 Callable<Integer> writeAfter5Sec = () -> frequencyMap.computeIfPresent(1, (k, v) -> {
 sleep(5);
 return frequencyMap.get(k) + 1;
 });
 Callable<Integer> writeAfter1Sec = () -> frequencyMap.computeIfPresent(1, (k, v) -> {
 sleep(1);
 return frequencyMap.get(k) + 1;
 });
 List<Future<Integer>> results = threadExecutor.invokeAll(asList(writeAfter5Sec, writeAfter1Sec));
 assertEquals(1, results.get(0).get());
 assertEquals(2, results.get(1).get());
 if (threadExecutor.awaitTermination(2, TimeUnit.SECONDS)) {
 threadExecutor.shutdown();
 }
 }
上面的測試顯示了兩個寫入線程被提交給ExecutorService.第一個線程需要五秒鐘寫入,第二個線程需要一秒鐘寫入。第一個線程獲取鎖並阻止映射鍵 1 處的任何其他寫入活動。因此,第二個線程必須等待五秒鐘,直到第一個線程釋放鎖。第一次寫入完成後,第二個線程獲取最新的值並在一秒鐘內更新它。
ExecutorService的結果列表按任務提交順序排列,因此第一個元素應返回 1,第二個元素應返回 2。
ConcurrentHashMap的另一個用例是實現不同映射鍵中寫入的高吞吐量。讓我們用另一個單元測試來說明這一點,該單元測試使用兩個寫入線程來更新映射中的不同鍵:
@Test
 public void givenOneThreadIsWriting_whenAnotherThreadWritesAtDifferentKey_thenNotWaitAndGetCorrectValue() throws Exception {
 ExecutorService threadExecutor = Executors.newFixedThreadPool(2);
 Callable<Integer> writeAfter5Sec = () -> frequencyMap.computeIfPresent(1, (k, v) -> {
 sleep(5);
 return frequencyMap.get(k) + 1;
 });
 AtomicLong time = new AtomicLong(System.currentTimeMillis());
 Callable<Integer> writeAfter1Sec = () -> frequencyMap.computeIfPresent(2, (k, v) -> {
 sleep(1);
 time.set((System.currentTimeMillis() - time.get()) / 1000);
 return frequencyMap.get(k) + 1;
 });
 threadExecutor.invokeAll(asList(writeAfter5Sec, writeAfter1Sec));
 assertEquals(1, time.get());
 if (threadExecutor.awaitTermination(2, TimeUnit.SECONDS)) {
 threadExecutor.shutdown();
 }
 }
該測試驗證第二個線程不需要等待第一個線程完成,因為寫入發生在不同的映射鍵上。因此,第二次寫入只需一秒鐘即可完成。在ConcurrentHashMap,線程可以同時工作在不同的 map entry 中,並發寫操作比其他線程安全結構更快。
4。結論
在本文中,我們了解瞭如何寫入和讀取ConcurrentHashMap以實現寫入和讀取的高吞吐量以及讀取的最終一致性。與往常一樣,源代碼可在 GitHub 上獲得。