使用 Aeron 的 UDP 訊息傳遞
一、簡介
在本文中,我們將介紹Aeron ,這是一個由 Adaptive Financial Consulting 維護的多語言庫,旨在實現應用程式之間高效的 UDP 訊息傳遞。它專為性能而設計,旨在實現高吞吐量、低延遲和容錯。
2. 依賴關係
在使用 Aeron 之前,我們需要在建造中包含最新版本,在撰寫本文時為1.44.1 。
如果我們使用 Maven,我們可以將其依賴項包含在pom.xml中:
<dependency>
<groupId>io.aeron</groupId>
<artifactId>aeron-all</artifactId>
<version>1.44.1</version>
</dependency>
或者,如果我們使用 Gradle,我們可以將其包含在build.gradle中:
implementation("io.aeron:aeron-all:1.44.1")
此時,我們已準備好開始在我們的應用程式中使用它。
請注意,目前 Aeron 的某些部分無法在 Java 16 或更高版本中開箱即用。這是由於 JPMS 阻止了特定的互動。
3. 媒體驅動程式
Aeron 在應用程式和傳輸之間進行一定程度的間接工作。這被稱為媒體驅動程序,因為它是我們的應用程式和傳輸媒體之間的互動。
每個 Aeron 進程都與媒體驅動程式交互,並透過該驅動程式與其他進程交互——無論是在同一台機器上還是遠端。它透過檔案系統執行此互動。我們需要將媒體驅動程式和所有應用程式指向磁碟上的相同目錄,其中儲存各個方面。請注意,我們只能同時為任何給定目錄運行一個媒體驅動程式。嘗試運行多個將會失敗。
當我們想保持簡單時,我們可以運行應用程式中嵌入的媒體驅動程式:
MediaDriver mediaDriver = MediaDriver.launch();
這將啟動具有所有預設設定的媒體驅動程式。特別是,這將使用預設媒體驅動程式目錄運行。
我們還有一種專為嵌入式使用而設計的替代啟動方法。這與以前完全相同,只是它產生一個隨機目錄以確保同一台電腦上的多個實例不會發生衝突:
MediaDriver mediaDriver = MediaDriver.launchEmbedded();
在這兩種情況下,我們還可以提供MediaDriver.Context物件來進一步配置媒體驅動程式:
MediaDriver.Context context = new MediaDriver.Context();
context.threadingMode(ThreadingMode.SHARED);
MediaDriver mediaDriver = MediaDriver.launch(context);
執行此操作時,我們需要在完成後關閉媒體驅動程式。此介面實作了AutoCloseable,因此我們可以使用 try-with-resources 模式來管理它。
或者,我們可以將媒體驅動程式作為外部應用程式運行。我們可以使用作為依賴項包含的aeron-all.jar JAR 檔案來執行此操作:
$ java -cp aeron-all-1.44.1.jar io.aeron.driver.MediaDriver
這將與上面的MediaDriver.launch()完全相同。
4.Aeron API 客戶端
我們透過Aeron類別使用 Aeron 執行所有 API 互動。我們需要建立一個新實例並將其指向我們的媒體驅動程式。只需建立一個新實例,就會指向預設位置的媒體驅動程式 - 就像我們使用MediaDriver.launch()啟動它一樣:
Aeron aeron = Aeron.connect();
或者,我們可以提供Aeron.Context物件來配置連接,包括指定媒體驅動程式運行的目錄:
Aeron.Context ctx = new Aeron.Context();
ctx.aeronDirectoryName(mediaDriver.aeronDirectoryName());
Aeron aeron = Aeron.connect(ctx);
如果我們的媒體驅動程式位於非標準目錄中,包括如果我們使用MediaDriver.launchEmbedded(),則必須執行此操作。如果我們指向的目錄沒有正在運行的媒體驅動程序,則Aeron.connect()呼叫將被阻止,直到它出現為止。
我們可以根據需要將任意數量的 Aeron 用戶端連接到相同媒體驅動程式。通常,這些應用程式來自不同的應用程序,但如果需要,它們也可以來自同一個應用程式。但是,如果我們這樣做,那麼我們還需要使用Aeron.Context的新實例:
Aeron.Context ctx1 = new Aeron.Context();
ctx1.aeronDirectoryName(mediaDriver.aeronDirectoryName());
aeron1 = Aeron.connect(ctx1);
System.out.println("Aeron 1 connected: " + aeron1);
Aeron.Context ctx2 = new Aeron.Context();
ctx2.aeronDirectoryName(mediaDriver.aeronDirectoryName());
aeron2 = Aeron.connect(ctx2);
System.out.println("Aeron 2 connected: " + aeron2);
與MediaDriver一樣, Aeron實例是AutoCloseable.這意味著我們可以用 try-with-resources 模式包裝它,以確保我們正確關閉它。
5. 發送和接收訊息
現在我們已經有了 Aeron API 用戶端,我們準備好使用它來發送和接收訊息。
5.1.緩衝器
Aeron 將所有訊息(傳送和接收)表示為DirectBuffer實例。最終,這些只不過是一組字節,但它們為我們提供了一組使用一組標準類型的方法。
當我們發送訊息時,我們需要根據自己的資料自行建構緩衝區。為此,我們最好使用UnsafeBuffer實例 - 之所以命名是因為它使用sun.misc.Unsafe從底層緩衝區讀取和寫入值。建立它需要一個位元組數組或一個ByteBuffer實例,然後我們可以使用BufferUtil.allocateDirectAligned()來幫助最有效地實現它:
UnsafeBuffer buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));
一旦我們獲得了緩衝區,我們就擁有了一系列的getXyz()和putXyz()方法,我們可以使用它們來操作緩衝區中的資料:
// Put a string into the buffer starting at index 0.
int length = buffer.putStringWithoutLengthUtf8(0, message);
// Read a string of the given length from the buffer starting from the given offset.
String message = buffer.getStringWithoutLengthUtf8(offset, length);
請注意,我們需要自己管理緩衝區中的偏移量。每當我們將資料放入緩衝區時,它都會傳回寫入資料的長度,以便我們可以計算下一個偏移量。當我們從緩衝區讀取時,我們需要知道長度是多少。
5.2.頻道和串流
使用 Aeron 發送和接收資料是使用透過特定通道傳輸的已識別流來完成的。
我們將通道指定為特定格式的 URI,告訴 Aeron 如何傳輸訊息。然後,我們的媒體驅動程式使用它與我們的傳輸媒體進行交互,確保它正確地發送和接收訊息。流被簡單地標識為數字。唯一的要求是同一通訊的兩端使用相同的流ID。
最簡單的此類通道是aeron:ipc,它使用媒體驅動程式內的共享記憶體進行傳輸和接收。請注意,只有當雙方使用相同的媒體驅動程式且不允許連網時,這才有效。
更有用的是,我們可以使用aeron:udp來使用 UDP 發送和接收。這使我們能夠在任何可以連接的地方與任何其他應用程式進行通訊。特別是,我們的應用程式將與媒體驅動程式通信,然後媒體驅動程式將相互通信:
當指定UDP通道時,我們至少需要包含主機和連接埠。在接收端,這是我們監聽的地方,在發送端,這是我們發送訊息的地方。例如, aeron:udp?endpoint=localhost:20121將透過 UDP 在localhost:20121上發送和接收訊息。
5.3.訂閱
一旦我們的媒體驅動程式和 Aeron 客戶端設定完畢,我們就可以接收訊息了。我們透過在特定通道上建立對特定流的訂閱,然後輪詢訊息來實現此目的。
新增訂閱足以讓媒體驅動程式設定一切以便能夠接收我們的訊息。我們使用Aeron實例上的addSubscription()方法來執行此操作:
Subscription subscription = aeron.addSubscription("aeron:udp?endpoint=localhost:20121", 1001);
和以前一樣,當我們不再使用它時,我們需要關閉它,以便媒體驅動程式知道停止偵聽訊息。像往常一樣,這是AutoCloseable,因此我們可以使用 try-with-resources 來管理它。
當我們訂閱後,我們需要接收訊息。 Aeron 透過輪詢機制執行此操作,讓我們完全控制它何時處理訊息。要輪詢訊息,我們需要提供一個FragmentHandler來處理收到的訊息。如果我們想要內聯所有程式碼,我們可以使用 lambda 來實作它;如果我們想要重複使用它,我們可以將其作為實作介面的單獨類別來實作:
FragmentHandler fragmentHandler = (buffer, offset, length, header) -> {
String data = buffer.getStringWithoutLengthUtf8(offset, length);
System.out.printf("Message from session %d (%d@%d) <<%s>>%n",
header.sessionId(), length, offset, data);
};
Aeron 使用緩衝區、資料開始的偏移量以及接收到的資料的長度來呼叫此函數。然後我們可以根據應用程式的需要來處理這個緩衝區。
當我們準備好輪詢新訊息時,我們使用Subscription.poll()方法:
int fragmentsRead = subscription.poll(fragmentHandler, 10);
在這裡,我們提供了FragmentHandler實例以及嘗試接收單一訊息時要考慮的訊息片段的數量。請注意,我們一次最多會收到一條訊息,即使媒體驅動程式中有許多訊息可用。但是,如果沒有可用的訊息,則會立即返回,並且如果收到的訊息太大,我們可能只會收到其中的一部分。
5.4.刊物
我們訊息傳遞的另一面是發送訊息。我們使用Publication來實現此目的,它可以將訊息發送到特定通道上的特定流。
我們可以使用Aeron.addPublication()方法新增出版品。然後我們需要等待它連接,這要求接收端有一個訂閱準備好接收訊息:
ConcurrentPublication publication = aeron.addPublication("aeron:udp?endpoint=localhost:20121", 1001);
while (!publication.isConnected()) {
TimeUnit.MILLISECONDS.sleep(100);
}
如果沒有連接,它將立即無法發送訊息,而不是等待某人添加訂閱。
和以前一樣,當我們不再使用它時,我們需要關閉它,以便媒體驅動程式可以釋放任何分配的資源。像往常一樣,這是AutoCloseable,因此我們可以使用 try-with-resources 來管理它。
一旦我們有了關聯的出版物,我們就可以向它提供訊息。這些始終作為填充的緩衝區提供,然後將其發送到連接的訂閱者:
UnsafeBuffer buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));
buffer.putStringWithoutLengthUtf8(0, message);
long result = publication.offer(buffer, 0, message.length());
如果訊息已傳送,我們將傳回一個指示傳輸位元組數的值,如果緩衝區太大,該值可能小於我們預期發送的位元組數。或者,它可能會向我們返回一組錯誤代碼中的一個,所有這些錯誤代碼都是負數,因此很容易與成功案例區分開來:
-
Publication.NOT_CONNECTED– 發布未連接到訂閱者。 -
Publication.BACK_PRESSURED– 來自訂閱者的背壓意味著我們現在無法發送更多訊息。 -
Publication.ADMIN_ACTION– 某些管理操作(例如日誌輪替)導致傳送失敗。在這種情況下,立即重試通常是安全的。 -
Publication.CLOSED–Publication實例已關閉。 -
Publication.MAX_POSITION_EXCEEDED– 媒體驅動程式內的緩衝區已滿。通常,我們可以透過關閉Publication並建立新出版物來解決此問題。
六,結論
我們已經了解了 Aeron、如何設定它以及如何使用它在應用程式之間進行訊息傳遞。這個庫可以做更多的事情,所以為什麼不嘗試看看呢?
所有範例都可以在 GitHub 上取得。