1.可以使用flow构建函数构建一个Flow类型返回值的函数 2.flow{}构建体中可以调用挂起函数,即上流 3.上流使用emit函数发射值 4.下流使用collect函数收集值
//上流函数
fun simpleFlow() = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() {
runBlocking {
//下流接收数据
simpleFlow().collect { value ->
println(value)
}
println("finished")
}
}
结果: 1 2 3 finished
fun simpleFlow() = flow {
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() {
runBlocking {
simpleFlow().collect { value ->
println(value)
}
println("collect1 finished")
simpleFlow().collect { value ->
println(value)
}
println("collect2 finished")
}
}
结果: 1 2 3 collect1 finished 1 2 3 collect2 finished
Flow也支持函数式编程,并且从上流到下流的每个过渡操作符都会处理发射值,最终流入下流
fun main() {
runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.filter {
it % 2 == 0 //只取偶数
}.map {
"String $it"
}.collect {
println(it)
}
}
}
结果: String 2 String 4
除了使用flow函数外,还有两种方式 1.flowOf函数 2.使用.asFlow()扩展函数,可以将各种集合与序列转为流
fun main() {
runBlocking {
val startTime = System.currentTimeMillis()
flowOf(3, 5, 7)
.onEach { delay(100) }
.collect {
println("${System.currentTimeMillis() - startTime}ms $it")
}
(3..6).asFlow().collect { println(it) }
}
}
结果: 131ms 3 239ms 5 350ms 7 3 4 5 6
fun main() {
runBlocking {
flow {
println("flow :${Thread.currentThread().name}")
for (i in 1..5) {
delay(100)
emit(i)
}
}.flowOn(Dispatchers.Default)
.collect {
println("collect:${Thread.currentThread().name} $it")
}
}
}
结果: flow :DefaultDispatcher-worker-1 collect:main 1 collect:main 2 collect:main 3 collect:main 4 collect:main 5
下流还是会使用主协程的上下文
fun main() {
runBlocking {
flow {
println("flow :${Thread.currentThread().name}")
for (i in 1..5) {
delay(100)
emit(i)
}
}.flowOn(Dispatchers.Default)
.onEach { println("collect:${Thread.currentThread().name} $it") }
.launchIn(CoroutineScope(Dispatchers.IO))
.join()//主线程等待这个协程执行结束
}
}
结果: flow :DefaultDispatcher-worker-1 collect:DefaultDispatcher-worker-1 1 collect:DefaultDispatcher-worker-1 2 collect:DefaultDispatcher-worker-1 3 collect:DefaultDispatcher-worker-1 4 collect:DefaultDispatcher-worker-1 5
Flow的取消和协程的取消相同,流的收集是CPU密集型的,但是如果收集时有挂起函数,那么挂起函数可以抛出取消异常来中断执行 使用了新协程的情况,可以使用cancel:
fun main() {
runBlocking {
val flow = flow {
println("flow :${Thread.currentThread().name}")
for (i in 1..5) {
delay(100)
emit(i)
}
}.flowOn(Dispatchers.Default)
.onEach { println("collect:${Thread.currentThread().name} $it") }
.launchIn(CoroutineScope(Dispatchers.IO))
delay(200)
flow.cancel()
flow.join()
}
}
使用timeout:
fun main() {
runBlocking {
withTimeoutOrNull(300){
flow {
println("flow :${Thread.currentThread().name}")
for (i in 1..5) {
delay(100)
emit(i)
}
}.flowOn(Dispatchers.Default)
.collect { println("collect:${Thread.currentThread().name} $it") }
}
println("finished")
}
}
之前我们调用子协程的取消时,CPU密集型代码并不能结束运行,在不使用挂起函数的情况下,我们在子协程体中通过ensureActive函数来检测该协程是否被取消了 1.而Flow为了方便,Flow构建器会对每个发射值(emit函数)执行ensureActive函数来进行取消
fun main() {
runBlocking {
flow {
for (i in 1..5) {
emit(i)
}
}
.collect {
println("$it")
if (it > 2)
cancel()
}
println("finished")
}
}
结果: 1 2 3 Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@2833cc44
2.出于性能考虑,大多数其他流操作不会执行检测,此时我们可以使用cancellable函数来指定该Flow是可以取消的
fun main() {
runBlocking {
val flow = flowOf(1, 2, 3, 5)
.cancellable()//不指定,那么将不执行取消检测
.collect {
println("$it")
if (it > 2)
cancel()
}
println("finished")
}
}
结果: 1 2 3 Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@7c29daf3
上流每次发射耗时1s,下流接收耗时3s,那么它们总共会耗时多久
fun main() {
runBlocking {
val flow = flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}
val time = measureTimeMillis {
flow
.collect {
delay(3000)
println("$it")
}
}
println("time : $time ms")
}
}
结果: 1 2 3 time : 12073 ms 可以看出,一般情况下,上下流执行是同步的
1.使用buff,来让上流不等待下流接收,而是发射到缓存区
fun main() {
runBlocking {
val flow = flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}
val time = measureTimeMillis {
flow.buffer(50)//指定缓存区大小为50个
.collect {
delay(3000)
println("$it")
}
}
println("time : $time ms")
}
}
结果: 1 2 3 time : 10158 ms 时间是1s + 3s * 3
2.指定上流协程
fun main() {
runBlocking {
val flow = flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}
val time = measureTimeMillis {
flow.flowOn(Dispatchers.IO)
.collect {
delay(3000)
println("$it")
}
}
println("time : $time ms")
}
}
结果和1.是一样的
3.有时我们不需要一个不漏的接收上流的元素时,可以使用conflate,下流来不及处理的会被丢弃掉
fun main() {
runBlocking {
val flow = flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}
val time = measureTimeMillis {
flow.conflate()
.collect {
delay(3000)
println("$it")
}
}
println("time : $time ms")
}
}
结果: 1 3 time : 7124 ms
4.collectLast可以只接收上流发射的最后一个元素
fun main() {
runBlocking {
val flow = flow {
for (i in 1..3) {
delay(1000)
emit(i)
}
}
val time = measureTimeMillis {
flow
.collectLatest {
delay(3000)
println("$it")
}
}
println("time : $time ms")
}
}
3 time : 6144 ms
上面我们也提到了Flow支持函数式编程,用法和之前学习的差不多
1.map函数
fun main() {
runBlocking {
flow {
for (i in 1..3) {
emit(i)
}
}.map {
"String $it"
}.collect {
println(it)
}
}
}
结果: String 1 String 2 String 3
2.transform函数,还可以将上流的一个变为多个发射出去
fun main() {
runBlocking {
flow {
for (i in 1..3) {
emit(i)
}
}.transform {
emit("String1 $it")
emit("String2 $it")
}.collect {
println(it)
}
}
}
结果: String1 1 String2 1 String1 2 String2 2 String1 3 String2 3
take函数
fun main() {
runBlocking {
flow {
for (i in 1..3) {
emit(i)
}
}.take(2).collect {
println(it)
}
}
}
结果: 1 2
末端操作符是用于启动流的挂起函数,collect是最基础的末端操作符,除此以外还有其他的 1.转化为各种集合,如:toList或toSet 2.获取第一个元素(first)与确保流只发射一个元素(single) 3.flod与reduce将流整合到一个值
flod函数
fun main() {
runBlocking {
val value = flow {
for (i in 1..3) {
emit(i)
}
}.map {
it * it
}.fold(0) { acc, value ->
acc + value
}
print(value)
}
}
结果: 14
zip函数
fun main() {
runBlocking {
val flow1 = flow {
for (i in 1..3) {
emit(i)
}
}
val flow2 = flowOf("one", "two", "three")
flow1.zip(flow2) { a, b ->
"$a -> $b"
}.collect { value ->
println(value)
}
}
}
结果: 1 -> one 2 -> two 3 -> three
类似于集合的集合,流里也有可能有流,那么这个时候我们就需要使用展平操作符了 1.flatMapConcat
fun main() {
runBlocking {
val startTime = System.currentTimeMillis()
flow {
for (i in 1..3) {
emit(i)
}
}.flatMapConcat {
flow {
emit("first $it")
delay(500)
emit("second $it")
}
}.collect {
println("${System.currentTimeMillis() - startTime}ms $it")
}
}
}
结果: 52ms first 1 570ms second 1 571ms first 2 1074ms second 2 1074ms first 3 1577ms second 3
2.flatMapMerge 和flatMapConcat不同,flatMapConcat是按流函数体中顺序执行,而flatMapMerge中遇到发射函数时,会一次性执行上流的所有发射
fun main() {
runBlocking {
val startTime = System.currentTimeMillis()
flow {
for (i in 1..3) {
emit(i)
}
}.flatMapMerge {
flow {
emit("first $it")
delay(500)
emit("second $it")
}
}.collect {
println("${System.currentTimeMillis() - startTime}ms $it")
}
}
}
结果: 130ms first 1 130ms first 2 131ms first 3 632ms second 1 632ms second 2 632ms second 3
3.flatMapLatest flatMapLatest中遇到第二个发射函数时,只会发射上流最后一次的元素
fun main() {
runBlocking {
val startTime = System.currentTimeMillis()
flow {
for (i in 1..3) {
emit(i)
}
}.flatMapLatest {
flow {
emit("first $it")
delay(500)
emit("second $it")
}
}.collect {
println("${System.currentTimeMillis() - startTime}ms $it")
}
}
}
结果: 298ms first 1 300ms first 2 301ms first 3 806ms second 3
当运算符中的发射器或代码抛出异常,可以有两种方式处理 1.try catch 2.catch函数
fun main() {
runBlocking {
val flow = flow {
for (i in 1..3) {
emit(i)
}
}
try {
flow.collect {
println(it)
throw RuntimeException()
}
} catch (e: Exception) {
print("caught: $e")
}
}
}
fun main() {
runBlocking {
val flow = flow {
for (i in 1..3) {
emit(i)
throw RuntimeException()
}
}.catch { e ->
print("caught1: $e")
}.collect {
println(it)
}
}
}
有时候我们需要在Flow完成时,做一些其他事情,可以使用下面的方式
1.finally块
fun main() {
runBlocking {
try{
val flow = flow {
for (i in 1..3) {
emit(i)
}
}.collect {
println(it)
}
}finally {
println("done")
}
}
}
2.onCompletion函数
fun main() {
runBlocking {
val flow = flow {
for (i in 1..3) {
emit(i)
}
}.onCompletion {
println("done")
}.collect {
println(it)
}
}
}