線程通信

在現實生活中,如果一個人團隊正在共同完成任務,那麼他們之間應該有通信,以便正確完成任務。 同樣的比喻也適用於線程。 在編程中,要減少處理器的理想時間,我們創建了多個線程,併爲每個線程分配不同的子任務。 因此,必須有一個通信設施,他們應該互相溝通交流,以同步的方式完成工作。

考慮以下與線程通信相關的重要問題 -

  • 沒有性能增益 - 如果無法在線程和進程之間實現適當的通信,那麼併發性和並行性的性能收益是沒有用的。
  • 完成任務 - 如果線程之間沒有適當的相互通信機制,分配的任務將無法正常完成。
  • 比進程間通信更高效 - 線程間通信比進程間通信更高效且更易於使用,因爲進程內的所有線程共享相同的地址空間,並且不需要使用共享內存。

線程安全通信的Python數據結構

多線程代碼出現了將信息從一個線程傳遞到另一個線程的問題。 標準的通信原語不能解決這個問題。 因此,需要實現我們自己的組合對象,以便在線程之間共享對象以使通信線程安全。 以下是一些數據結構,它們在進行一些更改後提供線程安全通信 -

1. Set

爲了以線程安全的方式使用set數據結構,需要擴展set類來實現我們自己的鎖定機制。

以下是一個擴展類的Python示例 -

class extend_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(extend_class, self).__init__(*args, **kwargs)

   def add(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).add(elem)
      finally:
      self._lock.release()

   def delete(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).delete(elem)
      finally:
      self._lock.release()

在上面的例子中,定義了一個名爲extend_class的類對象,它繼承自Python集合類。 在這個類的構造函數中創建一個鎖對象。 現在有兩個函數 - add()delete()。 這些函數被定義並且是線程安全的。 它們都依賴於超類功能以及一個鍵異常。

修飾器
這是線程安全通信的另一個關鍵方法是使用裝飾器。

示例

考慮一個Python示例,展示如何使用裝飾器和mminus;

def lock_decorator(method):

   def new_deco_method(self, *args, **kwargs):
      with self._lock:
         return method(self, *args, **kwargs)
return new_deco_method

class Decorator_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(Decorator_class, self).__init__(*args, **kwargs)

   @lock_decorator
   def add(self, *args, **kwargs):
      return super(Decorator_class, self).add(elem)
   @lock_decorator
   def delete(self, *args, **kwargs):
      return super(Decorator_class, self).delete(elem)

在上面的例子中,已經定義了一個名爲lock_decorator的裝飾器方法,該方法從Python方法類繼承。 然後在這個類的構造函數中創建一個鎖對象。 現在有兩個函數 - add()delete()。 這些函數被定義並且是線程安全的。 他們都依靠超類功能和一個鍵異常。

2. list

列表數據結構對於臨時內存存儲而言是線程安全,快速以及簡單的結構。在Cpython中,GIL可以防止對它們的併發訪問。當我們知道列表是線程安全的,但是數據在哪裏呢。實際上,該列表的數據不受保護。例如,如果另一個線程試圖做同樣的事情,則L.append(x)不保證能夠返回預期的結果。這是因爲儘管append()是一個原子操作並且是線程安全的,但另一個線程試圖以併發方式修改列表數據,因此可以看到競爭條件對輸出的副作用。

爲了解決這類問題並安全地修改數據,我們必須實現一個適當的鎖定機制,這進一步確保多個線程不會潛在競爭條件。爲了實現適當的鎖定機制,可以像前面的例子那樣擴展這個類。

列表上的其他一些原子操作如下所示 -

L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()

這裏 -

  • LL1L2都是列表
  • DD1D2是字典
  • xy是對象
  • ij是整數

3. 隊列

如果清單數據不受保護,我們可能不得不面對後果。 可能會得到或刪除錯誤的數據項,競爭條件。 這就是爲什麼建議使用隊列數據結構的原因。 一個真實世界的排隊示例可以是單車道單向道路,車輛首先進入,首先退出。 售票窗口和公共汽車站的隊列中可以看到更多真實世界的例子。
線程通信

隊列是默認的線程安全數據結構,我們不必擔心實現複雜的鎖定機制。 Python提供了應用程序中使用不同類型隊列的模塊。

隊列類型

在本節中,我們將獲得關於不同類型的隊列的信息。 Python提供了三種從queue模塊使用的隊列選項 -

  • 正常隊列(FIFO,先進先出)
  • 後進先出,後進先出
  • 優先級

我們將在隨後的章節中瞭解不同的隊列。

正常隊列(FIFO,先進先出)
它是Python提供的最常用的隊列實現。 在這種先排隊的機制中,首先得到服務。 FIFO也被稱爲正常隊列。 FIFO隊列可以表示如下 -

線程通信

FIFO隊列的Python實現

在python中,FIFO隊列可以用單線程和多線程來實現。

具有單線程的FIFO隊列
要實現單線程的FIFO隊列,Queue類將實現一個基本的先進先出容器。 使用put()將元素添加到序列的一個「結尾」,並使用get()從另一端移除元素。

示例

以下是用單線程實現FIFO隊列的Python程序 -

import queue

q = queue.Queue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end = " ")

執行上面示例代碼,得到以下結果 -

item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7

輸出結果顯示上面的程序使用單個線程來說明這些元素將按照它們插入的順序從隊列中移除。

具有多個線程的FIFO隊列

爲了實現多線程的FIFO,需要從queue模塊擴展來定義myqueue()函數。 get()put()方法的工作方式與上面討論的一樣,只用單線程實現FIFO隊列。 然後爲了使它成爲多線程,我們需要聲明和實例化線程。 這些線程將以FIFO方式使用隊列。

示例
以下是用於實現具有多個線程的FIFO隊列的Python程序 -

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
   item = queue.get()
   if item is None:
   break
   print("{} removed {} from the queue".format(threading.current_thread(), item))
   queue.task_done()
   time.sleep(2)
q = queue.Queue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

執行上面示例代碼,得到以下結果 -

<Thread(Thread-3654, started 5044)> removed 0 from the queue
<Thread(Thread-3655, started 3144)> removed 1 from the queue
<Thread(Thread-3656, started 6996)> removed 2 from the queue
<Thread(Thread-3657, started 2672)> removed 3 from the queue
<Thread(Thread-3654, started 5044)> removed 4 from the queue

4. LIFO,後進先出隊列

隊列使用與FIFO(先進先出)隊列完全相反的類比。 在這個隊列機制中,最後一個將首先獲得服務。 這與實現堆棧數據結構相似。 LIFO隊列在實施深度優先搜索時非常有用,如人工智能算法。

LIFO隊列的Python實現
在python中,LIFO隊列可以用單線程和多線程來實現。

單線程的LIFO隊列
要用單線程實現LIFO隊列,Queue類將使用結構Queue.LifoQueue來實現基本的後進先出容器。 現在,在調用put()時,將元素添加到容器的頭部,並使用get()從頭部移除。

示例
以下是用單線程實現LIFO隊列的Python程序 -

import queue

q = queue.LifoQueue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end=" ")

執行上面示例代碼,得到以下結果 -

item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0

輸出顯示上述程序使用單個線程來說明元素將以插入的相反順序從隊列中移除。

帶有多個線程的LIFO隊列

這個實現與使用多線程實現FIFO隊列相似。 唯一的區別是需要使用Queue類,該類將使用結構Queue.LifoQueue來實現基本的後進先出容器。

示例
以下是用於實現具有多個線程的LIFO隊列的Python程序 -

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(2)
q = queue.LifoQueue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

執行上面示例代碼,得到以下結果 -

<Thread(Thread-3882, started 4928)> removed 4 from the queue
<Thread(Thread-3883, started 4364)> removed 3 from the queue
<Thread(Thread-3884, started 6908)> removed 2 from the queue
<Thread(Thread-3885, started 3584)> removed 1 from the queue
<Thread(Thread-3882, started 4928)> removed 0 from the queue

優先隊列

在FIFO和LIFO隊列中,項目順序與插入順序有關。 但是,有很多情況下優先級比插入順序更重要。 讓我們考慮一個真實世界的例子。 假設機場的安保人員正在檢查不同類別的人員。 VVIP的人員,航空公司工作人員,海關人員,類別可能會優先檢查,而不是像到平民那樣根據到達情況進行檢查。

需要考慮優先隊列的另一個重要方面是如何開發任務調度器。 一種常見的設計是在隊列中優先處理最具代理性的任務。 該數據結構可用於根據隊列的優先級值從隊列中提取項目。

優先隊列的Python實現
在python中,優先級隊列可以用單線程和多線程來實現。

單線程優先隊列

要實現單線程優先隊列,Queue類將使用結構Queue.PriorityQueue在優先級容器上實現任務。 現在,在調用put()時,元素將添加一個值,其中最低值將具有最高優先級,並因此使用get()首先檢索。

示例

考慮下面的Python程序來實現單線程的優先級隊列 -

import queue as Q
p_queue = Q.PriorityQueue()

p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
prio_queue.put((5, 'Important'))

while not p_queue.empty():
   item = p_queue.get()
   print('%s - %s' % item)

執行上面示例代碼,得到以下結果 -

1 – Most Urgent
2 - Urgent
5 - Important
10 – Nothing important

在上面的輸出中,可以看到隊列已經存儲了基於優先級的項目 - 較少的值具有高優先級。

具有多線程的優先隊列

該實現類似於具有多個線程的FIFO和LIFO隊列的實現。 唯一的區別是需要使用Queue類通過使用結構Queue.PriorityQueue來初始化優先級。 另一個區別是隊列的生成方式。 在下面給出的例子中,它將生成兩個相同的數據集。

示例
以下Python程序有助於實現具有多個線程的優先級隊列 -

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(1)
q = queue.PriorityQueue()
for i in range(5):
   q.put(i,1)

for i in range(5):
   q.put(i,1)

threads = []
for i in range(2):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

執行上面示例代碼,得到以下結果 -

<Thread(Thread-4939, started 2420)> removed 0 from the queue
<Thread(Thread-4940, started 3284)> removed 0 from the queue
<Thread(Thread-4939, started 2420)> removed 1 from the queue
<Thread(Thread-4940, started 3284)> removed 1 from the queue
<Thread(Thread-4939, started 2420)> removed 2 from the queue
<Thread(Thread-4940, started 3284)> removed 2 from the queue
<Thread(Thread-4939, started 2420)> removed 3 from the queue
<Thread(Thread-4940, started 3284)> removed 3 from the queue
<Thread(Thread-4939, started 2420)> removed 4 from the queue
<Thread(Thread-4940, started 3284)> removed 4 from the queue