Java SynchronousQueue指南

1.概述

在本文中,我們將從java.util.concurrent包中查看*SynchronousQueue* 。

簡而言之,此實現使我們能夠以線程安全的方式在線程之間交換信息。

2. API概述

SynchronousQueue僅具有兩個受支持的操作: take()put(),它們都處於阻塞狀態

例如,當我們想向隊列添加元素時,我們需要調用put()方法。該方法將阻塞,直到其他線程調用take()方法,表明已準備好接受元素。

儘管SynchronousQueue具有隊列的接口,但我們應該將其視為兩個線程之間單個元素的交換點,其中一個線程傳遞一個元素,另一個線程傳遞該元素。

3.使用共享變量實現切換

為了了解為什麼SynchronousQueue如此有用,我們將使用兩個線程之間的共享變量來實現一個邏輯,接下來,我們將使用SynchronousQueue重寫該邏輯,使我們的代碼更簡單易讀。

假設我們有兩個線程(生產者和消費者),並且當生產者設置共享變量的值時,我們希望將該事實告知消費者線程。接下來,使用者線程將從共享變量中獲取值。

我們將使用CountDownLatch來協調這兩個線程,以防止使用者訪問尚未設置的共享變量的值的情況。

我們將定義一個sharedState變量和一個CountDownLatch ,它們將用於協調處理:

ExecutorService executor = Executors.newFixedThreadPool(2);

 AtomicInteger sharedState = new AtomicInteger();

 CountDownLatch countDownLatch = new CountDownLatch(1);

生產者將一個隨機整數保存到sharedState變量,並在countDownLatch上執行countDown()方法向消費者表明它可以從sharedState中獲取值

Runnable producer = () -> {

 Integer producedElement = ThreadLocalRandom

 .current()

 .nextInt();

 sharedState.set(producedElement);

 countDownLatch.countDown();

 };

使用者將使用await()方法等待**countDownLatch 。當生產者發出設置了變量的信號時,消費者將從sharedState獲取它

Runnable consumer = () -> {

 try {

 countDownLatch.await();

 Integer consumedElement = sharedState.get();

 } catch (InterruptedException ex) {

 ex.printStackTrace();

 }

 };

最後但並非最不重要的一點,讓我們開始我們的程序:

executor.execute(producer);

 executor.execute(consumer);



 executor.awaitTermination(500, TimeUnit.MILLISECONDS);

 executor.shutdown();

 assertEquals(countDownLatch.getCount(), 0);

它將產生以下輸出:

Saving an element: -1507375353 to the exchange point

 consumed an element: -1507375353 from the exchange point

我們可以看到,有很多代碼可以實現簡單的功能,例如在兩個線程之間交換元素。在下一節中,我們將嘗試使其更好。

4. 使用SynchronousQueue實現切換

現在,讓我們實現與上一節相同的功能,但要使用SynchronousQueue。它具有雙重作用,因為我們可以使用它來在線程之間交換狀態並協調該動作,因此我們不需要使用SynchronousQueue之外的任何東西

首先,我們將定義一個隊列:

ExecutorService executor = Executors.newFixedThreadPool(2);

 SynchronousQueue<Integer> queue = new SynchronousQueue<>();

生產者將調用put()方法,該方法將阻塞,直到其他線程從隊列中取出一個元素為止:

Runnable producer = () -> {

 Integer producedElement = ThreadLocalRandom

 .current()

 .nextInt();

 try {

 queue.put(producedElement);

 } catch (InterruptedException ex) {

 ex.printStackTrace();

 }

 };

使用者將使用take()方法簡單地檢索該元素:

Runnable consumer = () -> {

 try {

 Integer consumedElement = queue.take();

 } catch (InterruptedException ex) {

 ex.printStackTrace();

 }

 };

接下來,我們將開始我們的程序:

executor.execute(producer);

 executor.execute(consumer);



 executor.awaitTermination(500, TimeUnit.MILLISECONDS);

 executor.shutdown();

 assertEquals(queue.size(), 0);

它將產生以下輸出:

Saving an element: 339626897 to the exchange point

 consumed an element: 339626897 from the exchange point

我們可以看到,將SynchronousQueue用作線程之間的交換點,這比上一個示例使用共享狀態和CountDownLatch更好,也更容易理解

5.結論

在本快速教程中,我們研究了SynchronousQueue構造。我們創建了一個程序,該程序使用共享狀態在兩個線程之間交換數據,然後重寫該程序以利用SynchronousQueue構造。這用作協調生產者和使用者線程的交換點。

所有這些示例和代碼段的實現都可以在GitHub項目中找到–這是一個Maven項目,因此應該很容易直接導入和運行。