Kotlin-Flow

在学习Kotlin Flow之前,我们先举一个例子,循环调用10次网络请求,获取的结果平方,然后过滤掉奇数,最后取前2个,然后打印。

请先在 build.gradle.kts 中添加协程依赖项

1
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.random.Random

fun main(): Unit = runBlocking {
// notUseFlow()
useFlow()
}

suspend fun useFlow() {
val flow = flow {
repeat(10) {
emit(request())
}
}

flow.map { it * it }
.filter { it % 2 == 0 }
.take(2)
.onEach { println(it) }
.collect()
}

suspend fun notUseFlow() {
var count = 0
repeat(10) {
var res = request().let { it * it }
if (count == 2) {
return
}
if (res % 2 != 0) return@repeat
++count
println(res)
}
}

suspend fun notUseFlow2() {
val resList = (0 until 10).map { request() }
resList.map { it * it }
.filter { it % 2 == 0 }
.take(2)
.onEach { println(it) }
}

//模拟网络请求
suspend fun request(): Int {
delay(Random.nextLong(1000))
return Random.nextInt(100)
}

其中 notUseFlownotUseFlow2 两个函数是不使用 Flow 的方式,useFlow 是使用 Flow 的方式。
notUseFlow 是一种常规写法,但是代码较冗余,不易阅读,简单场景可以使用。notUseFlow2 是采用链式操作,代码简单,易读,但是因为 request 是一个阻塞的方法,会阻塞在 .map 操作,导致代码时间消耗高。而 useFlow 是采用 Flow 的方式,代码简单,可读性强,不会阻塞。

冷流

首先我们看一下创建流的几种方式:

1
2
3
4
5
fun main(): Unit = runBlocking {
(1..10).asFlow()
listOf(1, 2, 3).asFlow()
flowOf(1, 2, 3)
}

asFlow , flowOf都是通过flow高阶函数进行创建的

image-20240727184933188

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main(): Unit = runBlocking {
//生产者 流的上游
val flow = flow {
repeat(10) {
emit(it)
}
}
//消费者
flow.map { it * it }
.filter { it % 2 == 0 }
.take(2)
.onEach { println(it) }
.collect()
}

其中 emit().collect() 为suspend挂起函数,需要在协程中执行, emit() 为生产者,collect() 为消费者。
Flow的扩展函数 .map().filter() 是实现的transform高阶函数,实际上,我们的流操作可以通过transform来实现。

image-20240727184658042

下面代码和上面代码实现的功能完全一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main(): Unit = runBlocking {
//生产者 流的上游
val flow = flow {
repeat(10) {
emit(it)
}
}
//消费者
flow.transform { num ->
val newNum = num * num
if (newNum % 2 == 0) {
emit(newNum)
}
}.take(2)
.onEach { println(it) }
.collect()
}

热流

SharedFlow(共享数据流)

它是通过 MutableSharedFlow 创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
val sharedFlow = MutableSharedFlow<Int>()
repeat(10) {
sharedFlow.emit(it)
}
sharedFlow.onEach {
println(it)
}.collect()
println("end")
}

运行程序,我们发现它没有任何输出,没有输出就对了,因为它是自己生产自己的,不管消费者是否订阅。而且不会打印 end 信息,因为消费者不知道流是什么时候结束的,所以它会一直订阅,因为它是挂起的,所以会阻塞这一行。这是我们会使用 .launchIn(this) 把它开到新的协程作用域里面,此时它就不会阻塞我们了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
val sharedFlow = MutableSharedFlow<Int>()
repeat(10) {
sharedFlow.emit(it)
}
sharedFlow.onEach {
println(it)
}.launchIn(this)
println("end")
}

image-20240727190520907

MutableSharedFlow函数的参数

  1. replay
1
2
3
4
5
6
7
8
9
10
11
12
fun main(): Unit = runBlocking {
val sharedFlow = MutableSharedFlow<Int>(
replay = 3, //把最近期的,我们设置的重播缓冲区的大小的那几个值存储起来,如果有订阅者就直接发送给它
)
repeat(10) {
sharedFlow.emit(it)
}
sharedFlow.onEach {
println(it)
}.launchIn(this)
println("end")
}

image-20240727191015461

  1. extraBufferCapacity
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun main(): Unit = runBlocking {
val sharedFlow = MutableSharedFlow<Int>(
extraBufferCapacity = 1, //缓冲区的大小
)
sharedFlow.onEach {
println("收到数据$it")
delay(1000)
}.launchIn(this)
delay(1000) // 休眠,避免开协程作用域没有emit快
repeat(10) {
println("发送次数${it + 1}")
sharedFlow.emit(it)
}
println("end")
}
image-20240727191732415

注意replay和extraBufferCapacity一起使用,缓冲区容量等于它们之和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun main(): Unit = runBlocking {
val sharedFlow = MutableSharedFlow<Int>(
replay = 3, //把最近期的,我们设置的重播缓冲区的大小的那几个值存储起来,如果有订阅者就直接发送给它
extraBufferCapacity = 1, //缓冲区的大小
)
sharedFlow.onEach {
println("收到数据$it")
delay(1000)
}.launchIn(this)
delay(1000) // 休眠,避免开协程作用域没有emit快
repeat(10) {
println("发送次数${it + 1}")
sharedFlow.emit(it)
}
println("end")
}
image-20240727192149107

从MutableSharedFlow源码中也可以看出:

image-20240727192010037

  1. onBufferOverflow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fun main(): Unit = runBlocking {
val sharedFlow = MutableSharedFlow<Int>(
replay = 3,
extraBufferCapacity = 1,
// 缓冲区(队列)溢出操作
// SUSPEND :如果缓冲区满了,就挂起
// DROP_OLDEST :如果缓冲区满了,就丢掉最老的那个
// DROP_LATEST :如果缓冲区满了,就丢掉最新的那个
onBufferOverflow = BufferOverflow.DROP_LATEST
)
sharedFlow.onEach {
delay(1000)
println(it)
}.launchIn(this)
delay(1000) // 休眠,避免开协程作用域没有emit快
repeat(10) {
sharedFlow.emit(it)
}
println("end")
}
DROP_OLDEST DROP_LATEST

StateFlow(状态数据流)

这是一个常用到的东西,跟Android中的 LiveData 是一样的效果,主要是对某一个状态进行响应式监听和改变。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
val stateFlow = MutableStateFlow(1) // 只会发一个值,并不会发很多值
stateFlow.emit(2)
launch { // 避免阻塞
stateFlow.collect {
println(it)
}
}
stateFlow.emit(3)
}
image-20240727201552379

接下来写一个Android的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.random.Random

fun main(): Unit = runBlocking {
val viewModel = ViewModel()
launch {
viewModel.age.collect {
println(it)
}
}
launch {
while (isActive) {
viewModel.updateAge()
}
}
}

class ViewModel {
private val _age = MutableStateFlow(0)
val age = _age.asStateFlow() //改为不可更新,只能监听的状态
suspend fun updateAge() {
_age.emit(ageRequest())
}
}

suspend fun ageRequest(): Int {
delay(Random.nextLong(1000))
return Random.nextInt(20, 60)
}
image-20240727203013255

这里将 _age 设计为私有的,而只是把只能监听的状态 age 暴露出去,是因为在架构设计中,viewmodel本身就是为了把数据和页面分离开,页面只管监听数据的改变进行更新就行了。