Flow
Flow的出现是为了解决挂起函数只能返回一个返回值的问题,Flow支持返回一个”流”,可以理解为可以返回多个异步处理的结果。
创建一个Flow
创建flow的方式在Builders.kt
中,主要有以下:
- 最基本的创建方式:
1 2 3 4 5
val mFlow = flow { emit(1) emit(2) emit(3) }
- asFlow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@FlowPreview
public fun <T> (() -> T).asFlow(): Flow<T> = flow {
emit(invoke())
}
/**
* Creates a _cold_ flow that produces a single value from the given functional type.
*
* Example of usage:
*
* ```
* suspend fun remoteCall(): R = ...
* fun remoteCallFlow(): Flow<R> = ::remoteCall.asFlow()
* ```
*/
@FlowPreview
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
emit(invoke())
}
/**
* Creates a _cold_ flow that produces values from the given iterable.
*/
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
/**
* Creates a _cold_ flow that produces values from the given iterator.
*/
public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
/**
* Creates a _cold_ flow that produces values from the given sequence.
*/
public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
public fun <T> Array<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
/**
* Creates a _cold_ flow that produces values from the array.
* The flow being _cold_ means that the array components are read every time a terminal operator is applied
* to the resulting flow.
*/
public fun IntArray.asFlow(): Flow<Int> = flow {
forEach { value ->
emit(value)
}
}
/**
* Creates a _cold_ flow that produces values from the given array.
* The flow being _cold_ means that the array components are read every time a terminal operator is applied
* to the resulting flow.
*/
public fun LongArray.asFlow(): Flow<Long> = flow {
forEach { value ->
emit(value)
}
}
/**
* Creates a flow that produces values from the range.
*/
public fun IntRange.asFlow(): Flow<Int> = flow {
forEach { value ->
emit(value)
}
}
/**
* Creates a flow that produces values from the range.
*/
public fun LongRange.asFlow(): Flow<Long> = flow {
forEach { value ->
emit(value)
}
}
如:
1
2
3
4
val mFlow1 = (1..3).asFlow()
val mFlow2 = arrayOf(1, 2, 3).asFlow()
val mFlow3 = sequenceOf(1, 2, 3).asFlow()
//...
- flowOf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Creates a flow that produces values from the specified `vararg`-arguments.
*
* Example of usage:
*
* ```
* flowOf(1, 2, 3)
* ```
*/
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
/**
* Creates a flow that produces the given [value].
*/
public fun <T> flowOf(value: T): Flow<T> = flow {
/*
* Implementation note: this is just an "optimized" overload of flowOf(vararg)
* which significantly reduces the footprint of widespread single-value flows.
*/
emit(value)
}
如:
1
2
val mFlow1 = flowOf(1,2,3)
val mFlow2 = flowOf(1)
主要操作符简介:
- emit
- collect
- transform
- filter
- map
- take
- drop
- reduce
- buffer
- conflate
- collectLatest
- zip
- combine
- oneach
- flowOn
- catch
- completion
emit、collect
“发射”、“收集”,规律如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val f = flow {
(1..3).forEach {
emit(it)
"emit $it".log()
}
}
runBlocking {
f.collect { value ->
"collect $value".log()
}
}
//collect 1
//emit 1
//collect 2
//emit 2
//collect 3
//emit 3
如果flow只有一个元素的时候,可以使用
single()
直接获取这个元素,但是如果flow为空,则会抛出NoSuchElementException("Flow is empty")
,再或者flow元素不止一个的时候,会抛出IllegalArgumentException("Flow has more than one element")
源码:
1
2
3
4
5
6
7
8
9
10
public suspend fun <T> Flow<T>.single(): T {
var result: Any? = NULL
collect { value ->
require(result === NULL) { "Flow has more than one element" }
result = value
}
if (result === NULL) throw NoSuchElementException("Flow is empty")
return result as T
}
类似的还有singleOrNull
、first
、firstOrNull
。
如:
1
2
3
4
5
6
7
8
9
10
11
12
13
val f = flowOf(1, 2, 3)
runBlocking {
f.first().toString().log()
}
//1
val f = flowOf(1, 2, 3)
runBlocking {
f.first {
it > 2
}.toString().log()
}
//3
transform(转换)
源码:
1
2
3
4
5
6
7
8
public inline fun <T, R> Flow<T>.transform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow { // Note: safe flow is used here, because collector is exposed to transform on each operation
collect { value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
return@collect transform(value)
}
}
例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
val f = flow {
(1..3).forEach {
"emit $it".log()
emit(it)
}
}
runBlocking {
f.transform { originValue->
emit("transform-$originValue")
emit("ok")
}.collect { value ->
"collect $value".log()
}
}
//emit 1
//collect transform-1
//collect ok
//emit 2
//collect transform-2
//collect ok
//emit 3
//collect transform-3
//collect ok
filter(过滤)
内部实现是transform
,根据filter
的表达式过滤,并生成一个新的flow
。
源码:
1
2
3
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
if (predicate(value)) return@transform emit(value)
}
例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val f = flow {
(1..3).forEach {
"emit $it".log()
emit(it)
}
}
runBlocking {
f.filter {
it > 1
}.collect { value ->
"collect $value".log()
}
}
//emit 1
//collect 2
//emit 2
//collect 3
//emit 3
还有与之对应的filterNot
、filterIsInstance
、filterNotNull
。
map(转换)
实现内部是transform
,与transform
不同的是map
的参数不需要返回一个FlowCollector
,只是干预emit
发出来的值。
源码:
1
2
3
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
return@transform emit(transform(value))
}
例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val f = flow {
(1..3).forEach {
"emit $it".log()
emit(it)
}
}
runBlocking {
f.map { originValue->
"transform-$originValue"
}.collect { value ->
"collect $value".log()
}
}
//emit 1
//collect transform-1
//emit 2
//collect transform-2
//emit 3
//collect transform-3
此外还有mapNotNull
。
take
从开始位置到需要take
数量的为止,返回一个新的flow
当collect到take
数量之后,原始的flow
会被取消。
源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public fun <T> Flow<T>.take(count: Int): Flow<T> {
require(count > 0) { "Requested element count $count should be positive" }
return flow {
var consumed = 0
try {
collect { value ->
// Note: this for take is not written via collectWhile on purpose.
// It checks condition first and then makes a tail-call to either emit or emitAbort.
// This way normal execution does not require a state machine, only a termination (emitAbort).
// See "TakeBenchmark" for comparision of different approaches.
if (++consumed < count) {
return@collect emit(value)
} else {
return@collect emitAbort(value)
}
}
} catch (e: AbortFlowException) {
e.checkOwnership(owner = this)
}
}
}
private suspend fun <T> FlowCollector<T>.emitAbort(value: T) {
emit(value)
throw AbortFlowException(this)
}
例:
1
2
3
4
5
6
7
8
val f = (1..3).asFlow()
runBlocking {
f.take(1)
.collect { value ->
"collect $value".log()
}
}
//collect 1
此外还有takeWhile
。
drop
顾名思义,从drop
的的位置开始,返回一个新的flow
。
源码:
1
2
3
4
5
6
7
8
9
public fun <T> Flow<T>.drop(count: Int): Flow<T> {
require(count >= 0) { "Drop count should be non-negative, but had $count" }
return flow {
var skipped = 0
collect { value ->
if (skipped >= count) emit(value) else ++skipped
}
}
}
例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val f = flow {
(1..3).forEach {
"emit $it".log()
emit(it)
}
}
runBlocking {
f.drop(2).collect { value ->
"collect $value".log()
}
}
//emit 1
//emit 2
//emit 3
//collect 3
此外还有dropWhile
。
reduce
根据操作符累计值。
源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S {
var accumulator: Any? = NULL
collect { value ->
accumulator = if (accumulator !== NULL) {
@Suppress("UNCHECKED_CAST")
operation(accumulator as S, value)
} else {
value
}
}
if (accumulator === NULL) throw NoSuchElementException("Empty flow can't be reduced")
@Suppress("UNCHECKED_CAST")
return accumulator as S
}
例:
1
2
3
4
5
6
7
8
9
10
11
runBlocking {
val result = flowOf(1, 2, 3)
.reduce { accumulator, value ->
"accumulator $accumulator,value $value".log()
accumulator + value
}
"result = $result".log()
}
//accumulator 1,value 2
//accumulator 3,value 3
//result = 6
1
2
3
4
5
6
7
8
runBlocking {
val result = flowOf(1, 2, 3)
.reduce { accumulator, value ->
accumulator - value
}
"result = $result".log()
}
//result = -4
类似的还有fold
,fold
会提供一个初始值参与计算。
源码:
1
2
3
4
5
6
7
8
9
10
public suspend inline fun <T, R> Flow<T>.fold(
initial: R,
crossinline operation: suspend (acc: R, value: T) -> R
): R {
var accumulator = initial
collect { value ->
accumulator = operation(accumulator, value)
}
return accumulator
}
例:
1
2
3
4
5
6
7
8
9
10
11
12
runBlocking {
val result = flowOf(1, 2, 3)
.fold(10) { accumulator, value ->
"accumulator $accumulator,value $value".log()
accumulator + value
}
"result = $result".log()
}
//accumulator 10,value 1
//accumulator 11,value 2
//accumulator 13,value 3
//result = 16
buffer
先运行完emit的代码,再进行collect。
不使用buffer
的情况:
1
2
3
4
5
6
7
8
9
10
11
val f = flowOf("A", "B", "C")
runBlocking {
f.onEach { "each 1$it".log() }
.collect { value -> "collect 2$value".log() }
}
//each 1A
//collect 2A
//each 1B
//collect 2B
//each 1C
//collect 2C
使用buffer操作的结果:
1
2
3
4
5
6
7
8
9
10
11
12
val f = flowOf("A", "B", "C")
runBlocking {
f.onEach { "each 1$it".log() }
.buffer()
.collect { value -> "collect 2$value".log() }
}
//each 1A
//each 1B
//each 1C
//collect 2A
//collect 2B
//collect 2C
conflate
将flow
任务混合,并将collect
运行在一个单独的协程中,这样emit
不会因为collect
执行慢而挂起等待,collect
会一直拿到最新的emit
值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
val f = flow {
for (i in 1..3) {
delay(100)
emit(i)
"emit $i".log()
}
}
runBlocking {
val time = measureTimeMillis {
f.conflate().collect {
delay(300)
"collect $it".log()
}
}
"time is $time".log()
}
//emit 1
//emit 2
//emit 3
//collect 1
//collect 3
//time is 792
collectLatest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
val f = flow {
for (i in 1..3) {
delay(100)
emit(i)
"emit $i".log()
}
}
runBlocking {
val time = measureTimeMillis {
f.collectLatest {
"before delay collect $it".log()
delay(300)
"collect $it".log()
}
}
"time is $time".log()
}
//before delay collect 1
//emit 1
//before delay collect 2
//emit 2
//before delay collect 3
//emit 3
//collect 3
//time is 745
zip
根据transform
表达式“压缩”另一个flow
,并返回一个新的flow
,“压缩”过程中flow
会以最先完成的flow
为结束依据,剩下的没有完成的flow
会被取消。
1
2
3
4
5
6
7
8
9
val flow = flowOf(1, 2, 3).onEach { delay(10) }
val flow2 = flowOf("A", "B", "C", "D").onEach { delay(15) }
runBlocking {
flow.zip(flow2) { i, s -> i.toString() + s }
.collect { it.log() }
}
//1A
//2B
//3C
combine
与zip
不同的是combine
会将两个flow
都执行完,并且会根据最新emit
的值组合。
1
2
3
4
5
6
7
8
9
10
val flow = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("A", "B", "C").onEach { delay(15) }
runBlocking {
flow.combine(flow2) { i, s -> i.toString() + s }
.collect { it.log() }
}
//1A
//2A
//2B
//2C
oneach
在返回新flow
之前,先执行onEach
表达式。
1
2
3
4
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
action(value)
return@transform emit(value)
}
如:
1
2
3
4
5
6
7
8
9
val intFlow = flowOf(1, 2, 3)
runBlocking {
intFlow.onEach { value ->
"onEach $value".log()
}.collect()
}
//onEach 1
//onEach 2
//onEach 3
其中collect()
会触发flow
的collect
方法,但是会忽略emit
的内容,其实就是collect{}
的缩略写法。
collect()
一般会跟onEach
、onCompletion
、catch
操作符一起出现。
flowOn
正常情况下flow
的emit
跟collect
会在同一个协程中执行,知道基本的协程切换的话,可能会想通过withContext
来将flow
的emit
和
collect
放在不同协程中执行,但是这是错误的,实现flow
的协程切换要通过flowOn
。
错误示范:
1
2
3
4
5
6
7
8
9
10
val intFlow = flow {
withContext(Dispatchers.IO) {
emit(1)
}
}
runBlocking {
intFlow.collect { value ->
"collect $value".log()
}
}
以上代码会抛出异常
java.lang.IllegalStateException: Flow invariant is violated:
正确使用方式:
没有使用flowOn
:
1
2
3
4
5
6
7
8
9
10
11
12
13
val intFlow = flow {
"emit thread ${Thread.currentThread().name}".log()
emit(1)
}
runBlocking {
intFlow.collect { value ->
"collect thread ${Thread.currentThread().name}".log()
"collect $value".log()
}
}
//emit thread main @coroutine#1
//collect thread main @coroutine#1
//collect 1
使用了flowOn
:
1
2
3
4
5
6
7
8
9
10
11
12
13
val intFlow = flow {
"emit thread ${Thread.currentThread().name}".log()
emit(1)
}.flowOn(Dispatchers.IO)
runBlocking {
intFlow.collect { value ->
"collect thread ${Thread.currentThread().name}".log()
"collect $value".log()
}
}
//emit thread DefaultDispatcher-worker-1 @coroutine#2
//collect thread main @coroutine#1
//collect 1
完整的解释:
1
2
3
4
5
6
7
8
withContext(Dispatchers.Main) {
val singleValue = intFlow // will be executed on IO if context wasn't specified before
.map { ... } // Will be executed in IO
.flowOn(Dispatchers.IO)
.filter { ... } // Will be executed in Default
.flowOn(Dispatchers.Default)
.single() // Will be executed in the Main
}
如果一个flow
操作调用了多次的flowOn
,则以第一次的flowOn
为准,如:
1
2
3
flow.map { ... } // Will be executed in IO
.flowOn(Dispatchers.IO) // This one takes precedence
.flowOn(Dispatchers.Default)
注意SharedFlow
的子类本身没有执行的context
,所以flowOn
对其无效。
Exceptions
try catch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
val f = flow {
for (i in 1..3) {
"emit $i".log()
emit(i)
}
}
runBlocking {
try {
f.collect { value ->
"collect $value".log()
check(value <= 1) {
"crashed on $value"
}
}
} catch (e: Throwable) {
"异常 $e".log()
}
}
//emit 1
//collect 1
//emit 2
//collect 2
//异常 java.lang.IllegalStateException: crashed on 2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
val f = flow {
for (i in 1..3) {
"emit $i".log()
emit(i)
}
}.map { value ->
check(value <= 1) {
"crashed on $value"
}
"mapValue $value"
}
runBlocking {
try {
f.collect { value ->
"collect $value".log()
}
} catch (e: Throwable) {
"异常 $e".log()
}
}
//emit 1
//collect mapValue 1
//emit 2
//异常 java.lang.IllegalStateException: crashed on 2
catch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
val f = flow {
for (i in 1..3) {
"emit $i".log()
emit(i)
}
}.map { value ->
check(value <= 1) {
"crashed on $value"
}
"mapValue $value"
}
runBlocking {
f.catch { e->
emit("不小心捕获了个异常 $e")
}.collect { s->
"collect $s".log()
}
}
//emit 1
//collect mapValue 1
//emit 2
//collect 不小心捕获了个异常 java.lang.IllegalStateException: crashed on 2
collect发生异常无法捕获到
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
val f = flow {
for (i in 1..3) {
"emit $i".log()
emit(i)
}
}
runBlocking {
f.catch { e ->
"不小心捕获了个异常 $e".log()
}.collect { i ->
"collect $i".log()
check(i <= 1) {
"crashed on $i"
}
}
}
//emit 1
//collect 1
//emit 2
//collect 2
//Exception in thread "main" java.lang.IllegalStateException: crashed on 2
兼并上述两种方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
val f = flow {
for (i in 1..3) {
"emit $i".log()
emit(i)
}
}
runBlocking {
f.onEach { value ->
check(value <= 1) {
"crashed on $value"
}
"collect $value".log()
}.catch { e ->
"不小心捕获了个异常 $e".log()
}.collect()
}
//emit 1
//collect 1
//emit 2
//不小心捕获了个异常 java.lang.IllegalStateException: crashed on 2
completion
finally
1
2
3
4
5
6
7
8
9
10
11
12
13
14
val f = flowOf(1, 2, 3)
runBlocking {
try {
f.collect { value->
"collect $value".log()
}
} finally {
"complete".log()
}
}
//collect 1
//collect 2
//collect 3
//complete
onCompletion
onCompletion接收一个 Throwable 的nullable参数,可以用这个判断Flow是正常结束还是发生了异常。
1
2
3
4
5
6
7
8
9
val f = flowOf(1, 2, 3)
runBlocking {
f.onCompletion { "complete".log() }
.collect { value -> "collect $value".log() }
}
//collect 1
//collect 2
//collect 3
//complete
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val f = flow {
emit(1)
throw RuntimeException()
}
runBlocking {
f.onCompletion { cause ->
if (cause != null) {
"Flow completed exceptionally".log()
}
}
.catch { cause ->
"Caught exception $cause".log()
}
.collect { value -> "collect $value".log() }
}
//collect 1
//Flow completed exceptionally
//Caught exception java.lang.RuntimeException
不同于catch
,onCompletion
接收所有异常,如果接收得参数为null则代表Flow正常结束。
1
2
3
4
5
6
7
8
9
10
11
12
13
val f = flowOf(1, 2, 3)
runBlocking {
f.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) {
"crashed on $value"
}
"collect $value".log()
}
}
//collect 1
//Flow completed with java.lang.IllegalStateException: crashed on 2
//Exception in thread "main" java.lang.IllegalStateException: crashed on 2