做動(dòng)態(tài)網(wǎng)站有哪些技術(shù)路線seo網(wǎng)站推廣教程
為了更深入學(xué)習(xí)協(xié)程的底層實(shí)現(xiàn)原理,了解協(xié)程線程切換的根本本質(zhì)。也為了以后在工作中可以根據(jù)不同的需求場(chǎng)景,更加隨心所欲的使用不同的協(xié)程。
今天通過(guò) launch 跟蹤一下協(xié)程的執(zhí)行流程。
fun getData() {Trace.beginSection("getData");Log.e(TAG, "getData before " + Thread.currentThread().name)val demoScope: suspend CoroutineScope.() -> Unit = {Trace.beginSection("DispatchersIO");Log.e(TAG, "getData IO 1 " + Thread.currentThread().name)Thread.sleep(1000)Log.e(TAG, "getData IO 2 " + Thread.currentThread().name)Trace.endSection();}viewModelScope.launch(Dispatchers.IO, block = demoScope)
}
1. 流程圖
1.1 從 launch 源碼開(kāi)始
public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit
): Job {//1,先通過(guò)參數(shù)Context構(gòu)造一個(gè)新的CoroutineContextval newContext = newCoroutineContext(context)val coroutine = if (start.isLazy)LazyStandaloneCoroutine(newContext, block) elseStandaloneCoroutine(newContext, active = true)coroutine.start(start, coroutine, block)return coroutine
}
launch 方法有三個(gè)參數(shù)
-
context:常用的一般是 Dispatchers.Default,Dispatchers.Main,Dispatchers.Unconfined,Dispatchers.IO。
-
start:枚舉類型共四種:DEFAULT,LAZY,ATOMIC,UNDISPATCHED
-
block:就是 launch 執(zhí)行的協(xié)程體
1.2 我們來(lái)看 newCoroutineContext 方法
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {val combined = coroutineContext + context//1val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combinedreturn if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)debug + Dispatchers.Default else debug
}
剛開(kāi)始看到代碼 1 的+號(hào),頭都是蒙的,這是什么鬼?不是數(shù)字類型,為啥能加?
其實(shí)本質(zhì)就是調(diào)用了 CoroutineContext 的 plus,是操作符的重載
/*** Returns a context containing elements from this context and elements from other [context].* The elements from this context with the same key as in the other one are dropped.*/
public operator fun plus(context: CoroutineContext): CoroutineContext =if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creationcontext.fold(this) { acc, element ->//operation函數(shù)體。。。。。。。}
fold 函數(shù)比較難理解,我們先說(shuō)結(jié)論,就是把參數(shù) this 內(nèi)部與 context 的 key 一樣的 CoroutineContext 移除后,剩下的 CoroutineContext 與 context 組成新的 CoroutineContext 對(duì)象。下邊慢慢分析
CoroutineContext 的子類重寫 fold 函數(shù)的一共有三個(gè) EmptyCoroutineContext,CombinedContext,Element
-
上述代碼第 6 行已經(jīng)判斷過(guò) context 是 EmptyCoroutineContext。所以當(dāng)前的 context 不可能是 EmptyCoroutineContext。其 fold 方法直接返回 this。如下:
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
-
是 Element 時(shí)。acc 就是 fold 函數(shù)參數(shù)。element 就是 fold 函數(shù)調(diào)用者
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =operation(initial, this)
-
是 CombinedContext 比較復(fù)雜
internal class CombinedContext(private val left: CoroutineContext,private val element: Element
) : CoroutineContext, Serializable {public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =operation(left.fold(initial, operation), element)
}
要遞歸調(diào)用 fold 函數(shù),并重復(fù)調(diào)用 operation 函數(shù)。直到最后調(diào)用 Element,或者 EmptyCoroutineContext 的 fold 函數(shù)。
最終需要分析的都是 Element 的 fold 函數(shù)執(zhí)行情況
context.fold(this) { acc, element ->//acc就是fold函數(shù)參數(shù)。element就是fold函數(shù)調(diào)用者,當(dāng)前就是Dispatchers.IO//如果acc的key和element的key是相同,就返回新的EmptyCoroutineContext//否則就返回accval removed = acc.minusKey(element.key) if (removed === EmptyCoroutineContext) element else {// make sure interceptor is always last in the context (and thus is fast to get when present)//此時(shí)removed為acc的left,也就是SupervisorJob//獲得removed里key為ContinuationInterceptor.key的分發(fā)器。當(dāng)前為nullval interceptor = removed[ContinuationInterceptor]//合并removed和element。也就是SupervisorJob+Dispatchers.IOif (interceptor == null) CombinedContext(removed, element) else {val left = removed.minusKey(ContinuationInterceptor)if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) elseCombinedContext(CombinedContext(left, element), interceptor)}}
}
小結(jié)下:
newCoroutineContext 其實(shí)就是給自己傳遞的 context 添加一些附加技能。但是 key 相同的技能只包含一個(gè)
比如 ViewModel 中 viewModelScope 的 coroutineContext 的默認(rèn)值 SupervisorJob() + Dispatchers.Main.immediate。默認(rèn)主線程執(zhí)行,并保證如果其中的某個(gè)子協(xié)程出現(xiàn)異常,不會(huì)影響子協(xié)程
比如切換 dispatcher,當(dāng)前父協(xié)程 dispatcher 為 Dispatchers.Main.immediate,切換為 Dispatchers.IO
1.3 下面分析 StandaloneCoroutine 的 start 方法
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {initParentJob()start(block, receiver, this)
}
internal fun initParentJob() {//當(dāng)前的parentContext[job]就是SupervisorJobinitParentJobInternal(parentContext[Job])
}
/*** Initializes parent job.* It shall be invoked at most once after construction after all other initialization.*/
internal fun initParentJobInternal(parent: Job?) {assert { parentHandle == null }if (parent == null) {parentHandle = NonDisposableHandlereturn}//start保證parent狀態(tài)為isActiveparent.start() // make sure the parent is //...
}
CoroutineStart 的 start 就是如下的 invoke 函數(shù)
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =when (this) {DEFAULT -> block.startCoroutineCancellable(receiver, completion)ATOMIC -> block.startCoroutine(receiver, completion)UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)LAZY -> Unit // will start lazily}
通過(guò)這里可以大概猜測(cè)一下幾種 start 的區(qū)別。當(dāng)前我們只看 DEFAULT
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>,onCancellation: ((cause: Throwable) -> Unit)? = null
) =//runSafely就是添加了一個(gè)try catchrunSafely(completion) {createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)}
createCoroutineUnintercepted 在文件 kotlin.coroutines.intrinsics.intrinsicsJvm.kt
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(receiver: R,completion: Continuation<T>
): Continuation<Unit> {val probeCompletion = probeCoroutineCreated(completion)//當(dāng)前對(duì)象是BaseContinuationImpl的子類return if (this is BaseContinuationImpl)//這個(gè)方法在哪?create(receiver, probeCompletion)else {createCoroutineFromSuspendFunction(probeCompletion) {(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)}}
}
create 方法在哪?需要反編譯代碼才能看的到
public final class MainViewModel extends ViewModel {public static final Companion Companion = new Companion(null);private static final String TAG = "MainViewModel";public final void getData() {Trace.beginSection("getData");StringBuilder stringBuilder = new StringBuilder();stringBuilder.append("getData before ");stringBuilder.append(Thread.currentThread().getName());Log.e("MainViewModel", stringBuilder.toString());MainViewModel$getData$eeeee$1 mainViewModel$getData$eeeee$1 = new MainViewModel$getData$eeeee$1(null);BuildersKt.launch$default(ViewModelKt.getViewModelScope(this), (CoroutineContext)Dispatchers.getIO(), null, mainViewModel$getData$eeeee$1, 2, null);}@Metadata(d1 = {"\000\022\n\002\030\002\n\002\020\000\n\002\b\002\n\002\020\016\n\000\b\003\030\0002\0020\001B\007\b\002¢\006\002\020\002R\016\020\003\032\0020\004XT¢\006\002\n\000¨\006\005"}, d2 = {"Lcom/haier/uhome/coroutine/ui/main/MainViewModel$Companion;", "", "()V", "TAG", "", "coroutine_debug"}, k = 1, mv = {1, 6, 0}, xi = 48)public static final class Companion {private Companion() {}}@Metadata(d1 = {"\000\n\n\000\n\002\020\002\n\002\030\002\020\000\032\0020\001*\0020\002H@"}, d2 = {"<anonymous>", "", "Lkotlinx/coroutines/CoroutineScope;"}, k = 3, mv = {1, 6, 0}, xi = 48)@DebugMetadata(c = "com.haier.uhome.coroutine.ui.main.MainViewModel$getData$eeeee$1", f = "MainViewModel.kt", i = {}, l = {}, m = "invokeSuspend", n = {}, s = {})static final class MainViewModel$getData$eeeee$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {int label;MainViewModel$getData$eeeee$1(Continuation<? super MainViewModel$getData$eeeee$1> param1Continuation) {super(2, param1Continuation);}public final Continuation<Unit> create(Object param1Object, Continuation<?> param1Continuation) {return (Continuation<Unit>)new MainViewModel$getData$eeeee$1((Continuation)param1Continuation);}//。。。。。。。}
}
可以看到我們的協(xié)程體其實(shí)是一個(gè)基礎(chǔ) SuspendLambda 的 class 對(duì)象。當(dāng)調(diào)用 create 時(shí),用參數(shù) probeCompletion 又構(gòu)造了一個(gè)新的協(xié)程體對(duì)象
SuspendLambda 的繼承關(guān)系如下:
SuspendLambda-->ContinuationImpl-->BaseContinuationImpl-->Continuation<Any?>, CoroutineStackFrame, Serializable
所以 intercepted()方法就是調(diào)用 ContinuationImpl 內(nèi)部實(shí)現(xiàn)的
public fun intercepted(): Continuation<Any?> =intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it }
context[ContinuationInterceptor]此時(shí)獲得的就是 Dispatchers.IO,
其 interceptContinuation 方法如下
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =DispatchedContinuation(this, continuation)
把 continuation 封裝成了 DispatchedContinuation。其繼承關(guān)系如下:
DispatchedContinuation-->DispatchedTask-->SchedulerTask-->Task-->Runnable
需要注意的是 continuation 就是協(xié)程體。就是我們要執(zhí)行的內(nèi)容
1.4 繼續(xù)看 resumeCancellableWith 方法
在文件 kotlinx.coroutines.internal.DispatchedContinuation.kt
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(result: Result<T>,noinline onCancellation: ((cause: Throwable) -> Unit)?
) {val state = result.toState(onCancellation)//dispatcher就是協(xié)程代碼傳入的分發(fā)器,//判斷是否需要切換通過(guò)dispatcher執(zhí)行,當(dāng)前dispatcher.io,isDispatchNeeded是直接返回trueif (dispatcher.isDispatchNeeded(context)) {//代碼1_state = stateresumeMode = MODE_CANCELLABLEdispatcher.dispatch(context, this)} else {executeUnconfined(state, MODE_CANCELLABLE) {if (!resumeCancelled(state)) {resumeUndispatchedWith(result)}}}
}
dispatcher.dispatch()方法就把上邊生成的 runnable 放到了線程池隊(duì)列中
文件 kotlinx.coroutines.scheduling.Dispatcher.kt#LimitingDispatcher
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)private fun dispatch(block: Runnable, tailDispatch: Boolean) {var taskToSchedule = blockwhile (true) {// Commit in-flight tasks slotval inFlight = inFlightTasks.incrementAndGet()// Fast path, if parallelism limit is not reached, dispatch task and returnif (inFlight <= parallelism) {dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)return}//....}}
2. dispatche 具體是什么呢?
流程圖如下
2.1 其實(shí)是在 Dispatchers.IO 實(shí)例化時(shí)的參數(shù),DefaultScheduler 對(duì)象
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {val IO: CoroutineDispatcher = LimitingDispatcher(//這里實(shí)例化調(diào)度器對(duì)象this,systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),"Dispatchers.IO",TASK_PROBABLY_BLOCKING)//....}
而 DefaultScheduler 內(nèi)部實(shí)例化了一個(gè)線程池
2.2 在文件 kotlinx.coroutines.scheduling.Dispatcher.kt
//kotlinx.coroutines.scheduling.Dispatcher.kt#ExperimentalCoroutineDispatcher
override val executor: Executorget() = coroutineScheduler
private var coroutineScheduler = createScheduler()
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
dispatcher.dispatchWithContext,就是調(diào)用線程池的 dispatch,把任務(wù)放到 globalQueue 隊(duì)列里,我們看一下
在文件 kotlinx.coroutines.scheduling.CoroutineScheduler.kt
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {try {//coroutineScheduler就是線程池coroutineScheduler.dispatch(block, context, tailDispatch)} catch (e: RejectedExecutionException) {// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved// for testing purposes, so we don't have to worry about cancelling the affected Job here.// TaskContext shouldn't be lost here to properly invoke before/after taskDefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))}
}fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {trackTask() // this is needed for virtual time support//當(dāng)前block就繼承之Taskval task = createTask(block, taskContext)// try to submit the task to the local queue and act depending on the result//當(dāng)前線程池不是work,所以此時(shí)currentWorker返回為nullval currentWorker = currentWorker()//local放置失敗val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)if (notAdded != null) {//放到global隊(duì)列里if (!addToGlobalQueue(notAdded)) {// Global queue is closed in the last step of close/shutdown -- no more tasks should be acceptedthrow RejectedExecutionException("$schedulerName was terminated")}}
}
3. 任務(wù)具體如何執(zhí)行?
時(shí)序圖如下:
3.1 我們來(lái)看 kotlinx.coroutines.scheduling.CoroutineScheduler 文件
private fun runWorker() {var rescanned = falsewhile (!isTerminated && state != WorkerState.TERMINATED) {//通過(guò)上一步可以知道任務(wù)沒(méi)有放置到local隊(duì)列,mayHaveLocalTasks為falseval task = findTask(mayHaveLocalTasks)// Task found. Execute and repeatif (task != null) {rescanned = falseminDelayUntilStealableTaskNs = 0LexecuteTask(task)continue} else {mayHaveLocalTasks = false}//。。。。。。
}
private fun findAnyTask(scanLocalQueue: Boolean): Task? {/** Anti-starvation mechanism: probabilistically poll either local* or global queue to ensure progress for both external and internal tasks.*/if (scanLocalQueue) {val globalFirst = nextInt(2 * corePoolSize) == 0if (globalFirst) pollGlobalQueues()?.let { return it }localQueue.poll()?.let { return it }if (!globalFirst) pollGlobalQueues()?.let { return it }} else {//從glocal中取出任務(wù)pollGlobalQueues()?.let { return it }}return trySteal(blockingOnly = false)
}private fun pollGlobalQueues(): Task? {if (nextInt(2) == 0) {globalCpuQueue.removeFirstOrNull()?.let { return it }return globalBlockingQueue.removeFirstOrNull()} else {globalBlockingQueue.removeFirstOrNull()?.let { return it }return globalCpuQueue.removeFirstOrNull()}
}//參數(shù)task就是一個(gè)runnable
private fun executeTask(task: Task) {val taskMode = task.modeidleReset(taskMode)beforeTask(taskMode)//執(zhí)行task里的run方法runSafely(task)afterTask(taskMode)
}
3.2 Task 的 run 方法的實(shí)現(xiàn)在 kotlinx.coroutines.DispatchedTask 里
public final override fun run() {
// should have been set before dispatchingval taskContext = this.taskContextvar fatalException: Throwable? = nulltry {//...withCoroutineContext(context, delegate.countOrElement) {//。。。。continuation.resume(getSuccessfulResult(state))//。。。。。}} catch (e: Throwable) {// This instead of runCatching to have nicer stacktrace and debug experiencefatalException = e} finally {val result = runCatching { taskContext.afterTask() }handleFatalException(fatalException, result.exceptionOrNull())}
}
3.3 continuation.resume 在 kotlin.coroutines.Continuation.kt 文件
public inline fun <T> Continuation<T>.resume(value: T): Unit =resumeWith(Result.success(value))
3.4 最終執(zhí)行內(nèi)容在文件:kotlin.coroutines.jvm.internal.ContinuationImpl 里
public final override fun resumeWith(result: Result<Any?>) {// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resumevar current = thisvar param = resultwhile (true) {// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure// can precisely track what part of suspended callstack was already resumedprobeCoroutineResumed(current)with(current) {val completion = completion!! // fail fast when trying to resume continuation without completionval outcome: Result<Any?> =try {//執(zhí)行協(xié)程體內(nèi)容val outcome = invokeSuspend(param)if (outcome === COROUTINE_SUSPENDED) returnResult.success(outcome)} catch (exception: Throwable) {Result.failure(exception)}releaseIntercepted() // this state machine instance is terminatingif (completion is BaseContinuationImpl) {// unrolling recursion via loopcurrent = completionparam = outcome} else {// top-level completion reached -- invoke and returncompletion.resumeWith(outcome)return}}}
}
3.5 invokeSuspend 在哪呢?還是找不到!同樣需要反編譯查看。就是
public final class MainViewModel extends ViewModel {public static final Companion Companion = new Companion(null);private static final String TAG = "MainViewModel";public final void getData() {Trace.beginSection("getData");StringBuilder stringBuilder = new StringBuilder();stringBuilder.append("getData before ");stringBuilder.append(Thread.currentThread().getName());Log.e("MainViewModel", stringBuilder.toString());MainViewModel$getData$eeeee$1 mainViewModel$getData$eeeee$1 = new MainViewModel$getData$eeeee$1(null);BuildersKt.launch$default(ViewModelKt.getViewModelScope(this), (CoroutineContext)Dispatchers.getIO(), null, mainViewModel$getData$eeeee$1, 2, null);}@Metadata(d1 = {"\000\n\n\000\n\002\020\002\n\002\030\002\020\000\032\0020\001*\0020\002H@"}, d2 = {"<anonymous>", "", "Lkotlinx/coroutines/CoroutineScope;"}, k = 3, mv = {1, 6, 0}, xi = 48)@DebugMetadata(c = "com.haier.uhome.coroutine.ui.main.MainViewModel$getData$eeeee$1", f = "MainViewModel.kt", i = {}, l = {}, m = "invokeSuspend", n = {}, s = {})static final class MainViewModel$getData$eeeee$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {int label;public final Object invokeSuspend(Object param1Object) {IntrinsicsKt.getCOROUTINE_SUSPENDED();if (this.label == 0) {ResultKt.throwOnFailure(param1Object);Trace.beginSection("DispatchersIO");param1Object = new StringBuilder();param1Object.append("getData IO 1 ");param1Object.append(Thread.currentThread().getName());Log.e("MainViewModel", param1Object.toString());Thread.sleep(1000L);param1Object = new StringBuilder();param1Object.append("getData IO 2 ");param1Object.append(Thread.currentThread().getName());Log.e("MainViewModel", param1Object.toString());Trace.endSection();return Unit.INSTANCE;} throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}}
}
到此處協(xié)程 launch 內(nèi)容就執(zhí)行完了。
4. 總結(jié)
其底層使用的就是對(duì)線程池的封裝,把協(xié)程體封裝到 runnable 里,放到線程池執(zhí)行。使用了的線程池線程復(fù)用,不必頻繁的創(chuàng)建,銷毀線程等優(yōu)點(diǎn)。提升了性能
其他的 Dispatcher,我就不一一跟蹤了,有興趣的同學(xué)可以自己跟蹤一下。這里簡(jiǎn)單介紹下我的理解:
Dispatchers.Main:其內(nèi)部使用的 MainCoroutineDispatcher,把任務(wù)放到主線程的 handler 順序執(zhí)行
Dispatchers.Default:是一個(gè)使用 DefaultScheduler 的線程池,據(jù)說(shuō)比較適合做邏輯性任務(wù)(這個(gè)我看不出來(lái)😋)
Dispatchers.Unconfined:跟隨父協(xié)程的 context,直接執(zhí)行,不做線程切換
launch 主要邏輯不是很復(fù)雜,主要就是線程池的調(diào)度。難以跟蹤的原因大概是因?yàn)樵创a中到處在使用函數(shù)擴(kuò)展。再加上協(xié)程體的具體實(shí)現(xiàn)是 kotlin 編譯過(guò)程中生成的。所以花的時(shí)間比較多,需要有耐心!
5. 團(tuán)隊(duì)介紹
「三翼鳥(niǎo)數(shù)字化技術(shù)平臺(tái)-場(chǎng)景設(shè)計(jì)交互平臺(tái)」主要負(fù)責(zé)設(shè)計(jì)工具的研發(fā),包括營(yíng)銷設(shè)計(jì)工具、家電VR設(shè)計(jì)和展示、水電暖通前置設(shè)計(jì)能力,研發(fā)并沉淀素材庫(kù),構(gòu)建家居家裝素材庫(kù),集成戶型庫(kù)、全品類產(chǎn)品庫(kù)、設(shè)計(jì)方案庫(kù)、生產(chǎn)工藝模型,打造基于戶型和風(fēng)格的AI設(shè)計(jì)能力,快速生成算量和報(bào)價(jià);同時(shí)研發(fā)了門店設(shè)計(jì)師中心和項(xiàng)目中心,包括設(shè)計(jì)師管理能力和項(xiàng)目經(jīng)理管理能力。實(shí)現(xiàn)了場(chǎng)景全生命周期管理,同時(shí)為水,空氣,廚房等產(chǎn)業(yè)提供商機(jī)管理工具,從而實(shí)現(xiàn)了以場(chǎng)景貫穿的B端C端全流程系統(tǒng)。