更新時(shí)間:2025-05-09 09:13:12作者:佚名
翻譯:費(fèi)隆
協(xié)議:CC BY-NC-SA 4.0
歡迎來到我的Java8并發(fā)教程的第二部分。本指南將教您如何在Java 8中使用簡單易懂的代碼示例在Java 8中進(jìn)行編程。這是一系列教程的第二部分。在接下來的15分鐘內(nèi),您將學(xué)習(xí)如何通過同步關(guān)鍵字,鎖和信號(hào)量同步訪問共享變量。
本文中顯示的中心概念也適用于Java的較舊版本,但是代碼示例適用于Java 8permit是什么意思?怎么讀,并嚴(yán)重依賴Lambda表達(dá)式和新的并發(fā)功能。如果您還不熟悉Lambda,我建議您先閱讀我的Java 8教程。
為簡單起見,本教程的代碼示例使用此處定義的兩個(gè)輔助功能睡眠(秒)和停止(執(zhí)行程序)。
同步
在上一章中,我們學(xué)會(huì)了如何通過執(zhí)行器服務(wù)同時(shí)執(zhí)行代碼。當(dāng)我們編寫這種多線程代碼時(shí),我們需要特別注意同時(shí)訪問共享變量。假設(shè)我們打算增加一個(gè)可以通過多個(gè)線程同時(shí)訪問的整數(shù)。
我們使用rezement()方法定義計(jì)數(shù)字段以添加計(jì)數(shù):
int count = 0;
void increment() {
? ?count = count + 1;
}
當(dāng)多個(gè)線程同時(shí)調(diào)用此方法時(shí),我們將遇到大麻煩:
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10000)
? ?.forEach(i -> executor.submit(this::increment));
stop(executor);
System.out.println(count); ?// 9965
我們看不到計(jì)數(shù)為10000的結(jié)果,并且每次執(zhí)行上述代碼的實(shí)際結(jié)果都不同。原因是我們?cè)诓煌木€程上共享可變變量,并且沒有用于可變?cè)L問的同步機(jī)制,從而創(chuàng)造了種族條件。
添加一個(gè)值需要三個(gè)步驟:(1)讀取當(dāng)前值,(2)將此值添加到一個(gè),(3)將新值寫入變量。如果兩個(gè)線程同時(shí)執(zhí)行,則有可能同時(shí)執(zhí)行兩個(gè)線程,并且將讀取相同的當(dāng)前值。這將導(dǎo)致寫作無效,因此實(shí)際結(jié)果將很小。在上面的示例中,異步對(duì)計(jì)數(shù)的并發(fā)訪問丟失了35個(gè)增量操作,但是在自己執(zhí)行代碼時(shí),您會(huì)看到不同的結(jié)果。
幸運(yùn)的是,Java很久以前就支持了與同步關(guān)鍵字的線程同步。在增加計(jì)數(shù)時(shí),我們可以使用同步固定上述比賽條件。
synchronized void incrementSync() {
? ?count = count + 1;
}
當(dāng)我們同時(shí)調(diào)用regrementSync()時(shí),我們獲得了10000的預(yù)期結(jié)果。不再次出現(xiàn)比賽條件,并且在每個(gè)代碼執(zhí)行中的結(jié)果穩(wěn)定:
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10000)
? ?.forEach(i -> executor.submit(this::incrementSync));
stop(executor);
System.out.println(count); ?// 10000
同步關(guān)鍵字也可以在語句塊中使用:
void incrementSync() {
? ?synchronized (this) {
? ? ? ?count = count + 1;
? ?}
}
Java在內(nèi)部使用所謂的“監(jiān)視器”(也稱為顯示器鎖或固有鎖)來管理同步。監(jiān)視器綁定到對(duì)象,例如,當(dāng)使用同步方法時(shí),每個(gè)方法都會(huì)為相應(yīng)的對(duì)象共享同一監(jiān)視器。
所有隱式監(jiān)視器都實(shí)現(xiàn)了重進(jìn)入功能。重新進(jìn)入意味著鎖定與當(dāng)前線綁定。線程可以安全地獲取相同的鎖多次,而無需創(chuàng)建僵局(例如,同步方法調(diào)用同一對(duì)象的另一種同步方法)。
鎖
并發(fā)API支持各種顯式鎖,由鎖定接口指定這些鎖以替換同步隱式鎖。鎖支持多種細(xì)粒度控制方法,因此它們比隱式監(jiān)視器具有更多的開銷。
標(biāo)準(zhǔn)JDK中提供了一些鎖的實(shí)現(xiàn),并在以下各章中顯示。
重新進(jìn)入
重新輸入鎖類是一種靜音類,其行為與通過同步但功能擴(kuò)展的隱式監(jiān)視器相同。就像其名稱一樣,此鎖會(huì)像隱式監(jiān)視器一樣實(shí)現(xiàn)重新進(jìn)入功能。
使用Reentrantlock后,讓我們看一下上面的示例。
ReentrantLock lock = new ReentrantLock();
int count = 0;
void increment() {
? ?lock.lock();
? ?try {
? ? ? ?count++;
? ?} finally {
? ? ? ?lock.unlock();
? ?}
}
可以通過鎖()獲得鎖,并通過unlock()釋放。將您的代碼包裹在一個(gè)嘗試的障礙物中以確保在特殊情況下解鎖,這一點(diǎn)非常重要。此方法是線程安全的,就像同步復(fù)制品一樣。如果另一個(gè)線程已經(jīng)收到鎖,則再次調(diào)用鎖()將阻止當(dāng)前線程,直到鎖定鎖定為止。只有一個(gè)線程可以在任何給定時(shí)間內(nèi)獲取鎖。
鎖定到顆??丶С侄喾N方法,如以下示例:
executor.submit(() -> {
? ?lock.lock();
? ?try {
? ? ? ?sleep(1);
? ?} finally {
? ? ? ?lock.unlock();
? ?}
});
executor.submit(() -> {
? ?System.out.println("Locked: " + lock.isLocked());
? ?System.out.println("Held by me: " + lock.isHeldByCurrentThread());
? ?boolean locked = lock.tryLock();
? ?System.out.println("Lock acquired: " + locked);
});
stop(executor);
第一個(gè)任務(wù)獲得鎖后的一秒鐘,第二個(gè)任務(wù)獲取了有關(guān)鎖當(dāng)前狀態(tài)的不同信息。
Locked: true
Held by me: false
Lock acquired: false
Trylock()方法是鎖定()方法的替代方法,該方法試圖在不阻止當(dāng)前線程的情況下固定鎖定。在訪問任何共享的可突變變量之前,必須使用布爾結(jié)果來檢查是否已獲取鎖定。
ReadWritelock
ReadWritelock接口指定了另一種類型的鎖定,包括一對(duì)鎖定鎖,用于讀寫訪問。讀寫鎖的想法是,只要沒有線程編寫變量,同時(shí)讀取可變變量通常是安全的。因此,只要沒有螺紋固定寫鎖定,就可以同時(shí)由多個(gè)線程保存讀取鎖。這可以改善性能和吞吐量,因?yàn)樽x取比寫作更頻繁。
ExecutorService executor = Executors.newFixedThreadPool(2);
Map map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();
executor.submit(() -> {
? ?lock.writeLock().lock();
? ?try {
? ? ? ?sleep(1);
? ? ? ?map.put("foo", "bar");
? ?} finally {
? ? ? ?lock.writeLock().unlock();
? ?}
});
暫停一秒鐘后,上面的示例首先獲取寫鎖以在地圖上添加新值。在完成此任務(wù)之前,啟動(dòng)了另外兩個(gè)任務(wù),試圖閱讀地圖中的元素并暫停一秒鐘:
Runnable readTask = () -> {
? ?lock.readLock().lock();
? ?try {
? ? ? ?System.out.println(map.get("foo"));
? ? ? ?sleep(1);
? ?} finally {
? ? ? ?lock.readLock().unlock();
? ?}
};
executor.submit(readTask);
executor.submit(readTask);
stop(executor);
執(zhí)行此代碼示例時(shí)permit是什么意思?怎么讀,您會(huì)注意到兩個(gè)讀取任務(wù)需要等待寫任務(wù)完成。寫入鎖定后,將同時(shí)執(zhí)行兩個(gè)讀取任務(wù),并同時(shí)打印結(jié)果。他們不需要等待彼此完成,因?yàn)橹灰獩]有其他線程獲得寫鎖,就可以同步獲得讀取鎖。
Stampedlock
Java 8帶有一個(gè)名為Stampedlock的新鎖,它也支持讀寫鎖,就像上面的示例一樣。與ReadWritelock不同,Stampedlock的鎖定方法返回表示為長的標(biāo)記。您可以使用這些標(biāo)記釋放鎖定,或檢查鎖是否有效。此外,Stampedlock支持另一種稱為樂觀鎖定的模式。
讓我們使用Stampedlock而不是ReadWritelock重寫上面的示例:
ExecutorService executor = Executors.newFixedThreadPool(2);
Map map = new HashMap<>();
StampedLock lock = new StampedLock();
executor.submit(() -> {
? ?long stamp = lock.writeLock();
? ?try {
? ? ? ?sleep(1);
? ? ? ?map.put("foo", "bar");
? ?} finally {
? ? ? ?lock.unlockWrite(stamp);
? ?}
});
Runnable readTask = () -> {
? ?long stamp = lock.readLock();
? ?try {
? ? ? ?System.out.println(map.get("foo"));
? ? ? ?sleep(1);
? ?} finally {
? ? ? ?lock.unlockRead(stamp);
? ?}
};
executor.submit(readTask);
executor.submit(readTask);
stop(executor);
通過ReadLock()或Writelock()獲取讀取鎖或?qū)戞i定的標(biāo)簽,該標(biāo)簽可在以后在最后塊中解鎖。請(qǐng)記住,Stampedlock不會(huì)實(shí)現(xiàn)重新進(jìn)入功能。每個(gè)鎖定的呼叫都會(huì)返回一個(gè)新標(biāo)簽,并在沒有可用鎖定時(shí)將其阻止,即使同一線程已經(jīng)接管了鎖。因此,您需要額外的注意不要僵局。
像以前的ReadWritelock示例一樣,兩個(gè)讀取任務(wù)都需要等待發(fā)布寫鎖。然后,兩個(gè)讀取任務(wù)同時(shí)將信息打印到控制臺(tái)網(wǎng)校頭條,因?yàn)橹灰獩]有線程獲得寫鎖,多個(gè)讀取操作就不會(huì)互相阻止。
以下示例顯示了樂觀的鎖:
ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();
executor.submit(() -> {
? ?long stamp = lock.tryOptimisticRead();
? ?try {
? ? ? ?System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
? ? ? ?sleep(1);
? ? ? ?System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
? ? ? ?sleep(2);
? ? ? ?System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
? ?} finally {
? ? ? ?lock.unlock(stamp);
? ?}
});
executor.submit(() -> {
? ?long stamp = lock.writeLock();
? ?try {
? ? ? ?System.out.println("Write Lock acquired");
? ? ? ?sleep(2);
? ?} finally {
? ? ? ?lock.unlock(stamp);
? ? ? ?System.out.println("Write done");
? ?}
});
stop(executor);
通過調(diào)用TryOptimisticRead()獲得樂觀的讀取鎖,該鎖總是在不阻止當(dāng)前線程的情況下返回標(biāo)簽,而不管鎖定是否實(shí)際可用。如果已收到寫鎖,則返回的標(biāo)記等于0。您需要始終檢查標(biāo)記是否有效。
執(zhí)行上述代碼將產(chǎn)生以下輸出:
Optimistic Lock Valid: true
Write Lock acquired
Optimistic Lock Valid: false
Write done
Optimistic Lock Valid: false
樂觀的鎖僅在獲得鎖后是有效的。與普通讀取鎖不同,樂觀的鎖不會(huì)阻止其他線程同時(shí)獲得寫鎖。在第一個(gè)線程暫停一秒鐘后,第二個(gè)線程在不等待釋放樂觀的讀鎖的情況下獲取寫鎖。目前,樂觀的讀鎖不再有效。即使釋放寫鎖,樂觀的讀取鎖仍然處于無效狀態(tài)。
因此,使用樂觀的鎖時(shí),您需要在訪問任何共享變量后每次檢查鎖定,以確保讀取鎖定仍然有效。
有時(shí),將讀取鎖轉(zhuǎn)換為寫入鎖定是非常實(shí)用的,而無需重新解鎖和鎖定。為此目的,StampedLock提供了TryConvertTowritelock()方法,如以下目的:
ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();
executor.submit(() -> {
? ?long stamp = lock.readLock();
? ?try {
? ? ? ?if (count == 0) {
? ? ? ? ? ?stamp = lock.tryConvertToWriteLock(stamp);
? ? ? ? ? ?if (stamp == 0L) {
? ? ? ? ? ? ? ?System.out.println("Could not convert to write lock");
? ? ? ? ? ? ? ?stamp = lock.writeLock();
? ? ? ? ? ?}
? ? ? ? ? ?count = 23;
? ? ? ?}
? ? ? ?System.out.println(count);
? ?} finally {
? ? ? ?lock.unlock(stamp);
? ?}
});
stop(executor);
第一個(gè)任務(wù)將獲取讀取鎖,并將計(jì)數(shù)字段的當(dāng)前值打印到控制臺(tái)。但是,如果當(dāng)前值為零,我們希望將其分配給23。我們首先需要將讀取鎖轉(zhuǎn)換為寫入鎖定,以避免從其他線程中破壞潛在的并發(fā)訪問。對(duì)TryConvertTowriteLock()的調(diào)用不會(huì)阻止,但可能會(huì)返回零標(biāo)記,表明當(dāng)前沒有寫鎖。在這種情況下,我們調(diào)用Writelock()阻止當(dāng)前線程,直到有一個(gè)可用的寫鎖。
信號(hào)
除鎖外,并發(fā)API還支持計(jì)數(shù)信號(hào)量。但是,鎖通常用于互斥變量或資源的互斥訪問,信號(hào)量可以維護(hù)整體訪問權(quán)限。這在某些不同的情況下非常有用,例如,當(dāng)您需要限制程序的一部分并發(fā)訪問總數(shù)時(shí)。
這是一個(gè)示例,演示了如何限制通過睡眠模擬的長期運(yùn)行任務(wù)的訪問(5):
ExecutorService executor = Executors.newFixedThreadPool(10);
Semaphore semaphore = new Semaphore(5);
Runnable longRunningTask = () -> {
? ?boolean permit = false;
? ?try {
? ? ? ?permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
? ? ? ?if (permit) {
? ? ? ? ? ?System.out.println("Semaphore acquired");
? ? ? ? ? ?sleep(5);
? ? ? ?} else {
? ? ? ? ? ?System.out.println("Could not acquire semaphore");
? ? ? ?}
? ?} catch (InterruptedException e) {
? ? ? ?throw new IllegalStateException(e);
? ?} finally {
? ? ? ?if (permit) {
? ? ? ? ? ?semaphore.release();
? ? ? ?}
? ?}
}
IntStream.range(0, 10)
? ?.forEach(i -> executor.submit(longRunningTask));
stop(executor);
執(zhí)行人可以同時(shí)運(yùn)行10個(gè)任務(wù),但是我們使用Size 5的信號(hào)量,因此我們將同時(shí)訪問5限制為5。在特殊情況下,使用try-Finally代碼塊合理地釋放信號(hào)量很重要。
執(zhí)行上述代碼會(huì)產(chǎn)生以下結(jié)果:
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
信號(hào)量限制了對(duì)長達(dá)5個(gè)線程模擬(5)模擬的長期運(yùn)行任務(wù)的訪問。隨后的每個(gè)TryAcquire()調(diào)用將打印結(jié)果,該結(jié)果無法在等待一秒鐘的等待時(shí)間之后獲得控制臺(tái)的信號(hào)量。
這是我系列并發(fā)教程的第二部分。將來會(huì)發(fā)布更多零件,因此請(qǐng)等待。和以前一樣,您可以在GitHub上找到此文檔的所有示例代碼,因此請(qǐng)隨時(shí)訂購此存儲(chǔ)庫并自己嘗試。