前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kotlin--Flow的运用

kotlin--Flow的运用

作者头像
aruba
发布2021-12-06 17:22:31
6610
发布2021-12-06 17:22:31
举报
文章被收录于专栏:android技术
Flow是kotlin提供的一个工具,使用协程封装成生产者-消费者模式,上流来负责生产,下流来接收消耗。
一、Flow的使用
1.Flow的创建

1.可以使用flow构建函数构建一个Flow类型返回值的函数 2.flow{}构建体中可以调用挂起函数,即上流 3.上流使用emit函数发射值 4.下流使用collect函数收集值

代码语言:javascript
复制
//上流函数
fun simpleFlow() = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() {
    runBlocking {
        //下流接收数据
        simpleFlow().collect { value ->
            println(value)
        }

        println("finished")
    }
}

结果: 1 2 3 finished

2.Flow是冷流,所以collect是挂起函数,不是子协程,并且只有执行collect函数时,上流的代码才会被执行,所以在一个协程中多次调用collect,它们会按顺序执行
代码语言:javascript
复制
fun simpleFlow() = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() {
    runBlocking {
        simpleFlow().collect { value ->
            println(value)
        }

        println("collect1 finished")

        simpleFlow().collect { value ->
            println(value)
        }

        println("collect2 finished")
    }
}

结果: 1 2 3 collect1 finished 1 2 3 collect2 finished

3.Flow的连续性

Flow也支持函数式编程,并且从上流到下流的每个过渡操作符都会处理发射值,最终流入下流

代码语言:javascript
复制
fun main() {
    runBlocking {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.filter {
            it % 2 == 0 //只取偶数
        }.map {
            "String $it"
        }.collect {
            println(it)
        }
    }
}

结果: String 2 String 4

4.Flow构建器

除了使用flow函数外,还有两种方式 1.flowOf函数 2.使用.asFlow()扩展函数,可以将各种集合与序列转为流

代码语言:javascript
复制
fun main() {
    runBlocking {
        val startTime = System.currentTimeMillis()

        flowOf(3, 5, 7)
            .onEach { delay(100) }
            .collect {
                println("${System.currentTimeMillis() - startTime}ms $it")
            }

        (3..6).asFlow().collect { println(it) }
    }
}

结果: 131ms 3 239ms 5 350ms 7 3 4 5 6

5.collect为挂起函数,但是Flow也提供了flowOn函数方便我们指定上流是否使用子协程执行
代码语言:javascript
复制
fun main() {
    runBlocking {
        flow {
            println("flow :${Thread.currentThread().name}")
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.Default)
            .collect {
                println("collect:${Thread.currentThread().name} $it")
            }
    }
}

结果: flow :DefaultDispatcher-worker-1 collect:main 1 collect:main 2 collect:main 3 collect:main 4 collect:main 5

下流还是会使用主协程的上下文

6.除了使用子协程执行上流外,我们还可以使用launchIn函数来让Flow使用全新的协程上下文
代码语言:javascript
复制
fun main() {
    runBlocking {
        flow {
            println("flow :${Thread.currentThread().name}")
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.Default)
            .onEach { println("collect:${Thread.currentThread().name} $it") }
            .launchIn(CoroutineScope(Dispatchers.IO))
            .join()//主线程等待这个协程执行结束
    }
}

结果: flow :DefaultDispatcher-worker-1 collect:DefaultDispatcher-worker-1 1 collect:DefaultDispatcher-worker-1 2 collect:DefaultDispatcher-worker-1 3 collect:DefaultDispatcher-worker-1 4 collect:DefaultDispatcher-worker-1 5

7.Flow的取消

Flow的取消和协程的取消相同,流的收集是CPU密集型的,但是如果收集时有挂起函数,那么挂起函数可以抛出取消异常来中断执行 使用了新协程的情况,可以使用cancel:

代码语言:javascript
复制
fun main() {
    runBlocking {
        val flow = flow {
            println("flow :${Thread.currentThread().name}")
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.Default)
            .onEach { println("collect:${Thread.currentThread().name} $it") }
            .launchIn(CoroutineScope(Dispatchers.IO))

        delay(200)
        flow.cancel()
        flow.join()
    }
}

使用timeout:

代码语言:javascript
复制
fun main() {
    runBlocking {
        withTimeoutOrNull(300){
            flow {
                println("flow :${Thread.currentThread().name}")
                for (i in 1..5) {
                    delay(100)
                    emit(i)
                }
            }.flowOn(Dispatchers.Default)
                .collect { println("collect:${Thread.currentThread().name} $it") }
        }
        
        println("finished")
    }
}
8.Flow的取消检测

之前我们调用子协程的取消时,CPU密集型代码并不能结束运行,在不使用挂起函数的情况下,我们在子协程体中通过ensureActive函数来检测该协程是否被取消了 1.而Flow为了方便,Flow构建器会对每个发射值(emit函数)执行ensureActive函数来进行取消

代码语言:javascript
复制
fun main() {
    runBlocking {
        flow {
            for (i in 1..5) {
                emit(i)
            }
        }
            .collect {
                println("$it")
                if (it > 2)
                    cancel()
            }

        println("finished")
    }
}

结果: 1 2 3 Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@2833cc44

2.出于性能考虑,大多数其他流操作不会执行检测,此时我们可以使用cancellable函数来指定该Flow是可以取消的

代码语言:javascript
复制
fun main() {
    runBlocking {
        val flow = flowOf(1, 2, 3, 5)
            .cancellable()//不指定,那么将不执行取消检测
            .collect {
                println("$it")
                if (it > 2)
                    cancel()
            }

        println("finished")
    }
}

结果: 1 2 3 Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@7c29daf3

9.背压

上流每次发射耗时1s,下流接收耗时3s,那么它们总共会耗时多久

代码语言:javascript
复制
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }

        val time = measureTimeMillis {
            flow
                .collect {
                delay(3000)
                println("$it")
            }
        }

        println("time : $time ms")
    }
}

结果: 1 2 3 time : 12073 ms 可以看出,一般情况下,上下流执行是同步的

1.使用buff,来让上流不等待下流接收,而是发射到缓存区

代码语言:javascript
复制
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }

        val time = measureTimeMillis {
            flow.buffer(50)//指定缓存区大小为50个
                .collect {
                delay(3000)
                println("$it")
            }
        }

        println("time : $time ms")
    }
}

结果: 1 2 3 time : 10158 ms 时间是1s + 3s * 3

2.指定上流协程

代码语言:javascript
复制
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }

        val time = measureTimeMillis {
            flow.flowOn(Dispatchers.IO)
                .collect {
                    delay(3000)
                    println("$it")
                }
        }

        println("time : $time ms")
    }
}

结果和1.是一样的

3.有时我们不需要一个不漏的接收上流的元素时,可以使用conflate,下流来不及处理的会被丢弃掉

代码语言:javascript
复制
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }

        val time = measureTimeMillis {
            flow.conflate()
                .collect {
                    delay(3000)
                    println("$it")
                }
        }

        println("time : $time ms")
    }
}

结果: 1 3 time : 7124 ms

4.collectLast可以只接收上流发射的最后一个元素

代码语言:javascript
复制
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }

        val time = measureTimeMillis {
            flow
                .collectLatest {
                    delay(3000)
                    println("$it")
                }
        }

        println("time : $time ms")
    }
}

3 time : 6144 ms

二、操作符

上面我们也提到了Flow支持函数式编程,用法和之前学习的差不多

1.转换操作符

1.map函数

代码语言:javascript
复制
fun main() {
    runBlocking {
        flow {
            for (i in 1..3) {
                emit(i)
            }
        }.map {
            "String $it"
        }.collect {
            println(it)
        }
    }
}

结果: String 1 String 2 String 3

2.transform函数,还可以将上流的一个变为多个发射出去

代码语言:javascript
复制
fun main() {
    runBlocking {
        flow {
            for (i in 1..3) {
                emit(i)
            }
        }.transform {
            emit("String1 $it")
            emit("String2 $it")
        }.collect {
            println(it)
        }
    }
}

结果: String1 1 String2 1 String1 2 String2 2 String1 3 String2 3

2.限长操作符

take函数

代码语言:javascript
复制
fun main() {
    runBlocking {
        flow {
            for (i in 1..3) {
                emit(i)
            }
        }.take(2).collect {
            println(it)
        }
    }
}

结果: 1 2

3.末端操作符

末端操作符是用于启动流的挂起函数,collect是最基础的末端操作符,除此以外还有其他的 1.转化为各种集合,如:toList或toSet 2.获取第一个元素(first)与确保流只发射一个元素(single) 3.flod与reduce将流整合到一个值

flod函数

代码语言:javascript
复制
fun main() {
    runBlocking {
       val value =  flow {
            for (i in 1..3) {
                emit(i)
            }
        }.map {
            it * it
        }.fold(0) { acc, value ->
            acc + value
        }
        
        print(value)
    }
}

结果: 14

4.组合操作符

zip函数

代码语言:javascript
复制
fun main() {
    runBlocking {
        val flow1 = flow {
            for (i in 1..3) {
                emit(i)
            }
        }

        val flow2 = flowOf("one", "two", "three")

        flow1.zip(flow2) { a, b ->
            "$a -> $b"
        }.collect { value ->
            println(value)
        }
    }
}

结果: 1 -> one 2 -> two 3 -> three

5.展平操作符

类似于集合的集合,流里也有可能有流,那么这个时候我们就需要使用展平操作符了 1.flatMapConcat

代码语言:javascript
复制
fun main() {
    runBlocking {
        val startTime = System.currentTimeMillis()
        flow {
            for (i in 1..3) {
                emit(i)
            }
        }.flatMapConcat {
            flow {
                emit("first $it")
                delay(500)
                emit("second $it")
            }
        }.collect {
            println("${System.currentTimeMillis() - startTime}ms  $it")
        }
    }
}

结果: 52ms first 1 570ms second 1 571ms first 2 1074ms second 2 1074ms first 3 1577ms second 3

2.flatMapMerge 和flatMapConcat不同,flatMapConcat是按流函数体中顺序执行,而flatMapMerge中遇到发射函数时,会一次性执行上流的所有发射

代码语言:javascript
复制
fun main() {
    runBlocking {
        val startTime = System.currentTimeMillis()
        flow {
            for (i in 1..3) {
                emit(i)
            }
        }.flatMapMerge {
            flow {
                emit("first $it")
                delay(500)
                emit("second $it")
            }
        }.collect {
            println("${System.currentTimeMillis() - startTime}ms  $it")
        }
    }
}

结果: 130ms first 1 130ms first 2 131ms first 3 632ms second 1 632ms second 2 632ms second 3

3.flatMapLatest flatMapLatest中遇到第二个发射函数时,只会发射上流最后一次的元素

代码语言:javascript
复制
fun main() {
    runBlocking {
        val startTime = System.currentTimeMillis()
        flow {
            for (i in 1..3) {
                emit(i)
            }
        }.flatMapLatest {
            flow {
                emit("first $it")
                delay(500)
                emit("second $it")
            }
        }.collect {
            println("${System.currentTimeMillis() - startTime}ms  $it")
        }
    }
}

结果: 298ms first 1 300ms first 2 301ms first 3 806ms second 3

三、Flow的异常处理

当运算符中的发射器或代码抛出异常,可以有两种方式处理 1.try catch 2.catch函数

1.try catch适用于收集时发生的异常
代码语言:javascript
复制
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
            }
        }

        try {
            flow.collect {
                println(it)
                throw RuntimeException()
            }
        } catch (e: Exception) {
            print("caught: $e")
        }
    }
}
2.虽然上流也可以使用try catch,但是更推荐catch函数
代码语言:javascript
复制
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
                throw RuntimeException()
            }
        }.catch { e ->
            print("caught1: $e")
        }.collect {
            println(it)
        }
    }
}
四、Flow的完成

有时候我们需要在Flow完成时,做一些其他事情,可以使用下面的方式

1.finally块

代码语言:javascript
复制
fun main() {
    runBlocking {
        try{
            val flow = flow {
                for (i in 1..3) {
                    emit(i)
                }
            }.collect {
                println(it)
            }
        }finally {
            println("done")            
        }
    }
}

2.onCompletion函数

代码语言:javascript
复制
fun main() {
    runBlocking {
        val flow = flow {
            for (i in 1..3) {
                emit(i)
            }
        }.onCompletion {
            println("done")
        }.collect {
            println(it)
        }
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/9/3 下午,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flow是kotlin提供的一个工具,使用协程封装成生产者-消费者模式,上流来负责生产,下流来接收消耗。
  • 一、Flow的使用
    • 1.Flow的创建
      • 2.Flow是冷流,所以collect是挂起函数,不是子协程,并且只有执行collect函数时,上流的代码才会被执行,所以在一个协程中多次调用collect,它们会按顺序执行
        • 3.Flow的连续性
          • 4.Flow构建器
            • 5.collect为挂起函数,但是Flow也提供了flowOn函数方便我们指定上流是否使用子协程执行
              • 6.除了使用子协程执行上流外,我们还可以使用launchIn函数来让Flow使用全新的协程上下文
                • 7.Flow的取消
                  • 8.Flow的取消检测
                    • 9.背压
                    • 二、操作符
                      • 1.转换操作符
                        • 2.限长操作符
                          • 3.末端操作符
                            • 4.组合操作符
                              • 5.展平操作符
                              • 三、Flow的异常处理
                                • 1.try catch适用于收集时发生的异常
                                  • 2.虽然上流也可以使用try catch,但是更推荐catch函数
                                  • 四、Flow的完成
                                  领券
                                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档