JobSupport 看名字就像 Java 里面的 LockSupport,同样地,在 Java 里面这个 LockSupport 是核心,现在 Kotlin 则是 JobSupport ,只是这里的明显看到说是要废弃这个 Api 也不知道后面怎么准备,只能拭目以待了。

关键词们 用法解析
_state 表明当前 Job 状态
isActive 是否是激活的状态
parentHandle 父句柄
start 开始工作
completionCause 异常发生原因

Support 核心

主要是就是可以看到状态的驱动,根据不同的状态生命周期去完成事件驱动。 这里实现了很多 Job 定义的方法,例如 Join / Cancel / await 等等, Join 内部会首先调用 joinInternal 去判断状态,如果这个没有返回想要的状态,那么会继续陷入 joinSuspend 中, Cancel 则调用的 cancelInternal 去进行 Cancel ,内部继续调用到 cancelImpl 中, 然后根据状态去判定调用 makeCancelling 还是 cancelMakeCompleting , afterCompletion 在这些方法里面有的是通知别的角色当前角色状态的改变,有的是 进行收尾。 Start 方法会调用到 startInternal 然后 _state 会通过 cas 去置换状态,置换成功之后会调用 onStartInternal 通知这个启动成功。 parentCancelled 也会调用到自己的 cancel 这个就是协程取消传递的原因,理论上就是递归终止,由 parent 触发,但是子协程会逐级取消自己的执行。

// 子协程的 Cancel 则会区分情况,如果是正常 Cancel 也就是通过 CancellationException 异常去处理的,那么就不理,否则就会取消自己并且通报是否有异常发生
    public open fun childCancelled(cause: Throwable): Boolean {
        if (cause is CancellationException) return true
        return cancelImpl(cause) && handlesException
    }

// 协程核心,这里调度告知最后结果
    public final override fun invokeOnCompletion(
        onCancelling: Boolean,
        invokeImmediately: Boolean,
        handler: CompletionHandler
    ): DisposableHandle {
        var nodeCache: JobNode<*>? = null
        loopOnState { state ->
            when (state) {
                is Empty -> { // EMPTY_X state -- no completion handlers
                    if (state.isActive) {
                        // try move to SINGLE state
                        val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                        if (_state.compareAndSet(state, node)) return node
                    } else
                        promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
                }
                is Incomplete -> {
                    val list = state.list
                    if (list == null) { // SINGLE/SINGLE+
                        promoteSingleToNodeList(state as JobNode<*>)
                    } else {
                        var rootCause: Throwable? = null
                        var handle: DisposableHandle = NonDisposableHandle
                        if (onCancelling && state is Finishing) {
                            synchronized(state) {
                                // check if we are installing cancellation handler on job that is being cancelled
                                rootCause = state.rootCause // != null if cancelling job
                                // We add node to the list in two cases --- either the job is not being cancelled
                                // or we are adding a child to a coroutine that is not completing yet
                                if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
                                    // Note: add node the list while holding lock on state (make sure it cannot change)
                                    val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                                    if (!addLastAtomic(state, list, node)) return@loopOnState // retry
                                    // just return node if we don't have to invoke handler (not cancelling yet)
                                    if (rootCause == null) return node
                                    // otherwise handler is invoked immediately out of the synchronized section & handle returned
                                    handle = node
                                }
                            }
                        }
                        if (rootCause != null) {
                            // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
                            if (invokeImmediately) handler.invokeIt(rootCause)
                            return handle
                        } else {
                            val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
                            if (addLastAtomic(state, list, node)) return node
                        }
                    }
                }
                else -> { // is complete
                    // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
                    // because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                    if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
                    return NonDisposableHandle
                }
            }
        }
    }    

// await() 实现
    internal suspend fun awaitInternal(): Any? {
        // fast-path -- check state (avoid extra object creation)
        while (true) { // lock-free loop on state
            val state = this.state
            if (state !is Incomplete) {
                // already complete -- just return result
                if (state is CompletedExceptionally) { // Slow path to recover stacktrace
                    recoverAndThrow(state.cause)
                }
                return state.unboxState()

            }
            if (startInternal(state) >= 0) break // break unless needs to retry
        }
        return awaitSuspend() // slow-path
    }

// 这里是核心的定义
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    // 这里封装了回调的结果。 
    public fun resumeWith(result: Result<T>)
}    

// delay 的定义
    public suspend fun delay(time: Long) {
        if (time <= 0) return // don't delay
        // 调用了 suspendCancellableCoroutine 传入了 scheduleResumeAfterDelay
        return suspendCancellableCoroutine { scheduleResumeAfterDelay(time, it) }
    }

// 使用了 Java 底层的能力。
    @InlineOnly
internal inline fun unpark(thread: Thread) {
    timeSource?.unpark(thread) ?: LockSupport.unpark(thread)
}