哦哇資訊網

Java併發程式設計的藝術——Phaser的執行機制以及原始碼剖析

由 一個即將退役的碼農 發表于 美食2023-02-07

帶著BAT大廠的面試問題去理解Phaser工具

請帶著這些問題繼續後文,會很大程度上幫助你更好的理解Phaser工具。

Phaser主要用來解決什麼問題?

Phaser與CyclicBarrier和CountDownLatch的區別是什麼?

如果用CountDownLatch來實現Phaser的功能應該怎麼實現?

Phaser執行機制是什麼樣的?

給一個Phaser使用的示例?

Phaser執行機制

Registration(註冊)

跟其他barrier不同,在phaser上註冊的parties會隨著時間的變化而變化。任務可以隨時註冊(使用方法register,bulkRegister註冊,或者由構造器確定初始parties),並且在任何抵達點可以隨意地撤銷註冊(方法arriveAndDeregister)。就像大多數基本的同步結構一樣,註冊和撤銷隻影響內部count;不會建立更深的內部記錄,所以任務不能查詢他們是否已經註冊。(不過,可以透過繼承來實現類似的記錄)

Synchronization(同步機制)

和CyclicBarrier一樣,Phaser也可以重複await。方法arriveAndAwaitAdvance的效果類似CyclicBarrier。await。phaser的每一代都有一個相關的phase number,初始值為0,當所有註冊的任務都到達phaser時phase+1,到達最大值(Integer。MAX_VALUE)之後清零。使用phase number可以獨立控制 到達phaser 和 等待其他執行緒 的動作,透過下面兩種型別的方法:

Arrival(到達機制)

arrive和arriveAndDeregister方法記錄到達狀態。這些方法不會阻塞,但是會返回一個相關的arrival phase number;也就是說,phase number用來確定到達狀態。當所有任務都到達給定phase時,可以執行一個可選的函式,這個函式透過重寫onAdvance方法實現,通常可以用來控制終止狀態。重寫此方法類似於為CyclicBarrier提供一個barrierAction,但比它更靈活。

Waiting(等待機制)

awaitAdvance方法需要一個表示arrival phase number的引數,並且在phaser前進到與給定phase不同的phase時返回。和CyclicBarrier不同,即使等待執行緒已經被中斷,awaitAdvance方法也會一直等待。中斷狀態和超時時間同樣可用,但是當任務等待中斷或超時後未改變phaser的狀態時會遭遇異常。如果有必要,在方法forceTermination之後可以執行這些異常的相關的handler進行恢復操作,Phaser也可能被ForkJoinPool中的任務使用,這樣在其他任務阻塞等待一個phase時可以保證足夠的並行度來執行任務。

Termination(終止機制)

可以用isTerminated方法檢查phaser的終止狀態。在終止時,所有同步方法立刻返回一個負值。在終止時嘗試註冊也沒有效果。當呼叫onAdvance返回true時Termination被觸發。當deregistration操作使已註冊的parties變為0時,onAdvance的預設實現就會返回true。也可以重寫onAdvance方法來定義終止動作。forceTermination方法也可以釋放等待執行緒並且允許它們終止。

Tiering(分層結構)

Phaser支援分層結構(樹狀構造)來減少競爭。註冊了大量parties的Phaser可能會因為同步競爭消耗很高的成本, 因此可以設定一些子Phaser來共享一個通用的parent。這樣的話即使每個操作消耗了更多的開銷,但是會提高整體吞吐量。 在一個分層結構的phaser裡,子節點phaser的註冊和取消註冊都透過父節點管理。子節點phaser透過構造或方法register、bulkRegister進行首次註冊時,在其父節點上註冊。子節點phaser透過呼叫arriveAndDeregister進行最後一次取消註冊時,也在其父節點上取消註冊。

Monitoring(狀態監控)

由於同步方法可能只被已註冊的parties呼叫,所以phaser的當前狀態也可能被任何呼叫者監控。在任何時候,可以透過getRegisteredParties獲取parties數,其中getArrivedParties方法返回已經到達當前phase的parties數。當剩餘的parties(透過方法getUnarrivedParties獲取)到達時,phase進入下一代。這些方法返回的值可能只表示短暫的狀態,所以一般來說在同步結構裡並沒有啥卵用。

Phaser原始碼詳解

核心引數

private volatile long state;/** * The parent of this phaser, or null if none */private final Phaser parent;/** * The root of phaser tree。 Equals this if not in a tree。 */private final Phaser root;//等待執行緒的棧頂元素,根據phase取模定義為一個奇數header和一個偶數headerprivate final AtomicReference evenQ;private final AtomicReference oddQ; @pdai: 程式碼已經複製到剪貼簿

state狀態說明:

Phaser使用一個long型state值來標識內部狀態:

低0-15位表示未到達parties數;

中16-31位表示等待的parties數;

中32-62位表示phase當前代;

高63位表示當前phaser的終止狀態。

注意: 子Phaser的phase在沒有被真正使用之前,允許滯後於它的root節點。這裡在後面原始碼分析的reconcileState方法裡會講解。 Qnode是Phaser定義的內部等待佇列,用於在阻塞時記錄等待執行緒及相關資訊。實現了ForkJoinPool的一個內部介面ManagedBlocker,上面已經說過,Phaser也可能被ForkJoinPool中的任務使用,這樣在其他任務阻塞等待一個phase時可以保證足夠的並行度來執行任務(透過內部實現方法isReleasable和block)。

函式列表

//構造方法public Phaser() { this(null, 0);}public Phaser(int parties) { this(null, parties);}public Phaser(Phaser parent) { this(parent, 0);}public Phaser(Phaser parent, int parties)//註冊一個新的partypublic int register()//批次註冊public int bulkRegister(int parties)//使當前執行緒到達phaser,不等待其他任務到達。返回arrival phase numberpublic int arrive() //使當前執行緒到達phaser並撤銷註冊,返回arrival phase numberpublic int arriveAndDeregister()/* * 使當前執行緒到達phaser並等待其他任務到達,等價於awaitAdvance(arrive())。 * 如果需要等待中斷或超時,可以使用awaitAdvance方法完成一個類似的構造。 * 如果需要在到達後取消註冊,可以使用awaitAdvance(arriveAndDeregister())。 */public int arriveAndAwaitAdvance()//等待給定phase數,返回下一個 arrival phase numberpublic int awaitAdvance(int phase)//阻塞等待,直到phase前進到下一代,返回下一代的phase numberpublic int awaitAdvance(int phase) //響應中斷版awaitAdvancepublic int awaitAdvanceInterruptibly(int phase) throws InterruptedExceptionpublic int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException//使當前phaser進入終止狀態,已註冊的parties不受影響,如果是分層結構,則終止所有phaserpublic void forceTermination()

方法 - register()

//註冊一個新的partypublic int register() { return doRegister(1);}private int doRegister(int registrations) { // adjustment to state long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this。parent; int phase; for (;;) { long s = (parent == null) ? state : reconcileState(); int counts = (int)s; int parties = counts >>> PARTIES_SHIFT;//獲取已註冊parties數 int unarrived = counts & UNARRIVED_MASK;//未到達數 if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); phase = (int)(s >>> PHASE_SHIFT);//獲取當前代 if (phase < 0) break; if (counts != EMPTY) { // not 1st registration if (parent == null || reconcileState() == s) { if (unarrived == 0) // wait out advance root。internalAwaitAdvance(phase, null);//等待其他任務到達 else if (UNSAFE。compareAndSwapLong(this, stateOffset, s, s + adjust))//更新註冊的parties數 break; } } else if (parent == null) { // 1st root registration long next = ((long)phase << PHASE_SHIFT) | adjust; if (UNSAFE。compareAndSwapLong(this, stateOffset, s, next))//更新phase break; } else { //分層結構,子phaser首次註冊用父節點管理 synchronized (this) { // 1st sub registration if (state == s) { // recheck under lock phase = parent。doRegister(1);//分層結構,使用父節點註冊 if (phase < 0) break; // finish registration whenever parent registration // succeeded, even when racing with termination, // since these are part of the same “transaction”。 //由於在同一個事務裡,即使phaser已終止,也會完成註冊 while (!UNSAFE。compareAndSwapLong (this, stateOffset, s, ((long)phase << PHASE_SHIFT) | adjust)) {//更新phase s = state; phase = (int)(root。state >>> PHASE_SHIFT); // assert (int)s == EMPTY; } break; } } } } return phase;}

說明: register方法為phaser新增一個新的party,如果onAdvance正在執行,那麼這個方法會等待它執行結束再返回結果。如果當前phaser有父節點,並且當前phaser上沒有已註冊的party,那麼就會交給父節點註冊。

register和bulkRegister都由doRegister實現,大概流程如下:

如果當前操作不是首次註冊,那麼直接在當前phaser上更新註冊parties數

如果是首次註冊,並且當前phaser沒有父節點,說明是root節點註冊,直接更新phase

如果當前操作是首次註冊,並且當前phaser由父節點,則註冊操作交由父節點,並更新當前phaser的phase

上面說過,子Phaser的phase在沒有被真正使用之前,允許滯後於它的root節點。非首次註冊時,如果Phaser有父節點,則呼叫reconcileState()方法解決root節點的phase延遲傳遞問題, 原始碼如下:

private long reconcileState() { final Phaser root = this。root; long s = state; if (root != this) { int phase, p; // CAS to root phase with current parties, tripping unarrived while ((phase = (int)(root。state >>> PHASE_SHIFT)) != (int)(s >>> PHASE_SHIFT) && !UNSAFE。compareAndSwapLong (this, stateOffset, s, s = (((long)phase << PHASE_SHIFT) | ((phase < 0) ? (s & COUNTS_MASK) : (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY : ((s & PARTIES_MASK) | p)))))) s = state; } return s;}

當root節點的phase已經advance到下一代,但是子節點phaser還沒有,這種情況下它們必須透過更新未到達parties數 完成它們自己的advance操作(如果parties為0,重置為EMPTY狀態)。

回到register方法的第一步,如果當前未到達數為0,說明上一代phase正在進行到達操作,此時呼叫internalAwaitAdvance()方法等待其他任務完成到達操作,原始碼如下:

//阻塞等待phase到下一代private int internalAwaitAdvance(int phase, QNode node) { // assert root == this; releaseWaiters(phase-1); // ensure old queue clean boolean queued = false; // true when node is enqueued int lastUnarrived = 0; // to increase spins upon change int spins = SPINS_PER_ARRIVAL; long s; int p; while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { if (node == null) { // spinning in noninterruptible mode int unarrived = (int)s & UNARRIVED_MASK;//未到達數 if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU) spins += SPINS_PER_ARRIVAL; boolean interrupted = Thread。interrupted(); if (interrupted || ——spins < 0) { // need node to record intr //使用node記錄中斷狀態 node = new QNode(this, phase, false, false, 0L); node。wasInterrupted = interrupted; } } else if (node。isReleasable()) // done or aborted break; else if (!queued) { // push onto queue AtomicReference head = (phase & 1) == 0 ? evenQ : oddQ; QNode q = node。next = head。get(); if ((q == null || q。phase == phase) && (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq queued = head。compareAndSet(q, node); } else { try { ForkJoinPool。managedBlock(node);//阻塞給定node } catch (InterruptedException ie) { node。wasInterrupted = true; } } } if (node != null) { if (node。thread != null) node。thread = null; // avoid need for unpark() if (node。wasInterrupted && !node。interruptible) Thread。currentThread()。interrupt(); if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) return abortWait(phase); // possibly clean up on abort } releaseWaiters(phase); return p;}

簡單介紹下第二個引數node,如果不為空,則說明等待執行緒需要追蹤中斷狀態或超時狀態。以doRegister中的呼叫為例,不考慮執行緒爭用,internalAwaitAdvance大概流程如下:

首先呼叫releaseWaiters喚醒上一代所有等待執行緒,確保舊佇列中沒有遺留的等待執行緒。

迴圈SPINS_PER_ARRIVAL指定的次數或者當前執行緒被中斷,建立node記錄等待執行緒及相關資訊。

繼續迴圈呼叫ForkJoinPool。managedBlock執行被阻塞的任務

繼續迴圈,阻塞任務執行成功被釋放,跳出迴圈

最後喚醒當前phase的執行緒

方法 - arrive()

//使當前執行緒到達phaser,不等待其他任務到達。返回arrival phase numberpublic int arrive() { return doArrive(ONE_ARRIVAL);}private int doArrive(int adjust) { final Phaser root = this。root; for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; int counts = (int)s; //獲取未到達數 int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) throw new IllegalStateException(badArrive(s)); if (UNSAFE。compareAndSwapLong(this, stateOffset, s, s-=adjust)) {//更新state if (unarrived == 1) {//當前為最後一個未到達的任務 long n = s & PARTIES_MASK; // base of next state int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (root == this) { if (onAdvance(phase, nextUnarrived))//檢查是否需要終止phaser n |= TERMINATION_BIT; else if (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; int nextPhase = (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; UNSAFE。compareAndSwapLong(this, stateOffset, s, n); releaseWaiters(phase);//釋放等待phase的執行緒 } //分層結構,使用父節點管理arrive else if (nextUnarrived == 0) { //propagate deregistration phase = parent。doArrive(ONE_DEREGISTER); UNSAFE。compareAndSwapLong(this, stateOffset, s, s | EMPTY); } else phase = parent。doArrive(ONE_ARRIVAL); } return phase; } }}

說明: arrive方法手動調整到達數,使當前執行緒到達phaser。arrive和arriveAndDeregister都呼叫了doArrive實現,大概流程如下:

首先更新state(state - adjust);

如果當前不是最後一個未到達的任務,直接返回phase

如果當前是最後一個未到達的任務: 如果當前是root節點,判斷是否需要終止phaser,CAS更新phase,最後釋放等待的執行緒; 如果是分層結構,並且已經沒有下一代未到達的parties,則交由父節點處理doArrive邏輯,然後更新state為EMPTY。

方法 - arriveAndAwaitAdvance()

public int arriveAndAwaitAdvance() { // Specialization of doArrive+awaitAdvance eliminating some reads/paths final Phaser root = this。root; for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; int counts = (int)s; int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);//獲取未到達數 if (unarrived <= 0) throw new IllegalStateException(badArrive(s)); if (UNSAFE。compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) {//更新state if (unarrived > 1) return root。internalAwaitAdvance(phase, null);//阻塞等待其他任務 if (root != this) return parent。arriveAndAwaitAdvance();//子Phaser交給父節點處理 long n = s & PARTIES_MASK; // base of next state int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (onAdvance(phase, nextUnarrived))//全部到達,檢查是否可銷燬 n |= TERMINATION_BIT; else if (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; int nextPhase = (phase + 1) & MAX_PHASE;//計算下一代phase n |= (long)nextPhase << PHASE_SHIFT; if (!UNSAFE。compareAndSwapLong(this, stateOffset, s, n))//更新state return (int)(state >>> PHASE_SHIFT); // terminated releaseWaiters(phase);//釋放等待phase的執行緒 return nextPhase; } }}

說明: 使當前執行緒到達phaser並等待其他任務到達,等價於awaitAdvance(arrive())。如果需要等待中斷或超時,可以使用awaitAdvance方法完成一個類似的構造。如果需要在到達後取消註冊,可以使用awaitAdvance(arriveAndDeregister())。效果類似於CyclicBarrier。await。大概流程如下:

更新state(state - 1);

如果未到達數大於1,呼叫internalAwaitAdvance阻塞等待其他任務到達,返回當前phase

如果為分層結構,則交由父節點處理arriveAndAwaitAdvance邏輯

如果未到達數<=1,判斷phaser終止狀態,CAS更新phase到下一代,最後釋放等待當前phase的執行緒,並返回下一代phase。

方法 - awaitAdvance(int phase)

public int awaitAdvance(int phase) { final Phaser root = this。root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; if (p == phase) return root。internalAwaitAdvance(phase, null); return p;}//響應中斷版awaitAdvancepublic int awaitAdvanceInterruptibly(int phase) throws InterruptedException { final Phaser root = this。root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; if (p == phase) { QNode node = new QNode(this, phase, true, false, 0L); p = root。internalAwaitAdvance(phase, node); if (node。wasInterrupted) throw new InterruptedException(); } return p;}

說明: awaitAdvance用於阻塞等待執行緒到達,直到phase前進到下一代,返回下一代的phase number。方法很簡單,不多贅述。awaitAdvanceInterruptibly方法是響應中斷版的awaitAdvance,不同之處在於,呼叫阻塞時會記錄執行緒的中斷狀態。

TAG: phaseintPhaserroot