Java併發BlockingQueue接口

java.util.concurrent.BlockingQueue接口是Queue接口的子接口,另外還支持諸如在檢索元素之前等待隊列變爲非空的操作,並在存儲元素之前等待隊列中的空間變得可用 。

BlockingQueue接口中的方法

序號

方法

描述

1

boolean add(E e)

將指定的元素插入到此隊列中,如果可以立即執行此操作,而不會違反容量限制,在成功時返回true,並且如果當前沒有空間可用,則拋出IllegalStateException

2

boolean contains(Object o)

如果此隊列包含指定的元素,則返回true

3

int drainTo(Collection<? super E> c)

從該隊列中刪除所有可用的元素,並將它們添加到給定的集合中。

4

int drainTo(Collection<? super E> c, int maxElements)

最多從該隊列中刪除給定數量的可用元素,並將它們添加到給定的集合中。

5

boolean offer(E e)

將指定的元素插入到此隊列中,如果可以立即執行此操作而不違反容量限制,則成功返回true,如果當前沒有空間可用,則返回false

6

boolean offer(E e, long timeout, TimeUnit unit)

將指定的元素插入到此隊列中,等待指定的等待時間(如有必要)才能使空間變得可用。

7

E poll(long timeout, TimeUnit unit)

檢索並刪除此隊列的頭,等待指定的等待時間(如有必要)使元素變爲可用。

8

void put(E e)

將指定的元素插入到此隊列中,等待空間/容量可用。

9

int remainingCapacity()

返回此隊列可理想地(在沒有內存或資源約束的情況下)接受而不阻止的附加元素數,如果沒有內在限制則返回Integer.MAX_VALUE

10

boolean remove(Object o)

從該隊列中刪除指定元素的單個實例(如果存在)。

11

E take()

檢索並刪除此隊列的頭,如有必要,等待元素可用。

實例

以下TestThread程序顯示了基於線程的環境中BlockingQueue接口的使用。

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

      Producer producer = new Producer(queue);
      Consumer consumer = new Consumer(queue);

      new Thread(producer).start();
      new Thread(consumer).start();

      Thread.sleep(4000);
   }  


   static class Producer implements Runnable {

      private BlockingQueue<Integer> queue;

      public Producer(BlockingQueue queue){
         this.queue = queue;
      }

      @Override
      public void run() {
         Random random = new Random();

         try {
            int result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }       
   }

   static class Consumer implements Runnable {

      private BlockingQueue<Integer> queue;

      public Consumer(BlockingQueue queue){
         this.queue = queue;
      }

      @Override
      public void run() {
         try {
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

這將產生以下結果 -

Added: 73
Removed: 73
Added: 19
Removed: 19
Added: 47
Removed: 47