在 JVM 之間共享記憶體
一、簡介
在本教程中,我們將展示如何在同一台電腦上運行的兩個或多個 JVM 之間共享記憶體。此功能可以實現非常快速的進程間通信,因為我們可以在沒有任何 I/O 操作的情況下移動資料塊。
2. 共享記憶體如何運作?
在任何現代作業系統中運行的進程都會獲得所謂的虛擬記憶體空間。我們稱之為virtual
,因為雖然它看起來像一個大的、連續的、私有的可尋址記憶體空間,但實際上,它是由分佈在整個實體 RAM 上的pages
組成的。在這裡, page
只是作業系統術語,指的是連續記憶體區塊,其大小範圍取決於所使用的特定 CPU 架構。對於 x86-84,頁面可以小至 4KB,也可以大至 1 GB。
在給定時間,只有一小部分虛擬空間實際上映射到實體頁。隨著時間的推移,進程開始為其任務消耗更多內存,作業系統開始分配更多物理頁並將它們映射到虛擬空間。當記憶體需求超過實體可用記憶體時,作業系統將開始將當時未使用的頁面交換到輔助存儲,以為請求騰出空間。
共享記憶體區塊的行為就像常規記憶體一樣,但是與常規記憶體相反,它不是單一進程私有的。當進程更改此區塊內任何位元組的內容時,有權存取相同共享記憶體的任何其他進程都會立即「看到」此變更。
這是共享記憶體的常見用途清單:
- 調試器(有沒有想過調試器如何檢查另一個進程中的變數?)
- 進程間通訊
- 進程之間共享唯讀內容(例如:動態庫程式碼)
- 各種各樣的黑客;^)
3. 共享記憶體和記憶體映射文件
顧名思義,記憶體映射文件是一種常規文件,其內容直接映射到進程虛擬記憶體中的連續區域。這意味著我們可以讀取和/或更改其內容,而無需明確使用 I/O 操作。作業系統將偵測對映射區域的任何寫入,並安排後台 I/O 操作來保存修改的資料。
由於無法保證此後台操作何時發生,因此作業系統還提供系統呼叫來刷新任何掛起的變更。這對於資料庫重做日誌等用例很重要,但對於我們的進程間通訊(簡稱 IPC)場景來說不需要。
記憶體映射檔案通常被資料庫伺服器用來實現高吞吐量的 I/O 操作,但我們也可以使用它們來引導基於共享記憶體的 IPC 機制。基本想法是,所有需要共享資料的進程都映射同一個文件,瞧,它們現在擁有一個共享記憶體區域。
4. 在 Java 中建立記憶體映射文件
在 Java 中,我們使用FileChannel
的map()
方法將檔案的某個區域映射到記憶體中,該方法傳回一個MappedByteBuffer
,讓我們可以存取其內容:
MappedByteBuffer createSharedMemory(String path, long size) {
try (FileChannel fc = (FileChannel)Files.newByteChannel(new File(path).toPath(),
EnumSet.of(
StandardOpenOption.CREATE,
StandardOpenOption.SPARSE,
StandardOpenOption.WRITE,
StandardOpenOption.READ))) {
return fc.map(FileChannel.MapMode.READ_WRITE, 0, size);
}
catch( IOException ioe) {
throw new RuntimeException(ioe);
}
}
這裡SPARSE
選項的使用非常相關。只要底層作業系統和檔案系統支持,我們就可以映射相當大的記憶體區域,而無需實際消耗磁碟空間。
現在,讓我們建立一個簡單的演示應用程式。 Producer
應用程式將分配足夠大的共享記憶體來容納 64KB 資料加上 SHA1 雜湊值(20 個位元組)。接下來,它將啟動一個循環,用隨機資料填充緩衝區,然後是 SHA1 雜湊值。我們將連續重複此操作 30 秒,然後退出:
// ... SHA1 digest initialization omitted
MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
Random rnd = new Random();
long start = System.currentTimeMillis();
long iterations = 0;
int capacity = shm.capacity();
System.out.println("Starting producer iterations...");
while(System.currentTimeMillis() - start < 30000) {
for (int i = 0; i < capacity - hashLen; i++) {
byte value = (byte) (rnd.nextInt(256) & 0x00ff);
digest.update(value);
shm.put(i, value);
}
// Write hash at the end
byte[] hash = digest.digest();
shm.put(capacity - hashLen, hash);
iterations++;
}
System.out.printf("%d iterations run\n", iterations);
為了測試我們確實可以共享內存,我們還將創建一個Consumer
應用程序,它將讀取緩衝區的內容,計算其哈希值,並將其與Producer
生成的哈希值進行比較。我們將重複此過程 30 秒。在每次迭代時,還將計算緩衝區內容的雜湊值並將其與緩衝區末端的雜湊值進行比較:
// ... digest initialization omitted
MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
long start = System.currentTimeMillis();
long iterations = 0;
int capacity = shm.capacity();
System.out.println("Starting consumer iterations...");
long matchCount = 0;
long mismatchCount = 0;
byte[] expectedHash = new byte[hashLen];
while (System.currentTimeMillis() - start < 30000) {
for (int i = 0; i < capacity - 20; i++) {
byte value = shm.get(i);
digest.update(value);
}
byte[] hash = digest.digest();
shm.get(capacity - hashLen, expectedHash);
if (Arrays.equals(hash, expectedHash)) {
matchCount++;
} else {
mismatchCount++;
}
iterations++;
}
System.out.printf("%d iterations run. matches=%d, mismatches=%d\n", iterations, matchCount, mismatchCount);
為了測試我們的記憶體共享方案,讓我們同時啟動兩個程式。這是它們在 3Ghz、四核心 Intel I7 機器上運行時的輸出:
# Producer output
Starting producer iterations...
11722 iterations run
# Consumer output
Starting consumer iterations...
18893 iterations run. matches=11714, mismatches=7179
我們可以看到,在許多情況下,消費者發現預期的計算值是不同的。歡迎來到並發問題的奇妙世界!
5. 同步共享記憶體訪問
我們所看到的問題的根本原因是我們需要同步對共享記憶體緩衝區的存取。 Consumer
必須等待Producer
完成哈希寫入才能開始讀取資料。另一方面, Producer
也必須等待Consumer
消費完資料後才能再次寫入。
對於常規的多執行緒應用程式來說,解決這個問題沒什麼大不了的。標準庫提供了幾個同步原語,使我們能夠控制在給定時間誰可以寫入共享記憶體。
然而,我們的場景是多 JVM 場景,因此這些標準方法都不適用。那麼,我們該做什麼呢?嗯,簡短的回答是我們必須作弊。我們可以訴諸作業系統特定的機制,例如信號量,但這會阻礙我們應用程式的可移植性。此外,這意味著使用 JNI 或 JNA,這也會使事情變得複雜。
輸入Unsafe
。儘管它的名字有點可怕,但這個標準庫類別提供了我們實現簡單鎖定機制所需的功能: compareAndSwapInt()
方法。
該方法實現了一個帶有四個參數的原子測試和設定原語。儘管文件中沒有明確說明,但它不僅可以針對 Java 對象,還可以針對原始記憶體位址。對於後者,我們在第一個參數中傳遞null
,這使得它將offset
參數視為虛擬記憶體位址。
當我們呼叫這個方法時,它會先檢查目標位址處的值,並將其與expected
進行比較。如果相等,則將位置內容修改為新值並傳回true
表示成功。如果該位置的值與expected
不同,則不會發生任何情況,且該方法傳回false.
更重要的是,這種原子操作即使在多核心架構中也能保證工作,這是同步多個執行緒的關鍵功能。
讓我們建立一個SpinLock
類,利用此方法來實現(非常!)簡單的鎖定機制:
//... package and imports omitted
public class SpinLock {
private static final Unsafe unsafe;
// ... unsafe initialization omitted
private final long addr;
public SpinLock(long addr) {
this.addr = addr;
}
public boolean tryLock(long maxWait) {
long deadline = System.currentTimeMillis() + maxWait;
while (System.currentTimeMillis() < deadline ) {
if (unsafe.compareAndSwapInt(null, addr, 0, 1)) {
return true;
}
}
return false;
}
public void unlock() {
unsafe.putInt(addr, 0);
}
}
此實作缺乏關鍵功能,例如在釋放鎖之前檢查它是否擁有鎖,但它足以滿足我們的目的。
好的,那麼我們如何取得用於儲存鎖定狀態的記憶體位址呢?這必須是共享記憶體緩衝區內的位址,以便兩個進程都可以使用它,但MappedByteBuffer
類別不會公開實際的記憶體位址。
檢查map()
回傳的對象,我們可以看到它是一個DirectByteBuffer.
這個類別有一個名為address()
的公共方法,它回傳的正是我們想要的。不幸的是,這個類別是包私有的,所以我們不能使用簡單的轉換來存取這個方法。
為了繞過這個限制,我們將再次作弊並使用反射來呼叫此方法:
private static long getBufferAddress(MappedByteBuffer shm) {
try {
Class<?> cls = shm.getClass();
Method maddr = cls.getMethod("address");
maddr.setAccessible(true);
Long addr = (Long) maddr.invoke(shm);
if (addr == null) {
throw new RuntimeException("Unable to retrieve buffer's address");
}
return addr;
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
throw new RuntimeException(ex);
}
}
在這裡,我們使用setAccessible()
使address()
方法可透過Method
句柄呼叫。但是,請注意,從 Java 17 開始,除非我們明確使用運行時–add-opens
標誌,否則此技術將無法運作。
6. 為Producer
和Consumer
添加同步
現在我們有了鎖機制,我們先將它應用到Producer
上。出於本演示的目的,我們假設Producer
始終在Consumer.
我們需要這個,以便我們可以初始化緩衝區,清除其內容,包括我們將與SpinLock:
public static void main(String[] args) throws Exception {
// ... digest initialization omitted
MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
// Cleanup lock area
shm.putInt(0, 0);
long addr = getBufferAddress(shm);
System.out.println("Starting producer iterations...");
long start = System.currentTimeMillis();
long iterations = 0;
Random rnd = new Random();
int capacity = shm.capacity();
SpinLock lock = new SpinLock(addr);
while(System.currentTimeMillis() - start < 30000) {
if (!lock.tryLock(5000)) {
throw new RuntimeException("Unable to acquire lock");
}
try {
// Skip the first 4 bytes, as they're used by the lock
for (int i = 4; i < capacity - hashLen; i++) {
byte value = (byte) (rnd.nextInt(256) & 0x00ff);
digest.update(value);
shm.put(i, value);
}
// Write hash at the end
byte[] hash = digest.digest();
shm.put(capacity - hashLen, hash);
iterations++;
}
finally {
lock.unlock();
}
}
System.out.printf("%d iterations run\n", iterations);
}
與非同步版本相比,只有細微的變化:
- 檢索與
MappedByteBufer
關聯的記憶體位址 - 使用此位址建立
SpinLock
實例。鎖定使用int
,因此它將佔用緩衝區的四個初始位元組 - 使用
SpinLock
實例來保護以隨機資料及其雜湊填充緩衝區的程式碼
現在,讓我們對Consumer
端套用類似的變更:
private static void main(String args[]) throws Exception {
// ... digest initialization omitted
MappedByteBuffer shm = createSharedMemory("some_path.dat", 64*1024 + 20);
long addr = getBufferAddress(shm);
System.out.println("Starting consumer iterations...");
Random rnd = new Random();
long start = System.currentTimeMillis();
long iterations = 0;
int capacity = shm.capacity();
long matchCount = 0;
long mismatchCount = 0;
byte[] expectedHash = new byte[hashLen];
SpinLock lock = new SpinLock(addr);
while (System.currentTimeMillis() - start < 30000) {
if (!lock.tryLock(5000)) {
throw new RuntimeException("Unable to acquire lock");
}
try {
for (int i = 4; i < capacity - hashLen; i++) {
byte value = shm.get(i);
digest.update(value);
}
byte[] hash = digest.digest();
shm.get(capacity - hashLen, expectedHash);
if (Arrays.equals(hash, expectedHash)) {
matchCount++;
} else {
mismatchCount++;
}
iterations++;
} finally {
lock.unlock();
}
}
System.out.printf("%d iterations run. matches=%d, mismatches=%d\n", iterations, matchCount, mismatchCount);
}
透過這些更改,我們現在可以運行兩側並將它們與先前的結果進行比較:
# Producer output
Starting producer iterations...
8543 iterations run
# Consumer output
Starting consumer iterations...
8607 iterations run. matches=8607, mismatches=0
正如預期的那樣,報告的迭代計數將低於非同步版本。主要原因是我們大部分時間都在持有鎖的代碼的臨界區內度過。無論哪個程式持有鎖都會阻止另一方執行任何操作。
如果我們比較第一個案例報告的平均迭代次數,它將與我們這次獲得的迭代總和大致相同。這說明鎖機製本身增加的開銷是最小的。
六,結論
在本教程中,我們探討如何在同一台電腦上執行的兩個 JVM 之間共用記憶體區域。我們可以使用此處介紹的技術作為高吞吐量、低延遲進程間通訊庫的基礎。
與往常一樣,所有程式碼都可以在 GitHub 上取得。