Kotlin协程

协程是什么

协程是一个与线程并列的概念,二者都是用来管理并发的,不过概念模型不太一样,有的编程语言用线程管理并发,有的使用协程,有的都用,所以kotlin的协程就是一个并发管理工具,定位和线程是一样的。因为kotlin是一门中间语言,它的代码最终还是要编译成Java字节码,这里讲都是Jvm上的协程,而Jvm提供的只有线程那一套,kotlin作为上层语言,是不可能绕过线程来创建新的并发实现的,那它是怎么办的呢?

实际上,kotlin协程的底层依然是通过Java的线程实现的,它把线程包起来,封装成一套新的API来让我们管理并发,它是一个用Java线程来实现的并发管理工具库。那问题就来了,我都有线程了,为什么要使用协程这个上层包装?

因为它包的好,虽然底层是线程,但协程比线程要好用。好用在哪呢?

协程有很多比线程好用的点,但其中最重要的一点就在于它能用线性的结构来写异步代码。

分类

按调用栈分类

通常我们提及调用栈,指的就是函数调用栈,是一种用来保保存函数调用时的状态信息的数据结构。

由于协程需要支持挂起、恢复,因此对于挂起点的状态保存就显得极其关键。类似地,线程会因为CPU调度权的切换而被中断,它的中断状态会保存在调用栈当中,因而协程的实现也可以按照是
否开辟相应的调用栈来分类

  • 有栈协程(Stackful Coroutine):每一个协程都有自己的调用栈,有点类似于线程的调用栈,这种情况下的协程实现其实很大程度上接近线程,主要的不同体现在调度上。
  • 无栈协程(Stackless Coroutine):协程没有自己的调用栈,挂起点的状态通过状态机或者闭包等语法来实现。

有栈协程的优点是可以在任意函数调用层级的任意位置挂起并转移调度权,例如Lua的协程。在这方面多数无栈协程就显得力不从心了,例如Python的Generator。通常,有栈协程总是会给协程开辟一块栈内存,因此内存开销也大大增加,而无栈协程在内存方面就比较有优势了。

当然也有反例。Go语言的goroutine可以认为是有栈协程的一个实现,不过Go运行时在这里做了大量优化,它的栈内存可以根据需要进行扩容和缩容,最小一般为内存页长4KB,比内核线程的栈空间(通常是MB级别)要小得多,可见它在内存方面相对轻量Kotlin的协程通常被认为是一种无栈协程的实现,它的控制流转依靠对协程体本身编译生成的状态机的状态流转来实现,变量保存也是通过闭包语法来实现的。不过,Kotlin的协程可以在挂起函数范围内的任意调用层次挂起,换句话说,我们启动一个Kotlin协程,可以在其中任意嵌套suspend函数,而这又恰恰是有栈协程最重要的特性之一。

Kotlin通过suspend函数嵌套调用的方式可以实现任意挂起函数调用层次的挂起。当然,想要在任意位置挂起,就需要对原有的函数进行增强。以Kotlin为例,这种情况下最终的协程实现就不需要挂起函数了,普通函数就相当于挂起函数。不过Kotlin的协程设计并没有采取这样的方案,其原因如下。

  • 实现这样的特性需要对普通函数的调用机制进行修改和增强,Kotlin所支持的所有运行环境(包括Java虚拟机、Node.js等)也都要提供相应的支持。这一点可以参考Java的协程项目Loom。
  • 对于普通函数的增强调度切换协程很多时候变成了隐式的行为,至少不怎么明显,例如goroutine,一个API调用之后究竞会发生什么就成了运行时提供的“黑魔法”。
  • 如果想要避免隐式调度,可以在设计API时保留基本的yield和resume作为协程转移调度权的手段供开发者调用,但这样又显得不够实用,需要进一步封装以达到易用的效果。

Kotlin协程的实现很好地平衡了这一点,既避免了对运行环境的过分依赖,又能满足协程在任意挂起函数调用层次挂起的需求。

与开发者通过调用API显式地挂起协程相比,任意位置的挂起也可以用于运行时对协程执行的干预,这种挂起方式对于开发者不可见,因此是一种隐式的挂起操作。Go语言的goroutine可以通过对channel的读写来实现挂起和恢复。除了这种显式的调度权切换之外,Go运行时还会对长期占用调度权的goroutine进行隐式挂起并将调度权转移给其他goroutine,这实际上就是我们熟悉的抢占式调度了。

关于协程实现究竟属于有栈协程还是无栈协程的问题,实际上争论较多,争议点主要是调用栈本身的定义及协程实现形式上的差异。从狭义上讲,调用栈就是我们熟知的普通函数的调用栈;从广义上讲,只要是能够保存调用状态的栈都可以称为调用栈,因而有栈协程的定义也可以更加宽泛。本书中若无特别说明,调用栈均特指普通函数调用栈,并按照这个标准对协程进行分类。

按调度方式分类

调度过程中,根据协程调度权的转移目标的不同又可将协程分为对称协程非对称协程

  • 对称协程(Symmetric Coroutine):任何一个协程都是相互独立且平等的,调度权可以在任意协程之间转移。
  • 非对称协程(Asymmetric Coroutine):协程出让调度权的目标只能是它的调用者,即协程之间存在调用和被调用关系。

对称协程实际上已经非常接近线程的样子了,例如Go语言中的goroutine可以通过读写不同的channel来实现控制权的自由转移,而非对称协程的调用关系实际上更符合我们的思维方式。常见语言对协程的实现大多是非对称实现,例如Lua的协程中,当前协程调用yield总是会将调度权转移给之前调用它的协程;还有我们在前面提到的async/await,await时将调度权转移到异步调用中,异步调用返回结果或抛出异常时总是将调度权转移回await的位置。

从实现的角度来讲,非对称协程的实现更自然,也相对容易而我们只要对非对称协程稍作修改,即可实现对称协程的能力。在非对称协程的基础上,我们只需要添加一个中立的第三方作为协程调度权的分发中心,所有的协程在挂起时都将调度权转移给分发中心,分发中心根据参数来决定将调度权转移给哪个协程,例如Lua的第三方库coro(http://luapower.com/coro)和Kotlin协程框架中基于Channel(https://kotlinlang.org/docs/channels.html)的通信等。

协程基础

切线程:launch()

前面说到,对于Jvm上的kotlin协程就是个用线程来实现的并发管理库,而并发这个东西的内容十分复杂,大概分为三类:

  1. 切线程(最基本)
  2. 在各个线程执行过程中等待别的线程,它属于线程之间在流程上的配合
  3. 互斥锁,它面对的是对于线程间的共享资源的保护,也就是所谓的线程安全

我们为什么要切线程呢

最常见的原因就是我有代码要执行,但不希望它挡住我当前的线程,所以给它一个并行的线程来执行这段代码。可以切换到子线程后台线程以及UI线程(Android或者swing)

1
2
3
4
5
6
7
8
9
10
//Java线程
Thread {
}.start()
//简写
thread {
}
//线程池
val executor = Executors.newCachedThreadPool()
executor.execute {
}

首先我们要使用协程,并需要引入协程依赖

1
2
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.8.1") // Android
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")

CoroutineScope 的作用跟Executor在线程里的作用是类似的,不过CoroutineScope的功能范围更大一些,Executor本身就是一个线程池,而CoroutineScope里面也包含线程池,但线程池只是它功能的一部分,也是最重要的一部分。

1
2
3
4
val scope = CoroutineScope(EmptyCoroutineContext)
//启动协程
scope.launch {
}

image-20240804184259881

CoroutineContext 会提供启动协程会用到的上下文信息,比如线程池。调用launch方法启动协程,实际上就是切换线程。

execute是把代码装进一个Runnable对象,而协程是把代码装进一个函数类型的对象

image-20240804185001706

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun main() {
//线程池
println("Main thread: ${Thread.currentThread().name}")
val executor = Executors.newCachedThreadPool()
executor.execute {
println("Executor thread: ${Thread.currentThread().name}")
}

val scope = CoroutineScope(EmptyCoroutineContext)
//启动协程
scope.launch {
println("Coroutine thread: ${Thread.currentThread().name}")
}
}

image-20240804185525897

其中管理任务执行的线程的工具叫 ContinuationInterceptor ,继续拦截器?,其实就是代码在往下执行之前先拦截住,做点别的操作再继续执行的意思,也就是拦截一下、做点别的工作、再继续执行,我们可以给出自己的实现类,但协程给我们提供了4个直接使用的实现类,放在了 Dispatchers 这个Object中:

image-20240804201004065

为什么叫Dispatchers,而不是叫Interceptors或者ContinuationInterceptors?

因为它们并不是直接实现ContinuationInterceptor,而是实现了它唯一的子类 CoroutineDispatcher,直译过来就是协程调度器,用来调度任务,也就是切线程。

image-20240804201334279

如果你没有指定任何ContinuationInterceptor,launch()启动的协程就会使用Default来调度任务,它提供一个全局的线程池来管理任务。跟它类似的还有一个叫IO,它也是提供后台线程的,但他俩的定位不一样。Default是来处理计算密集型任务的,IO是来处理IO密集型任务的。

Default:

  • 线程池大小:跟程序运行的设备的CPU可用核心数是相等的
    • 虽然线程数越多效率越高,但线程数超过CPU核心数时,效率就从升高变成降低了

IO:

  • 线程池大小:固定64线程
    • 当CPU核心数超过64时,就等于CPU核心数

计算密集型:就是整个任务过程里,你的CPU是满负荷运转的,或者说,卡住你的执行流程的是你的CPU计算性能。比如:你在程序里用滤镜算法给图片加滤镜、图片压缩、媒体编解码,也包括普通工作,比如在内存里进行个字符串拼接之类的(不跟外界交互)。

IO密集型:与计算密集型相反,它指的是在整个任务里,CPU比较空闲,反而是IO工作(跟内存之外的世界进行数据交互:磁盘、网络)在耗时间。比如:读写磁盘、进行http请求

开启IO,默认为Default

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//复用写法
val scope = CoroutineScope(Dispatchers.IO)
scope.launch {
println("Coroutine thread: ${Thread.currentThread().name}")
}
// 或者
// 每个launch指定
val scope = CoroutineScope(EmptyCoroutineContext)
scope.launch(Dispatchers.IO) {
println("Coroutine thread: ${Thread.currentThread().name}")
}
scope.launch(Dispatchers.Default) {
println("Coroutine thread: ${Thread.currentThread().name}")
}

Main:在主线程执行(Android),swing会切换到事件分发线程,总之就是更新界面的线程。如果是Spring或者Ktor这种服务器程序,你填写Main就会在运行时报错,原因很简单,因为服务端框架不存在切到主线程的需求。

一般使用前面三个就行了,但如果你想自己创建线程池:

  1. 使用newFixedThreadPoolContext
1
2
3
4
5
6
val context = newFixedThreadPoolContext(20, "DIY")
val scope = CoroutineScope(context)
scope.launch {
println("Coroutine thread: ${Thread.currentThread().name}")
}
context.close()

该函数被标记为@DelicateCoroutinesApi 比较精细、容易出错,是因为它里面有一个线程池,而线程是比较耗资源的,我们在不使用时,记得及时把它关闭。关闭的方法也比较简单,因为它继承了 ExecutorCoroutineDispatcher 抽象类,而这个抽象类额外实现了 Closeable 接口,所以多了一个 close 函数。

image-20240804205949126

image-20240804210040295可以使用 @OptIn(DelicateCoroutinesApi::class) 注解去除警告。
那为什么这个就需要关闭,而Default和IO就不需要关闭?

因为它们是全局的、永久存活的。而newFixedThreadPoolContext是我们自己创建的,需要我们自己管理生命周期。

  1. newSingleThreadContext单个线程
1
2
3
4
5
6
val context = newSingleThreadContext("Single")
val scope = CoroutineScope(context)
scope.launch {
println("Coroutine thread: ${Thread.currentThread().name}")
}
context.close()

内部其实是使用newFixedThreadPoolContext实现:

image-20240804210950934

而且它是实验性的,不建议使用。

此外系统还有一个ContinuationInterceptor:Unconfined,其实根本不会在实际开发中使用。就像名字一样,就是不进行限制的意思,用它启动协程,直接就执行里面代码,不切线程,而且它不适用于suspend挂起函数这种会自动切回原线程的方法。

自动切回来:挂起函数

我们在做界面开发时有个比较常见的场景需求是在UI线程去启动后台线程发起网络请求,请求结束后切回主线程刷新界面,如果不用协程也不难,只需要切换2次线程,2个回调就搞定了。而如果你用了网络请求库,你甚至可以只用一次回调就够了,因为网络库会帮你自动在后台进行请求,那么后台的这次回调那就不用自己写了,比如Java和Kotlin的HTTP库叫Retrofit,你只要把HTTP的API格式用接口的形式去声明出来

Retrofit简单使用

添加依赖:

1
2
3
implementation("com.squareup.retrofit2:retrofit:2.11.0")
implementation("com.squareup.retrofit2:adapter-rxjava3:2.11.0")
implementation("com.squareup.retrofit2:converter-gson:2.11.0")

Api.kt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
const val GITHUB_API = "https://api.github.com/"

data class Contributor(
val login: String,
val contributions: Long
)

interface GitHub{
@GET("/repos/{owner}/{repo}/contributors")
fun contributorsCall(
@Path("owner") owner: String,
@Path("repo") repo: String
): Call<List<Contributor>>

@GET("/repos/{owner}/{repo}/contributors")
suspend fun contributors(
@Path("owner") owner: String,
@Path("repo") repo: String
): List<Contributor>

@GET("/repos/{owner}/{repo}/contributors")
fun contributorsFuture(
@Path("owner") owner: String,
@Path("repo") repo: String
): CompletableFuture<List<Contributor>>
}
1
2
3
4
5
6
7
private val retrofit =
Retrofit.Builder().baseUrl(GITHUB_API)
.addCallAdapterFactory(RxJava3CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build()

val gitHub: GitHub = retrofit.create(GitHub::class.java)

网络请求业务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class SuspendActivity : ComponentActivity() {
private lateinit var textView: TextView

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.suspend_layout)
textView = findViewById(R.id.suspendText)
callbackStyle()
}

private fun callbackStyle() {
gitHub.contributorsCall("square", "retrofit")
.enqueue(object : Callback<List<Contributor>> {
override fun onResponse(
call: Call<List<Contributor>>,
response: Response<List<Contributor>>
) {
showContributors(response.body()!!)
}

override fun onFailure(p0: Call<List<Contributor>>, t: Throwable) {
t.printStackTrace()
}
})
}

private fun coroutineStyle() = CoroutineScope(Dispatchers.Main).launch {
val contributors = gitHub.contributors("square", "retrofit")
showContributors(contributors)
}

private fun showContributors(contributors: List<Contributor>) = contributors
.map { "${it.login} (${it.contributions})" }
.reduce { acc, s -> "$acc\n$s" }
.let { textView.text = it }
}
image-20240805111835763

可以发现 contributors 函数我们给它标记上了 suspend 关键字,suspend意为挂起,那么这个函数就是一个挂起函数,那么什么是挂起函数呢?

挂起函数就是你在执行这个函数的时候,它所在的协程就被挂起了,或者说被暂停了。在kotlin里面,所谓协程被挂起,指的就是它不再占用它正在工作的线程,在这个例子里就是主线程,它的核心在于协程与线程的脱离,线程被让出了。同时,协程虽然被挂起了,但这个挂起函数并没有被挂起,或者说,其实就不存在挂起函数被挂起这个概念,协程里的挂起指的是挂起函数把协程给暂停了的意思,它会切换到指定的线程执行代码(这里是在后台线程执行网络请求),执行完后,协程恢复。所谓的协程恢复,其实就是协程继续回到自己的线程去执行挂起函数后面的代码。这么说来,挂起函数只有在协程(或者其他挂起函数)里面才有意义,不然它挂起谁呀,都没有对象。

这只是一个简单的例子,但也能看出协程的优势,一旦回调增多(容易发生回调地狱),协程的优点就更加明显。

除了我们这里使用的Retrofit支持协程,常用的还有Jetpack全家桶,比如:room、paging

Android项目里协程写法

在Android应用的开发中,我们通常不用自己去创建这个CoroutineScope,而是使用Jetpack给我们提供的成品,Android的Jetpack库里有一些针对kotlin语言特性的扩展,也就是所谓的 KTX ,其中 lifecycle 这个库给我们提供了一个扩展属性 lifecycleScope ,他是 LifecycleOwner 的扩展属性,而 LifecycleOwner 是一个接口,我们常用的 ComponentActivityFragment 都实现了这个接口。

image-20240805112405095

image-20240805112731473

lifecycleScope:

  1. 与当前组件声明周期绑定,你在Activity里调用它,它就与Activity生命周期绑定,就是在Activity调用onDestory时自动帮你取消整个CoroutineScope所包含的所有协程
  2. 有内置的ContinuationInterceptor,它内置的是主线程

image-20240805114512193

这个 immediate 也是一个 CoroutineDispatcher 。它和Main一样,也是指定的主线程,不过Main启动的协程,不管当前线程是什么,都会把整个协程的代码块包起来,然后用 Handler.post() 抛到主线程去,而Main.immediate会先看一下是不是已经在主线程了,如果不在主线程,就用Handler.post(),如果已经在主线程了,那就直接执行协程的代码。一般情况下,immediate要比直接用Main好,因为它属于一个已经性能优化 的版本。

1
2
3
4
private fun coroutineStyle() = lifecycleScope.launch {
val contributors = gitHub.contributors("square", "retrofit")
showContributors(contributors)
}

如果我们不是在Activity或者Fragment中,而是用Jetpack里面的ViewModel组件

首先你需要引入它的KTX依赖

1
implementation("androidx.lifecycle:lifecycle-viewmodel-ktx:2.6.1")

使用:

1
2
3
4
5
6
class MyViewModel:ViewModel(){
fun viewModelTest(){
viewModelScope.launch {
}
}
}

image-20240805120231972

viewModelScope 是ViewModel的扩展属性,内置的也是Main.immediate,用法与lifecycleScope一致。

withContext():手动切线程

?切线程不是讲过了吗,用launch呀。

launch()开启的是并行的切线程,但我们可能还需要串行的切线程。

并行:

1
2
3
4
5
6
7
8
9
CoroutineScope(Dispatchers.Main).launch {
println("Test: 1 - launch + ${Thread.currentThread().name}")
launch(Dispatchers.IO) {
Thread.sleep(2000) // 模拟耗时,sleep()最准确
//delay(2000)
println("Test: 2 - launch + ${Thread.currentThread().name}")
}
println("Test: 3 - launch + ${Thread.currentThread().name}")
}

image-20240806100638412

串行(withContext:它是用来临时切换CoroutineContext而不只是切线程):

1
2
3
4
5
6
7
8
9
CoroutineScope(Dispatchers.Main).launch {
println("Test: 1 - launch + ${Thread.currentThread().name}")
withContext(Dispatchers.IO) {
Thread.sleep(2000) // 模拟耗时,sleep()最准确
//delay(2000)
println("Test: 2 - withContext + ${Thread.currentThread().name}")
}
println("Test: 3 - launch + ${Thread.currentThread().name}")
}

image-20240806100513037

下面我们看一个简单的业务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
CoroutineScope(Dispatchers.Main).launch {
val data = withContext(Dispatchers.IO) {
// 网络请求
"data"
}
val processor = withContext(Dispatchers.Default) {
// 处理数据
"processor"
}
// 显示数据
println("$data - $processor")
}

这样看起来,好像还是挺复杂的呀,这就需要接下来讲的自定义挂起函数。

自定义挂起函数

首先,怎么写挂起函数呢?

其实很简单,只需要在普通函数前面加上 suspend 关键字就行了,函数的内容根据你的实际业务逻辑编写。

那么我们什么时候需要用到挂起函数呢?

当我们的函数中用到了挂起函数,我们就需要将该函数声明为挂起函数。声明为了挂起函数,那这个函数就只能在协程里或者其他挂起函数中使用。

1
2
3
suspend fun getRetrofitContributors():List<Contributor>{
return gitHub.contributors("square", "retrofit")
}

挂起函数的性能优势

我们首先思考一个问题,我们怎么把withContext内容抽离出来,下面有2个方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class WithContextActivity : ComponentActivity() {
private lateinit var textView: TextView

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.suspend_layout)
textView = findViewById(R.id.suspendText)

CoroutineScope(Dispatchers.Main).launch {
println("Test: 1 - launch + ${Thread.currentThread().name}")
launch(Dispatchers.IO) {
Thread.sleep(2000) // 模拟耗时,sleep()最准确
//delay(2000)
println("Test: 2 - launch + ${Thread.currentThread().name}")
}

println("Test: 3 - launch + ${Thread.currentThread().name}")
}

CoroutineScope(Dispatchers.Main).launch {
val data = withContext(Dispatchers.IO) {
// 网络请求
"data"
}
val processor = withContext(Dispatchers.Default) {
// 处理数据
"processor - $data"
}
val processor1 = withContext(Dispatchers.Default) {
// 处理数据
processData1(data)
}
val processor2 = processData2(data)
// 显示数据
println("$data - $processor")
}
}

private suspend fun processData2(data: String) {
withContext(Dispatchers.Default) {
// 处理数据
"processor - $data"
}
}

private fun processData1(data: String) = "processor - $data"
}

为什么会出现2种分离方法呢?

是因为withContext()并不是面向具体业务的,而是提供一个下层技术支持,用withContext包住一段业务代码,就能把这段代码放到指定的线程或者线程池去执行。那么,我想把数据处理的功能抽出来,就有2种选择:我可以只把数据处理的代码抽出来;也可以把外面的withContext一起抽出来。一般来说会将withContext一起抽出来,也就是下面这种方法:

1
2
3
4
5
6
private suspend fun processData2(data: String) {
withContext(Dispatchers.Default) {
// 处理数据
"processor - $data"
}
}

如果你在切换上下文的时候,它的CotinuationIntercepter并没有改变,那么它就不会真的去切线程,而是直接保持在原来的线程往下去执行代码,这样就没有额外线程切换成本了。现在看上面的代码就知道它的优势了吧。它限制的代码只能在Default里,这样我就不用在实际调用的时候都去包一层withContext了,也不用去记住业务代码的任务类型以及忘记包了。简言之,协程保证了我们可以百分百确保耗时的工作一定在正确的线程执行。

现在,我们就可以去优化之前的代码了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class WithContextActivity : ComponentActivity() {
private lateinit var textView: TextView

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.suspend_layout)
textView = findViewById(R.id.suspendText)

CoroutineScope(Dispatchers.Main).launch {
val data = getData()
val processor = processData(data)
// 显示数据
println("$data - $processor")
}
}

private suspend fun getData(): String = withContext(Dispatchers.IO) {
// 网络请求
"data"
}

private suspend fun processData(data: String) = withContext(Dispatchers.Default) {
// 处理数据
"processor - $data"
}
}

挂起函数为什么不卡线程

首先Java的线程是不能指定线程的(可以指定线程池),也不能从子线程切换到主线程去执行某个任务,从主线程切换到某个(指定)子线程去执行。

Android工程师可能会说:不对呀,不是可以切换到主线程吗?

客户端项目(Android、swing)是可以切换到主线程去执行指定任务的,因为它们的UI线程是一个无线循环的线程,它其实是将任务扔到任务队列(确切说在Android里面叫消息队列)里面,然后主线程在循环过程中,每一轮都会去检查任务队列,如果有任务就把它们执行了,然后从队列里面移除,这就是所谓的切到主线程的实现方法。之前说过服务器程序是没法切到主线程的,因为服务器程序的主线程不具备这样的无限循环的反复检查和执行任务队列的性质。

Android中有一个HandlerThread继承自Thread,Android允许我们把任务扔给(切到)HandlerThread去执行,这是为什么呢?

因为它是一种特殊的线程,它一运性起来就会无限循环,在循环里不停的等待新任务,一旦有新任务就会去执行,因此它不能被当成普通线程使用,因为它的核心流程是不能被定制的,它是强制无限循环的。

回到协程,当我启动协程,其实就是把这个任务对象扔给ContinuationInterceptor去执行,具体怎么执行要看ContinuationInterceptor的实现。

default

底层依然是回调。其实协程对于挂起和恢复都是抽象出了一套状态机的机制,每次挂起函数调用前和调用后,协程都会做一次状态的切换。

那么挂起函数为什么不卡线程呢?

因为虽然网络请求在后台线程进行,可是这期间主线程在等着它,那为什么主线程没有被卡住呢,其实主线程并没有等着这个网络请求的结束,而是在无限循环,每一轮都会去检查任务队列,如果有任务就把它们执行了,然后从队列里面移除。

轻量级线程:delay()

它是Kotlin官方给协程打的比方。上一节我们介绍挂起函数为什么不卡线程,这节我们讲一下一个典型的不卡线程的挂起函数 delay

我们看看官方给出的例子:

1
2
3
4
5
6
7
8
fun main() = runBlocking {
repeat(50_000) {
launch {
delay(5000L)
print(".")
}
}
}

image-20240806120321835

上面代码虽然开了50000个协程,但它外面包的 runBlocking() ,它会提供一个单线程的ContinuationInterceptor,其实这5万个协程都是运行在同一个线程上,也就是主线程。

1
2
3
4
5
6
7
8
fun main() = runBlocking {
repeat(50_000) {
thread {
Thread.sleep(5000L)
print(".")
}
}
}

image-20240806120411738

使用Thread的时,它就是老老实实开了5万个线程,runBlocking虽然给出的上下文是单线程的,但它只能管住它里面的协程代码,管不住线程代码。

你可能会觉得这不恰恰说明了协程比线程轻量吗?

该代码真正耗时的是等待5秒,打印是瞬时的,所以这是一个延时任务,而不是耗时任务。而传统的线程方案对于延时任务不是用线程的sleep(),而是用一个 schedule() 的方法。

1
2
3
4
5
6
7
fun main() = runBlocking {
val executor = Executors.newSingleThreadScheduledExecutor()
repeat(50_000) {
executor.schedule({ print(".") }, 5, TimeUnit.SECONDS)
}
executor.shutdown()
}

这样就不会溢出了,这才是协程等价的线程代码。

虽然线程代码让高手来写,也能写出像协程一样高性能的效果,但从实用的方面来看,协程随手一写就是高性能的,也是可以看成是比线程轻的。

结构化并发

用协程来写并发代码比线程更好写、也更好读,它面向的是并发任务的写法,而结构化并发面向的是并发任务的管理。

我们知道协程要用CoroutineScope来启动,为什么要用CoroutineScope?

  1. 它提供一堆上下文信息给协程用,比如提供ContinuationInterceptor来做线程管理
  2. 它提供了取消的能力

为什么要取消?因为我们的并发任务有可能在半路,在还没执行完成的时候就变得不被需要了。比如在网络请求还没完成时,用户就把页面关闭了,那这个网络请求以及后面的更新页面操作就没必要了,如果继续执行,就有点浪费资源(甚至造成内存泄漏)。还有个问题,由于界面关闭之后,界面里面的视图组件就从界面组件的结构中移除了,这时候,如果代码继续执行,取到网络数据,并尝试更新到界面的话,就会由于组件已经不存在了而造成程序的崩溃。

按道理来说,Java是有GC(垃圾回收机制)的,但为什么还是会内存泄漏呢?

GC其实就是扫描内存里面的每个对象,看看它有没有被几种类型的引用所执行,哪几种呢?第一种,就是静态变量,也就是用Java的static关键字所修饰的变量,它是永久存活的,GC就不会回收它。第二类是活跃的线程,也就是正在运行的线程。第三类,也就是Native对象,也就是来自JNI(Java Native Interface),因为它来自更下层的位置,它有用没用JVM判断不了。这三类对象都会被GC判定为有用的对象,另外,GC也会传递性地把被这三类对象所指(直接或间接)的对象也判定为有用的。而如果一个对象不符合任何有用的条件,它就会被GC在下一次垃圾回收的扫描里面被标记为没用,再接着就会被回收了。这就是GC自动回收的逻辑。

之前,Android内存泄漏问题通过弱引用来解决。后来,到了RxJava的时代,这个问题被解决得更好了,它实现了用链式调用来进行链式逻辑的写法,它可以使用 .dispose() 把RxJava流程取消,它的取消是把后续流程全都取消,它不仅省了Activity的内存,而且省了线程占用的内存和消耗的CPU。

协程也可以做类似的事情,每一个协程在启动之后都返回了一个 Job 对象,通过 .cancel() 取消协程。

image-20240806213014267
1
2
3
4
5
6
val job = lifecycleScope.launch {
println("Coroutine started")
delay(5000L)
println("Coroutine finished")
}
job.cancel()

image-20240806213350967

当然我们也能用CoroutineScope取消协程,它取消的是CoroutineScope所启动的所有协程:

1
2
3
4
override fun onDestroy() {
super.onDestroy()
lifecycleScope.cancel()
}

事实上lifecycleScope.cancel()也不用写,lifecycleScope是Jetpack提供的一个自带的CoroutineScope,它已经注册了这个绑定了,它会自动在onDestroy()里调用cancel()。

上面的就是结构化并发,我调用lifecycleScope.cancel(),它就会取消所启动的所有协程,而不是简单的一对一,而是一对多,它还是一层对多层的。

image-20240806214419924

大括号里面的this其实就是CoroutineScope,所以我们在里面可以直接写launch{},而不用写前缀,this就是隐式的前缀。这不是重要的点,重点在于这个this并不是外面的这个lifecycleScope的CoroutineScope,但它受外面这个CoroutineScope管理,每个协程大括号里面都有一个CoroutineScope的this,里面的launch()就是由这个this启动的,它们就形成了父子关系。而cancel()函数不仅会取消父协程,而且会连锁性的取消这些协程的子协程,子子协程,等等。

并行协程的启动和交互

挂起函数:串行,不同线程间可以交互(前一个线程结果给后一个线程用)

launch:并行

那我怎么让并行的流程之间进行交互呢?比如:我想同时执行2个请求,在它们都返回后,把它们结果合并,把合并的结果显示到页面上。

1
2
3
4
5
private fun coroutineStyle() = lifecycleScope.launch {
val contributors1 = gitHub.contributors("square", "retrofit")
val contributors2 = gitHub.contributors("square", "retrofit")
showContributors(contributors1 + contributors2)
}

这样写貌似也行,但是它们不是2个并行的请求,而是串行的。也就是说,它是先进行第一个网络请求,然后再进行第二个请求,再合并结果,这样虽然结果不会错,但两个没有依赖关系的请求却做成了串行的,那不是网络耗时就翻倍了吗!!那怎么写呢?

协程给我们的方案是综合起来,先并行,然后转串行,这是就需要另一个开启协程的函数 async 了,async和launch的内部逻辑都是一样的,只不过async的大括号就可以有返回值了。

image-20240807100501735 image-20240807100717871

我们使用 await() 就可以拿到返回值:

1
2
3
4
5
6
7
8
val deferred = lifecycleScope.async {
gitHub.contributors("square", "retrofit") // 返回值
}
lifecycleScope.launch {
val contributors1 = gitHub.contributors("square", "okhttp")
val contributors2 = deferred.await() // 拿到上一个协程返回值
showContributors(contributors1 + contributors2)
}

我们也可以把两个协程代码都写到async里面:

1
2
3
4
5
6
7
8
9
10
11
val deferred1 = lifecycleScope.async {
gitHub.contributors("square", "okhttp")
}
val deferred2 = lifecycleScope.async {
gitHub.contributors("square", "retrofit") // 返回值
}
lifecycleScope.launch {
val contributors1 = deferred1.await()
val contributors2 = deferred2.await() // 拿到上一个协程返回值
showContributors(contributors1 + contributors2)
}

我们甚至可以这样写,为什么可以这样写,我们在结构化并发中已经讲过了:

1
2
3
4
5
lifecycleScope.launch {
val deferred1 = async { gitHub.contributors("square", "okhttp") }
val deferred2 = async { gitHub.contributors("square", "retrofit") }
showContributors(deferred1.await() + deferred2.await())
}

其实,通常我们最好是在它们外面再包一层:

1
2
3
4
5
6
7
8
9
10
11
12
lifecycleScope.launch {
val contributors = coroutineScope {
val deferred1 = async {
gitHub.contributors("square", "okhttp")
}
val deferred2 = async {
gitHub.contributors("square", "retrofit") // 返回值
}
deferred1.await() + deferred2.await()
}
showContributors(contributors) //也可以放里面
}

包上这个coroutineScope之后,对于协程的异常的结构化管理可以提供很大的方便。这个功能传统Java也能实现,通过使用 CompletableFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
private val handler = Handler(Looper.getMainLooper())

private fun completableFutureStyleMerge() {
val future1 = gitHub.contributorsFuture("square", "retrofit")
val future2 = gitHub.contributorsFuture("square", "okhttp")
future1.thenCombine(future2) { contributors1, contributors2 ->
contributors1 + contributors2
}.thenAccept { mergedContributors ->
handler.post {
showContributors(mergedContributors)
}
}
}

两个并行流程在顺序上有某种依赖,而不依赖结果,可以直接使用 join() 函数

1
2
3
4
5
6
7
8
lifecycleScope.launch {
val initJob = launch {
// init() 初始化工作
}
val contributors = gitHub.contributors("square", "retrofit")
initJob.join() // 等待 init() 完成,但不关心协程返回值
//processData() 依赖初始化工作
}

连接线程世界:和回调型API合作

现实生活中,就算你用协程了,开始要完全避免跟传统的线程API做交互,其实也不是很容易,因为很多项目是有大量的老代码的,你的老代码没有用协程,大概率还是基于线程的API的,也就是回调的写法,还有某些外部库,它们提供的API也可能是回调的写法。

对于回调的API,协程有一个专门的函数 suspendCoroutine

1
2
3
4
5
lifecycleScope.launch {
suspendCoroutine {
callbackStyle() // 把回调函数转换为挂起函数
}
}

这样回调函数就可以在协程中启动了,但光是启动还不够,我们还需要它在挂起函数里面返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
lifecycleScope.launch {
val contributors = suspendCoroutine {
gitHub.contributorsCall("square", "retrofit")
.enqueue(object : Callback<List<Contributor>> {
override fun onResponse(
call: Call<List<Contributor>>,
response: Response<List<Contributor>>
) {
//showContributors(response.body()!!)
it.resume(response.body()!!)
}
override fun onFailure(p0: Call<List<Contributor>>, t: Throwable) {
it.resumeWithException(t) // 抛出异常
}
})
}
showContributors(contributors)
}

我们也可以把它抽成函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private suspend fun callbackToSuspend() =
suspendCoroutine {
gitHub.contributorsCall("square", "retrofit")
.enqueue(object : Callback<List<Contributor>> {
override fun onResponse(
call: Call<List<Contributor>>,
response: Response<List<Contributor>>
) {
//showContributors(response.body()!!)
it.resume(response.body()!!)
}
override fun onFailure(p0: Call<List<Contributor>>, t: Throwable) {
it.resumeWithException(t) // 抛出异常
}
})
}

我们可以使用try-catch处理异常,也可以让它自然的丢给更外面的协程去结构化地处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class CallbackActivity : ComponentActivity() {
private lateinit var textView: TextView

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.suspend_layout)
textView = findViewById(R.id.suspendText)

lifecycleScope.launch {
try {
val contributors = callbackToSuspend()
showContributors(contributors)
} catch (e: Exception) {
textView.text = e.message
}
}
}

private suspend fun callbackToSuspend() =
suspendCoroutine {
gitHub.contributorsCall("square", "retrofit")
.enqueue(object : Callback<List<Contributor>> {
override fun onResponse(
call: Call<List<Contributor>>,
response: Response<List<Contributor>>
) {
//showContributors(response.body()!!)
it.resume(response.body()!!)
}

override fun onFailure(p0: Call<List<Contributor>>, t: Throwable) {
it.resumeWithException(t) // 抛出异常
}
})
}

private fun showContributors(contributors: List<Contributor>) = contributors
.map { "${it.login} (${it.contributions})" }
.reduce { acc, s -> "$acc\n$s" }
.let { textView.text = it }
}

注意,这样是捕获不到异常的:

1
2
3
4
5
6
7
8
try {
launch {
val contributors = callbackToSuspend()
showContributors(contributors)
}
} catch (e: Exception) {
textView.text = e.message
}

因为在launch启动时,try-catch就已经结束了。

除了suspendCoroutine之外,还有一个叫 suspendCancellableCoroutine ,它跟suspendCoroutine非常像,只不过suspendCoroutine支持取消:

1
2
3
4
5
6
7
8
9
10
11
12
val job = lifecycleScope.launch {
try {
val contributors = callbackToSuspend()
showContributors(contributors)
} catch (e: Exception) {
textView.text = e.message
}
}
lifecycleScope.launch {
delay(200)
job.cancel()
}

运行程序,发现页面还是显示了,说明它不配合协程的取消,那如果换成suspendCancellableCoroutine:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 修改val contributors = callbackToSuspend() 为 val contributors = callbackToCancellableSuspend()

private suspend fun callbackToCancellableSuspend() =
suspendCancellableCoroutine {
gitHub.contributorsCall("square", "retrofit")
.enqueue(object : Callback<List<Contributor>> {
override fun onResponse(
call: Call<List<Contributor>>,
response: Response<List<Contributor>>
) {
//showContributors(response.body()!!)
it.resume(response.body()!!)
}
override fun onFailure(p0: Call<List<Contributor>>, t: Throwable) {
it.resumeWithException(t) // 抛出异常
}
})
}

image-20240807111142890

我们一般都会使用suspendCancellableCoroutine,除非你有什么特殊的需求。

此外suspendCancellableCoroutine还有一点,你可以在它里面注册一个取消的回调,来方便做一些收尾工作:

image-20240807111759648

回到线程世界:runBlocking()

除了之前讲的launch和async,其实协程还有一个自带的启动协程的函数,叫 runBlocking ,这是一个特殊的协程启动函数,特殊在于:

  1. 它不需要CoroutineScope
  2. 它会阻塞线程

为什么会这样呢?

它的定位是把挂起函数转换为阻塞式代码,它虽然可以启动协程,但它的作用并不是启动协程本身,而是把协程的代码块封装起来变成阻塞式的,变成阻塞式的干嘛?

去让传统的、线程写法的API使用。比如:你用到一个库,它可以通过注册回调的方式来在某些事件发生的时候去帮你做指定的事,假设你让它做的事已经通过挂起函数实现了,只有把它封装进协程里面才能在线程的世界去使用它,如果你将它封装在launch或者async里面,它就是并行的,但如果你希望它是串行的,或者是阻塞式的,那launch和async就不好使了,这就是runBlocking存在的价值所在:从协程世界回到阻塞的线程世界。

1
2
3
private fun blockingContributors() = runBlocking {
gitHub.contributors("square", "retrofit") // 这样就变成阻塞式的
}

服务器写法:

1
2
3
fun main() = runBlocking {
val contributors = gitHub.contributors("square", "retrofit")
}

当然你也可以直接添加suspend关键字:

1
2
3
4
suspend fun main() {
val contributors = gitHub.contributors("square", "retrofit")
// launch { } 报错
}

suspend只提供了挂起函数的环境,没有提供一个CoroutineScope类型的隐式receiver,也就是this,也就是你不能在它里面直接启动协程。也不是没有解决方法,你可以套一层coroutineScope:

1
2
3
4
suspend fun main() = coroutineScope<Unit> {
val contributors = gitHub.contributors("square", "retrofit")
launch { }
}

runBlocking还有一个比较常用的地方就是测试代码,有时候我们不想写太复杂的测试代码,就想在一个函数里面把某个挂起函数的逻辑给运行完之后返回,那你可以用runBlocking包一下。

其他语言协程

python

Python的Generator是一个典型的无栈协程的实现。可以在任意Python函数中调用yield来实现当前函数调用的挂起,yield的参数作为对下一次next(gen)调用的返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time


def numbers():
i = 0
while True:
yield (i)
i += 1
time.sleep(1)


num_generator = numbers()
print(f"[0] {next(num_generator)}")
print(f"[1] {next(num_generator)}")

for i in num_generator:
print(f"[Loop] {i}")

image-20240906181439916

Lua

Lua的协程实它提供了几个API,允许开发者灵活控制协程的执行。

  • coroutine.create:创建协程,参数为函数类型,作为协程的执行体,返回协程实例。
  • coroutine.yield:挂起协程,第一个参数为被挂起的协程实例,后面的参数则作为之前外部调用当前协程时对应的resume函数的返回值,而它的返回值则又是外部下一次调用resume时传的参数。
  • coroutine.resume:恢复协程,第一个参数为被继续的协程实例,后面的参数则作为协程内部yield时的返回值,返回值为协程内部下一次yield时传出的参数;如果是第一次对该协程实例执行resume,参数会作为协程体的参数传入。

Lua的协程也有几个状态:创建(CREATED)、挂起(SUSPENDED)、运行(RUNNING)、结束(DEAD)。其中,调用yied之后的协程处于挂起态;获得执行权而正在运行的协程则处于运行态;协程体运行结束后,协程处于结束态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
function producer()
for i = 0, 3 do
print("send"..i)
-- coroutine.yield(i)
yield_here(i)
end
print("End Producer")
end

function yield_here(i) -- 注意在python中使用函数时不行的
coroutine.yield(i)
end

function consumer(value)
repeat
print("receive"..value)
value = coroutine.yield()
until(not value)
print("End Consumer")
end

producerCoroutine = coroutine.create(producer)
consumerCoroutine = coroutine.create(consumer)

repeat
status, product = coroutine.resume(producerCoroutine)
coroutine.resume(consumerCoroutine, product)
until(not status)

image-20240906182848973

协程第一次被resume时,从创建状态转入运行态,后续再次resume则从挂起状态恢复到运行态;而每次调用yield会将自已从运行态转入挂起状态。

协程包括以下部分:

  • 协程的执行体,即我们常提到的协程体,主要是指启动协程时对应的函数。
  • 协程的控制实例,我们可以通过协程创建时返回的实例控制协程的调用流转,我们将该对象的类型称为协程的描述类
  • 协程的状态,在调用流程转移前后,协程的状态会发生相应的变化。

go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"time"
)

func main() {
channel := make(chan int)
var readChannel <- chan int = channel
var writeChannel chan <- int = channel

// reader
go func() {
fmt.Println("wait for read")
for i := range readChannel {
fmt.Println("read",i)
}
fmt.Println("read over")
}()

// writer
go func() {
for i := 0; i < 3; i++ {
fmt.Println("write",i)
writeChannel <- i
time.Sleep(time.Second)
}
close(writeChannel)
}()

time.Sleep(time.Second * 5)
}

image-20240906184440044