kotlin 线程池也是一个神奇的物种啊。
| kotlin 线程池用法 | 线程池原理 |
|---|---|
| kotlin 线程池的用法:在使用默认的 Dispatchers.IO 或者其他调度器时,它们底层是基于线程池来支持协程的;这里的协程与 Java 线程池里的 Runnable 类似,都是一个个任务 | 和 Java 线程池类似但是实现不同,有主动 steal(窃取任务)的动作 |
kotlin 线程池是怎么进去的

kotlin 线程池解析
一样滴,kotlin 线程池也是 task 的历险记 .1 .1
task 进入魔鬼池
.1 .2
/**
 * Hands [block] over to the scheduler; this is where a coroutine ultimately ends up being dispatched.
 *
 * The block is wrapped into a task and offered to the current worker's local queue. If that
 * submission hands a task back, the task is offered to the global queue instead. Afterwards a
 * worker is signalled according to the task mode.
 *
 * @param block the runnable to execute
 * @param taskContext context used when wrapping [block] into a task
 * @param tailDispatch forwarded to the local-queue submission; together with a non-null current
 * worker it suppresses unparking
 * @throws RejectedExecutionException if the global queue refuses the task (it is closed in the
 * last step of close/shutdown, so no more tasks are accepted)
 */
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
    // Required for virtual time support.
    trackTask()
    // Wrap the raw runnable into a scheduler task first.
    val task = createTask(block, taskContext)
    // Determine which worker (if any) we are currently running on and try its local queue.
    val submitter = currentWorker()
    val overflow = submitter.submitToLocalQueue(task, tailDispatch)
    // A non-null result means the local queue did not keep it: fall back to the global queue.
    if (overflow != null && !addToGlobalQueue(overflow)) {
        throw RejectedExecutionException("$schedulerName was terminated")
    }
    val skipUnpark = tailDispatch && submitter != null
    // Inspecting 'task' rather than 'overflow' is fine here.
    when {
        // Blocking work: always signal (the blocking-task counter is incremented regardless).
        task.mode != TASK_NON_BLOCKING -> signalBlockingWork(skipUnpark = skipUnpark)
        // CPU-bound work: signal only when unparking is not skipped.
        !skipUnpark -> signalCpuWork()
    }
}
// Extension declared on a nullable Worker receiver, so it can be invoked even when there is no current worker.
// NOTE(review): the original note states that a null receiver makes this return `task` unchanged —
// the body is elided in these notes, so confirm against the full source.
private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
// (body elided in these notes)
}
.1
.1
负责消化 task 的魔鬼 worker
.1 .1
/**
 * Worker thread of the scheduler (the "consumer" of tasks).
 *
 * Each worker is a daemon [Thread] that loops in [runWorker]: it looks for a task (its own
 * [localQueue], the global queues, or stealing from other workers), executes it, and parks
 * itself when nothing is found. A parked worker may terminate itself once its
 * termination deadline has passed.
 */
internal inner class Worker private constructor() : Thread() {
init {
isDaemon = true
}
// guarded by scheduler lock, index in workers array, 0 when not in array (terminated)
@Volatile // volatile for push/pop operation into parkedWorkersStack
var indexInArray = 0
set(index) {
// Keep the thread name in sync with the index; index 0 is rendered as "TERMINATED".
name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
field = index
}
constructor(index: Int) : this() {
indexInArray = index
}
inline val scheduler get() = this@CoroutineScheduler
@JvmField
val localQueue: WorkQueue = WorkQueue()
/**
* Worker state. **Updated only by this worker thread**.
* By default, worker is in DORMANT state in the case when it was created, but all CPU tokens or tasks were taken.
* Is used locally by the worker to maintain its own invariants.
*/
@JvmField
var state = WorkerState.DORMANT
/**
* Worker control state responsible for worker claiming, parking and termination.
* List of states:
* [PARKED] -- worker is parked and can self-terminate after a termination deadline.
* [CLAIMED] -- worker is claimed by an external submitter.
* [TERMINATED] -- worker is terminated and no longer usable.
*/
val workerCtl = atomic(CLAIMED)
/**
* It is set to the termination deadline when started doing [park] and it reset
* when there is a task. It serves as protection against spurious wakeups of parkNanos.
* In other words: the point in time at which this worker reaches the end of its life.
* NOTE(review): the original note says the default keep-alive is at most 60s (set where the
* scheduler is constructed) -- not visible here, confirm against the constructor.
*/
private var terminationDeadline = 0L
/** Marks whether the worker is standing by in the parked-workers stack; [NOT_IN_STACK] when it is not (see [inStack]). */
@Volatile
var nextParkedWorker: Any? = NOT_IN_STACK
/*
* The delay until at least one task in other worker queues will become stealable.
* I.e. how long to wait before trying to steal someone else's task again.
*/
private var minDelayUntilStealableTaskNs = 0L
private var rngState = Random.nextInt()
/**
* Tries to acquire CPU token if worker doesn't have one
* (i.e. obtains the right to run CPU-bound work).
* @return whether worker acquired (or already had) CPU token
*/
private fun tryAcquireCpuPermit(): Boolean = when {
state == WorkerState.CPU_ACQUIRED -> true
this@CoroutineScheduler.tryAcquireCpuPermit() -> {
state = WorkerState.CPU_ACQUIRED
true
}
else -> false
}
/**
* Releases CPU token if worker has any and changes state to [newState].
* Returns `true` if CPU permit was returned to the pool
*/
internal fun tryReleaseCpu(newState: WorkerState): Boolean {
val previousState = state
val hadCpu = previousState == WorkerState.CPU_ACQUIRED
if (hadCpu) releaseCpuPermit()
if (previousState != newState) state = newState
return hadCpu
}
// The thread body simply delegates to runWorker.
override fun run() = runWorker()
@JvmField
var mayHaveLocalTasks = false
// Main loop of the worker: find a task, execute it, otherwise wait or park.
private fun runWorker() {
var rescanned = false
while (!isTerminated && state != WorkerState.TERMINATED) {
// Look for the next task to run.
val task = findTask(mayHaveLocalTasks)
// Task found. Execute and repeat
if (task != null) {
rescanned = false
minDelayUntilStealableTaskNs = 0L
// Run it.
executeTask(task)
continue
} else {
mayHaveLocalTasks = false
}
// A non-zero steal delay means some other queue will have a stealable task soon.
// The first time around we only rescan; on the second miss we release the CPU permit,
// clear the interrupt flag and block for minDelayUntilStealableTaskNs.
if (minDelayUntilStealableTaskNs != 0L) {
if (!rescanned) {
rescanned = true
} else {
rescanned = false
// Give the CPU permit back while waiting.
tryReleaseCpu(WorkerState.PARKING)
interrupted()
LockSupport.parkNanos(minDelayUntilStealableTaskNs)
minDelayUntilStealableTaskNs = 0L
}
continue
}
// Nothing to do: park this worker. If nobody wakes it up while parked, it may terminate itself.
tryPark()
}
// Leaving the loop: give back the CPU permit (if held) and mark the worker as terminated.
tryReleaseCpu(WorkerState.TERMINATED)
}
// Counterpart to "tryUnpark"
private fun tryPark() {
// If the worker is not in the parked-workers stack yet, push it there and return to the main loop.
if (!inStack()) {
parkedWorkersStackPush(this)
return
}
assert { localQueue.size == 0 }
// Switch the control state to PARKED.
workerCtl.value = PARKED // Update value once
// While still in the stack: release the CPU permit and actually park.
while (inStack()) { // Prevent spurious wakeups
if (isTerminated || state == WorkerState.TERMINATED) break
tryReleaseCpu(WorkerState.PARKING)
interrupted() // Cleanup interruptions
park()
}
}
private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK
// How a task is executed: 1) reset idle bookkeeping, 2) pre-task notification,
// 3) run the task safely, 4) post-task cleanup.
private fun executeTask(task: Task) {
val taskMode = task.mode
idleReset(taskMode)
beforeTask(taskMode)
runSafely(task)
afterTask(taskMode)
}
private fun beforeTask(taskMode: Int) {
if (taskMode == TASK_NON_BLOCKING) return
// Always notify about new work when releasing CPU-permit to execute some blocking task
if (tryReleaseCpu(WorkerState.BLOCKING)) {
// A CPU permit was just freed, so signal that CPU work can be picked up.
signalCpuWork()
}
}
private fun afterTask(taskMode: Int) {
if (taskMode == TASK_NON_BLOCKING) return
decrementBlockingTasks()
val currentState = state
// Shutdown sequence of blocking dispatcher
if (currentState !== WorkerState.TERMINATED) {
assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
state = WorkerState.DORMANT
}
}
/*
* Marsaglia xorshift RNG with period 2^32-1 for work stealing purposes.
* ThreadLocalRandom cannot be used to support Android and ThreadLocal<Random> is up to 15% slower on Ktor benchmarks
*/
// Cheap per-worker random number source used when choosing where to poll/steal from.
internal fun nextInt(upperBound: Int): Int {
var r = rngState
r = r xor (r shl 13)
r = r xor (r shr 17)
r = r xor (r shl 5)
rngState = r
val mask = upperBound - 1
// Fast path for power of two bound
if (mask and upperBound == 0) {
return r and mask
}
// Clear the sign bit so the remainder is non-negative.
return (r and Int.MAX_VALUE) % upperBound
}
// Parks the thread; if nobody wakes this worker before the deadline passes, it tries to terminate itself.
private fun park() {
// set termination deadline the first time we are here (it is reset in idleReset)
if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
// actually park
LockSupport.parkNanos(idleWorkerKeepAliveNs)
// try terminate when we are idle past termination deadline
// note that comparison is written like this to protect against potential nanoTime wraparound
if (System.nanoTime() - terminationDeadline >= 0) {
terminationDeadline = 0L // if attempt to terminate worker fails we'd extend deadline again
tryTerminateWorker()
}
}
/**
* Stops execution of current thread and removes it from [createdWorkers].
*/
private fun tryTerminateWorker() {
synchronized(workers) {
// Make sure we're not trying race with termination of scheduler
if (isTerminated) return
// Someone else terminated, bail out
if (createdWorkers <= corePoolSize) return
/*
* See tryUnpark for state reasoning.
* If this CAS fails, then we were successfully unparked by other worker and cannot terminate.
*/
if (!workerCtl.compareAndSet(PARKED, TERMINATED)) return
/*
* At this point this thread is no longer considered as usable for scheduling.
* We need multi-step choreography to reindex workers.
*
* 1) Read current worker's index and reset it to zero.
*/
val oldIndex = indexInArray
indexInArray = 0
/*
* Now this worker cannot become the top of parkedWorkersStack, but it can
* still be at the stack top via oldIndex.
*
* 2) Update top of stack if it was pointing to oldIndex and make sure no
* pending push/pop operation that might have already retrieved oldIndex could complete.
*/
parkedWorkersStackTopUpdate(this, oldIndex, 0)
/*
* 3) Move last worker into an index in array that was previously occupied by this worker,
* if last worker was a different one (sic!).
*/
val lastIndex = decrementCreatedWorkers()
if (lastIndex != oldIndex) {
val lastWorker = workers[lastIndex]!!
workers[oldIndex] = lastWorker
lastWorker.indexInArray = oldIndex
/*
* Now lastWorker is available at both indices in the array, but it can
* still be at the stack top on via its lastIndex
*
* 4) Update top of stack lastIndex -> oldIndex and make sure no
* pending push/pop operation that might have already retrieved lastIndex could complete.
*/
parkedWorkersStackTopUpdate(lastWorker, lastIndex, oldIndex)
}
/*
* 5) It is safe to clear reference from workers array now.
*/
workers[lastIndex] = null
}
state = WorkerState.TERMINATED
}
// It is invoked by this worker when it finds a task
private fun idleReset(mode: Int) {
terminationDeadline = 0L // reset deadline for termination
if (state == WorkerState.PARKING) {
assert { mode == TASK_PROBABLY_BLOCKING }
state = WorkerState.BLOCKING
}
}
// Finds the next task for this worker, or returns null when there is none.
fun findTask(scanLocalQueue: Boolean): Task? {
if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
// If we can't acquire a CPU permit -- attempt to find blocking task
val task = if (scanLocalQueue) {
localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
} else {
globalBlockingQueue.removeFirstOrNull()
}
return task ?: trySteal(blockingOnly = true)
}
// Finds a task of any kind (used once a CPU permit is held).
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
/*
* Anti-starvation mechanism: probabilistically poll either local
* or global queue to ensure progress for both external and internal tasks.
*/
if (scanLocalQueue) {
val globalFirst = nextInt(2 * corePoolSize) == 0
if (globalFirst) pollGlobalQueues()?.let { return it }
localQueue.poll()?.let { return it }
if (!globalFirst) pollGlobalQueues()?.let { return it }
} else {
pollGlobalQueues()?.let { return it }
}
return trySteal(blockingOnly = false)
}
// Supplies findAnyTask with tasks from the global queues; which of the two queues is polled first is chosen at random.
private fun pollGlobalQueues(): Task? {
if (nextInt(2) == 0) {
globalCpuQueue.removeFirstOrNull()?.let { return it }
return globalBlockingQueue.removeFirstOrNull()
} else {
globalBlockingQueue.removeFirstOrNull()?.let { return it }
return globalCpuQueue.removeFirstOrNull()
}
}
// Tries to steal a task from another worker's local queue; records the shortest wait reported
// by the victims in minDelayUntilStealableTaskNs when nothing could be stolen right now.
private fun trySteal(blockingOnly: Boolean): Task? {
assert { localQueue.size == 0 }
val created = createdWorkers
// 0 to await an initialization and 1 to avoid excess stealing on single-core machines
// With fewer than two workers there is nobody to steal from.
if (created < 2) {
return null
}
var currentIndex = nextInt(created)
var minDelay = Long.MAX_VALUE
repeat(created) {
++currentIndex
if (currentIndex > created) currentIndex = 1
// Walk the workers array in order, starting from a random position and wrapping back to index 1.
val worker = workers[currentIndex]
if (worker !== null && worker !== this) {
assert { localQueue.size == 0 }
// Steal only blocking tasks, or any task, depending on the caller's request.
val stealResult = if (blockingOnly) {
// tryStealBlockingFrom is a method of WorkQueue.
localQueue.tryStealBlockingFrom(victim = worker.localQueue)
} else {
localQueue.tryStealFrom(victim = worker.localQueue)
}
if (stealResult == TASK_STOLEN) {
// Steal succeeded: the task is now in our local queue, so take it from there.
return localQueue.poll()
} else if (stealResult > 0) {
// A positive result is how long to wait before trying to steal again; keep the minimum.
minDelay = min(minDelay, stealResult)
}
}
}
minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
return null
}
}
.1 .1
未完待续的分析
.1 .1
LockFreeTaskQueue
WorkingQueue
state
localQueue
indexInArray .1 .1
核心
.1 核心是 atomic workerCtl 存储状态,以及改变状态。
2 1
管道失序
在 linux 或者 mac 下使用管道之后再使用 grep,会发现被 grep 的内容不仅仅是 history 的输出,当前目录下的全部文件都会被搜索。原因是我在 grep 的时候增加了 -rns 参数:其中 -r 会让 grep 在没有指定文件时递归搜索当前目录,而不是读取管道传入的标准输入。去掉 -r(例如只保留 -n)即可只过滤管道中的内容。