快捷搜索:  汽车  科技

程序并发时序图:并发控制工具相位器Phaser实现原理

程序并发时序图:并发控制工具相位器Phaser实现原理publicintarriveAndAwaitAdvance(){ for(;;){ longs=state; intphase=(int)(s>>>PHASE_SHIFT); intcounts=(int)s; intunarrived=(counts==EMPTY)?0:(counts&UNARRIVED_MASK); if(STATE.compareAndSet(this s s-=1)){ if(unarrived>1) returnthis.internalAwaitAdvance(phase); longn=s&PARTIES_MASK; intnextUnarrived=(int)n>>>PARTIES_SHIFT; if(onAdvance(phase nextUnarrived)) n|=TERMINATION_BIT; else

Phaser示意图

为方便理解Phaser的内部状态我们看下面的示意图,刚开始通过 new Phaser(3) 创建一个相位器对象,该对象有三个参与者。假如又调用了register()方法则相位器一共有4个参与者,这些参与者都处于未到达状态。然后某个线程先执行arriveAndAwaitAdvance方法后变为已到达状态,接着第二第三个参与者都变为已到达状态。假如第四个线程执行了arriveAndDeregister方法则第四个参与者到达后将进行反注册,此时只剩三个参与者。接着将进入下一阶段,相位器会自动调用onAdvance方法判断是否终止相位器,如果还未终止则进入下一阶段,此时三个参与者又处于未到达状态。

程序并发时序图:并发控制工具相位器Phaser实现原理(1)

示意图

共享状态的表示

相位器的底层实现需要对共享变量进行维护,并且需要使用硬件基本的CAS操作,相位器需要维护的共享变量包括终止状态、当前阶段数、参与者数量以及未到达数量等四个属性。这样就要求我们要保证相位器内部属性的一致性,我们当然可以定义四个变量来分别表示相位器的四个属性,但为了方便维护共享变量,可以使用一个long类型作为共享变量,这样就能很方便进行硬件基本的原子更新。所以现在的重点工作就是对64位的long类型进行划分,0-15位表示未到达参与者数量、16-31位表示参与者数量、32-62位表示当前阶段数、63位表示终止状态。

程序并发时序图:并发控制工具相位器Phaser实现原理(2)

状态表示

实现原理

通过前面的相位器相关概念的介绍以及三个例子,我们已经学会了相位器的使用,接下去我们往深一层看相位器是如何实现的。以下分析的代码出自JDK源码但却并非完全相同,作者去掉了大量的非核心(并发性能优化)代码,只保留了最核心的代码,这样能保证代码简洁,以便能使更好地理解实现原理。

我们先看Phaser类包含的一些属性以及构造方法。state变量即是我们前面讨论到的共享状态,它是一个long型,我们会使用不同的位范围来表示相位器的内部属性。而且通过VarHandle对象来管理共享状态state,通过它能方便地进行CAS操作。MAX_PHASE=Integer.MAX_VALUE表示允许的最大阶段数,PARTIES_SHIFT=16表示参与者数量偏移,PHASE_SHIFT=32表示当前阶段偏移,UNARRIVED_MASK=0xffff表示未到达掩码,PARTIES_MASK=0xffff0000L表示参与者掩码,TERMINATION_BIT=1L<<63 表示终止位,EMPTY=1表示相位器是空的。提供两种构造函数,可传入参与者数量,不带参数时默认参与数为0,初始时当前阶段为0,然后通过偏移将当前阶段和参与者数量存放到共享变量state中。

publicclassPhaser{ privatestaticfinalVarHandleSTATE; static{ try{ MethodHandles.Lookupl=MethodHandles.lookup(); STATE=l.findVarHandle(Phaser.class "state" long.class); }catch(ReflectiveOperationExceptione){ thrownewExceptionInInitializerError(e); } } privatevolatilelongstate; privatestaticfinalintMAX_PHASE=Integer.MAX_VALUE; privatestaticfinalintPARTIES_SHIFT=16; privatestaticfinalintPHASE_SHIFT=32; privatestaticfinalintUNARRIVED_MASK=0xffff; privatestaticfinallongPARTIES_MASK=0xffff0000L; privatestaticfinallongTERMINATION_BIT=1L<<63; privatestaticfinalintEMPTY=1; publicPhaser(){ this(0); } publicPhaser(intparties){ intphase=0; this.state=(parties==0)?(long)EMPTY :((long)phase<<PHASE_SHIFT)|((long)parties<<PARTIES_SHIFT) |((long)parties); } ... }

接着看相位器的register注册方法,核心是通过自旋来实现无锁更新state,需要按照不同位范围进行状态更新。其中counts是state的低32位,然后通过偏移和掩码能够得到参与者数量和未到达数量,而state右移32位则得到当前阶段数。然后通过 counts != EMPTY 来判断相位器是否为空,为空时则设置参与者数量为1,然后通过compareAndSet设置state变量,成功则退出自旋。不为空时则分两种情况:一种是如果未到达数量等于0则调用internalAwaitAdvance方法等待下一个阶段,其中Thread.onSpinWait()是JVM提供的等待方法。第二种情况则将参与者数量和未到达数量都加1,然后通过compareAndSet方法设置state变量。

publicintregister(){ longadjust=((long)1<<PARTIES_SHIFT)|1; intphase; for(;;){ intcounts=(int)state; intparties=counts>>>PARTIES_SHIFT; intunarrived=counts&UNARRIVED_MASK; phase=(int)(state>>>PHASE_SHIFT); if(counts!=EMPTY){ if(unarrived==0) this.internalAwaitAdvance(phase); elseif(STATE.compareAndSet(this state state adjust)) break; }else{ longnext=((long)phase<<PHASE_SHIFT)|adjust; if(STATE.compareAndSet(this state next)) break; } } returnphase; } privateintinternalAwaitAdvance(intphase){ intp; while((p=(int)(state>>>PHASE_SHIFT))==phase) Thread.onSpinWait(); returnp; }

继续看arriveAndAwaitAdvance方法,核心逻辑是通过自旋将未到达数量减一并等待其它参与者到来后一起进阶。通过偏移得到当前阶段数和未到达数,然后将未到达数减一并通过compareAndSet设置state。成功设置后判断如果未到达数量大于1则调用internalAwaitAdvance等待其它参与者一起进阶,然后调用onAdvance方法判断是否要终止相位器,记得前面我们的onAdvance例子吗?就是这里触发的。根据情况分别设置终止位和未到达状态,主要通过或运算。最后将当前阶段数进行加一,并尝试通过compareAndSet设置state,只有一个线程能够设置成功,最终返回值为新的当前阶段数。

publicintarriveAndAwaitAdvance(){ for(;;){ longs=state; intphase=(int)(s>>>PHASE_SHIFT); intcounts=(int)s; intunarrived=(counts==EMPTY)?0:(counts&UNARRIVED_MASK); if(STATE.compareAndSet(this s s-=1)){ if(unarrived>1) returnthis.internalAwaitAdvance(phase); longn=s&PARTIES_MASK; intnextUnarrived=(int)n>>>PARTIES_SHIFT; if(onAdvance(phase nextUnarrived)) n|=TERMINATION_BIT; elseif(nextUnarrived==0) n|=EMPTY; else n|=nextUnarrived; intnextPhase=(phase 1)&MAX_PHASE; n|=(long)nextPhase<<PHASE_SHIFT; if(!STATE.compareAndSet(this s n)) return(int)(state>>>PHASE_SHIFT); returnnextPhase; } } }

arriveAndDeregister方法核心逻辑是通过自旋将未到达数量和参与者数量都减一,注意此方法不进行阻塞等待。通过偏移即掩码得到当前阶段数、未到达数,然后将state的参与者数量和未到达数量分别减一,并通过compareAndSet设置state。如果未到达数量等于1,则说明该线程是最后一个到达的参与者,此时会调用onAdvance方法判断是否要终止相位器。最后通过或运算设置终止位、未到达数、当前阶段数,并通过compareAndSet设置state。

publicintarriveAndDeregister(){ for(;;){ longs=state; intphase=(int)(s>>>PHASE_SHIFT); if(phase<0) returnphase; intcounts=(int)s; intunarrived=(counts==EMPTY)?0:(counts&UNARRIVED_MASK); if(STATE.compareAndSet(this s s-=(1|1<<PARTIES_SHIFT))){ if(unarrived==1){ longn=s&PARTIES_MASK; intnextUnarrived=(int)n>>>PARTIES_SHIFT; if(onAdvance(phase nextUnarrived)) n|=TERMINATION_BIT; elseif(nextUnarrived==0) n|=EMPTY; else n|=nextUnarrived; intnextPhase=(phase 1)&MAX_PHASE; n|=(long)nextPhase<<PHASE_SHIFT; booleanresult=STATE.compareAndSet(this s n); } returnphase; } } }

最后再看onAdvance和isTerminated两个方法。onAdvance方法逻辑是注册的参与者数量是否为0,返回true表示需要终止相位器。isTerminated方法逻辑是直接判断state的值是否小于0,小于0则最高位为1,也就是long的符号位为1,即小于0。

protectedbooleanonAdvance(intphase intregisteredParties){ returnregisteredParties==0; } publicbooleanisTerminated(){ returnthis.state<0L; }

更多Java并发原理可关注作者下面的专栏:

作者简介:笔名seaboat,擅长人工智能、计算机科学、数学原理、基础算法。出版书籍:图解数据结构与算法、Tomcat内核设计剖析、图解Java并发原理、人工智能原理科普。

猜您喜欢: