[Kotlin Coroutine] Asynchronous Flow
목표
- 코틀린 공식 문서의 Asynchronous Flow 학습
Asynchronous Flow
중단 함수는 비동기적으로 하나의 값을 반환합니다.
비동기적으로 계산된 여러 가지 값들을 반환하고 싶을 때는 어떻게 해야 할까요?
이를 위해 등장한 것이 Kotlin Flow입니다.
여러 값들을 표현하는 방법
코틀린에서는 collections을 이용해서 여러 값들을 표현할 수 있습니다.
forEach
collections의 값들을 표현할 때에는 아래와 같이 간단하게 forEach
를 사용할 수 있습니다.
Sequence
만약, CPU를 소비하는 로직을 포함한다면 sequence
를 활용하기도 합니다. 아래 코드는 앞선 forEach
와 결과는 동일하지만, CPU 연산처럼 동작하기 위해 100ms 대기하는 로직을 포함하여 프로그램 수행 시간이 300ms 넘게 소요되는 것을 확인할 수 있습니다.
suspend function
앞선 예시의 simple
함수의 경우, 메인 스레드를 블락했기 때문에, 시간이 300ms가 걸린 것입니다. 따라서 아래와 같이 중단 함수를 활용하면, 스레드를 블락시키지 않고 훨씬 빠르게 동작시킬 수 있습니다.
Flows
List<Int>
라는 반환 타입은 모든 값을 한 번에 반환함을 의미합니다. 동기적으로 계산하기 위해 Sequence<Int>
를 쓰는 것처럼, 비동기적으로 계산되는 값들의 스트림을 표현하기 위해 Flow<Int>
타입을 쓸 수 있습니다.
위 예시를 통해 알 수 있는 점은 아래와 같습니다.
- 메인 스레드를 블록하지 않고 동작함
- Flow<Int> 생성 함수는 flow 함수를 호출하며,
flow { ... }
내부는suspend
블록임 suspend
블록을 호출하는 simple 함수는suspend
지시자가 붙어있지 않음- Flow와 FlowCollector에 의해 제공된 collect, emit 함수를 통해 값이 수집되고 방출됩니다.
Flows는 cold stream
Flows
는 sequence
와 유사한 cold stream으로, flow 빌더 내부의 코드는 flow가 수집(collect)되기 전까지는 실행되지 않습니다.
- 이것이 flow를 반환하는 함수가
suspend
지시자가 붙지 않은 주된 이유입니다.
위 예시에서 simple() 메소드는 자기 자신을 바로 반환하고 아무것도 기다리지 않습니다.
- flow는 collect가 될 때마다 실행되기 때문에, Flow started는 collect가 호출될때마다 매번 출력됩니다.
핫 데이터 소스와 콜드 데이터 소스
핫 | 콜드 |
컬렉션(List, Set) | Sequence, Stream |
Channel | Flow, RxJava 스트림 |
핫 데이터 스트림은 열정적이라 데이터를 소비하는 것과 무관하게 원소를 생성하는 반면, 콜드 데이터 스트림은 게을러서 요청이 있을 때만 작업을 수행하며 아무것도 저장하지 않습니다.
(핫 데이터 스트림인) List와 (콜드 데이터 스트림인) sequence
를 사용할 때 그 차이를 확실하게 느낄 수 있습니다.
위 예시를 보면, 핫 데이터 스트림인 list는 원소가 필요하지 않아도 무조건 실행되는 것을 확인할 수 있으며, 콜드 데이터 스트림인 sequence
는 원소가 필요하지 않아 아무런 작업도 진행하지 않았음을 확인할 수 있습니다.
위처럼 콜드 데이터 스트림의 원소를 필요로 하는 코드가 수행되어야만, 해당 원소를 가져오기 위한 로직이 수행됨을 확인할 수 있습니다.
콜드 데이터 스트림은 아래와 같은 특징을 가집니다.
- 무한할 수 있습니다.
- 최소한의 연산만 수행합니다.
- (중간에 생성되는 값들을 보관할 필요가 없으므로) 메모리를 적게 사용합니다.
Flow 취소
Flow
의 취소는 일반적인 코루틴의 취소 정책을 따릅니다. 보통, flow collection은 취소 가능한 중단 함수(예: delay
)에서 중단된 경우 취소될 수 있습니다.
위는 withTimeoutOrNull
블록 내에서 수행되는 flow
가 시간초과로 인해 취소되는 예시입니다.
- 250ms만에 취소되었기 때문에, flow 블록이 2까지 출력한 뒤 취소되었습니다.
Flow builders
지금까지 살펴본 flow { ... }
빌더는 가장 기본적인 flow
빌더입니다. flow
빌더로 제공되고 있는 다른 빌더들은 아래와 같습니다.
flowOf()
빌더- 인자로 들어온 값들을 emit하도록 설계된 빌더
asFlow()
확장 함수- 여러 컬렉션과 시퀀스는
.asFlow()
확장 함수를 통해flow
로 변환할 수 있음
- 여러 컬렉션과 시퀀스는
중간 흐름 연산자(Intermediate flow operators)(중간 연산자)
Flow
는 컬렉션이나 시퀀스를 변환하는 방법과 동일하게 연산자를 통해 변환할 수 있습니다. 중간 흐름 연산자는 upstream flow를 받아서 downstream flow를 반환해줍니다. 이러한 연산자들 역시 flow
와 마찬가지로 cold이기 때문에 연산자 자체가 중단 함수는 아닙니다. 따라서, 새로운 변환된 flow
의 정의를 빠르게 반환해줍니다.
Flow
의 기본적인 연산자는 map
과 filter
와 같은 친숙한 이름을 가지고 있습니다. 시퀀스의 연산자와의 주된 차이점은 flow
의 연산자는 내부적으로 중단 함수를 호출할 수 있다는 점입니다. 예를 들어, flow
는 map
연산자를 이용해 중단 함수에 의해 수행되는 오래 걸리는 연산 결과에 매핑될 수도 있습니다.
Transform operator
flow
의 변환 연산자 중 가장 주된 연산자는 transform
입니다. 이는 map
이나 filter
와 같은 심플한 변환을 포함해 복잡한 변환까지 사용될 수 있습니다. transform
연산자를 사용하면, 임의의 값을 임의 개수만큼 방출(emit)할 수 있습니다.
아래 예시는 tranform 연산자를 통해 긴 연산을 수행하기 전에 string 값을 하나 방출하는 예시입니다.
Size-limiting operators
take와 같은 사이즈 제한 중간 연산자는 사이즈 제한에 도달하면 flow
수행을 취소합니다. 코루틴에서 취소는 항상 예외를 던짐으로써 동작하기 때문에, 리소스를 관리하는 기능(예: try ~ finally
)이 취소 케이스에서 정상 작동합니다.
터미널 흐름 연산자(Terminal flow operators)(최종 연산자)
flows
의 터미널 연산자는 flow
를 수집하기 시작하는 일시 중단 함수입니다. collect
연산자가 기본적으로 사용되는 연산자이며, 아래와 같은 터미널 흐름 연산자도 존재합니다.
toSet
,toList
- 다양한 컬렉션을 변환해주는 연산자
first
,single
- 첫번째 값을 반환하거나, 단일 값을 방출하도록 하는 연산자
reduce
,fold
- 값을 누적할 때 사용하는 연산자
Flows는 순차적이다
flow
의 각각의 collection은 여러 flow
들에서 동작하는 특별한 연산자들이 사용되지 않는한 순차적으로 수행됩니다. collection은 코루틴에서 터미널 연산자가 호출되면 바로 작동합니다. 기본적으로 새로운 코루틴이 시작되지 않으며, 방출(emit)된 각각의 값은 upstream에서 downstream으로 중간 연산자에 의해 모두 처리된 뒤, 최종 연산자에 전달됩니다.
Flow context
flow
의 수집(collect)은 호출 코루틴의 컨텍스트에서 발생합니다. 예를 들어, 아래와 같이 simple flow
가 존재할 때 flow
의 세부 구현과 상관없이 코드가 수행되는 context
에서 수행됩니다. flow
의 이런 속성을 컨텍스트 보존이라고 합니다.
- 따라서, 기본적으로
flow { ... }
빌더의 코드는 해당flow
의collect
가 호출되는 컨텍스트에서 실행됩니다. - 방출(emit)과 수집(collect) 시 사용되는 코루틴이 동일합니다.
flow
에서 withContext
사용 시 참고 사항
flow { ... }
내부에withContext
를 추가하여 CPU를 많이 사용하는 연산만 따로 다른 디스패처에서 실행시키려해도,flow { ... }
빌더는 보존된 컨텍스트를 존중해야 하므로,flow
빌더 내에서 따로 지정한 컨텍스트에서 방출(emit)할 수 없습니다.
위 상황에서 발생하는 에러를 간략하게 정리하면 아래와 같습니다.
Flow invariant is violated:
java.lang.IllegalStateException: Flow invariant is violated:
...
Please refer to 'flow' documentation or use 'flowOn' instead
flowOn 연산자
앞서 살펴본 에러 메시지에 보면, flow
대신 flowOn
을 사용하라는 내용이 있었습니다. flowOn
연산자는 flow
방출(emit) 시, 컨텍스트를 바꿀 수 있게 해줍니다. flow
의 컨텍스트를 바꾸는 올바른 방법은 아래와 같습니다.
flowOn
연산자를 통해 컨텍스트를 바꿀 때에는 수집(collect)은 해당 연산자를 호출한 컨텍스트 내에서 실행되고, 방출(emit)에 대해서만 flowOn
연산자에 지정한 컨텍스트에서 수행됨을 확인할 수 있습니다. 로그에서 확인할 수 있듯이 방출(emission)과 수집(collection)은 서로 다른 코루틴에서 수행됩니다.
flowOn
연산자는 자신의 컨텍스트에서coroutineDispatcher
를 변경해야 할 때, upstream flow를 위해 새로운 코루틴을 생성합니다.
Buffering
서로 다른 코루틴에서 flow
의 각각의 파트를 수행하면 flow
에서 수집(collect)에 걸리는 전체적인 시간을 줄이는데 도움이 됩니다.
예를 들어 아래처럼 방출(emission)이 100ms씩 걸리고, 수집(collect)에는 300ms이 걸린다고 가정했을 때, 이를 buffering을 하면 속도가 빨라집니다.
buffer 연산자를 사용하면, 방출(emit) 로직을 수집(collect) 로직과 동시에 수행할 수 있습니다. 이 경우, flow
는 순차적으로 실행되지 않습니다. buffer 연산자를 이용하면, 효율적으로 flow
를 프로세싱할 수 있게 됩니다.
참고로, 앞서 이야기한flowOn
연산자는CoroutineDispatcher
를 변경해야 할 때 동일한 버퍼링 매커니즘을 사용합니다.
Conflation(융합)
flow
의 작업 또는 작업 상태 업데이트의 부분적 결과를 나타낼 때 각 값들을 처리할 필요 없이, 가장 최근 값만 처리할 수 있습니다. 이 때 conflate
연산자를 통해 불필요한 중간 값을 건너뛸 수 있습니다.
- 위에서 1, 2, 3이 아닌 1, 3만 로그에 찍힌 이유는 방출(emit)과 수집(collect)이 동시에 이루어지면서 1이 수집되는 동안에 2와 3이 순차적으로 방출되어서 가장 최근에 방출된 3만 collector에 전달되었기 때문입니다.
참고로, conflate()
연산자 호출은 buffer
연산자에 capacity
인자에 CONFLATED
(-1을 의미함)을 넣어 호출하는 것과 동일합니다.
최신 값 처리하기
conflation
(융합)은 방출(emit)과 수집(collect)이 느려 속도를 빠르게 할 때 사용하며, 방출한 값을 버리는(drop) 식으로 작동합니다. 최신 값을 처리하는 또 다른 방법은 느린 수집기(collector)를 새로운 값이 방출(emit)될 때마다 재시작하는 것입니다.
collectLatest
는 300ms가 걸리고, 새로운 값이 매 100ms마다 방출(emit)되어 마지막 값만 수집이 완료됨
다중 flow
구성
여러 flow
를 하나의 flow
로 합치는 방법에는 여러 가지가 있습니다.
Zip
zip
은 Sequence.zip
확장 함수처럼 Kotlin의 기본 라이브러리에 속한 함수입니다. zip
은 두 flow
로부터 쌍(변경되어 새로운 flow
로 내보낼 원소)을 생성합니다. zip
을 사용할 때에는 쌍을 만드는 방법을 정하는 함수도 필요하며, zip은 두 flow 중 하나의 flow
의 zipping이 완료되면 생성되는 flow
도 완료되기 때문에, 하나의 flow
가 다른 flow
보다 원소가 더 많다면 원소가 많은 flow
의 남은 원소는 유실됩니다.
zip
을 사용할 때에는 느린flow
에 수행 시간에 맞추어 새로운flow
가 생성됩니다.
Combine
combine
은 zip
처럼 원소들로 쌍을 형성하기 때문에 첫 번째 쌍을 만들기 위해 느린 flow
를 기다려야 합니다. 하지만, combine
을 사용하면 모든 새로운 원소가 기존 원소를 대체하기 때문에 첫 번째 쌍이 이미 만들어졌다면 다른 플로우의 이전 원소와 함께 새로운 쌍이 만들어집니다.
combine은 두 데이터 소스의 변화를 능동적으로 감지할 때 주로 사용됩니다.
- 아래 예시를 보면,
zip
과 달리 첫 번째 쌍을 미룬 이후, 다른 플로우의 이전 원소(one)와 함께 새로운 쌍(3 -> one)이 만들어짐을 확인할 수 있습니다. - 또한,
zip
은 하나의flow
가 종료되면 새로운flow
생성 역시 완료되지만, 이와 달리combine
은 다른 플로우의 이전 원소와 새로운 쌍이 만들어질 수 있어 두 플로우가 모두 닫힐 때까지 원소를 내보내는 것을 확인할 수 있습니다. (3 -> two, 3 -> three)
flow의 평탄화(flatten)
컬렉션에 존재하는 flatMap
은 map
와 비슷하지만 변환 함수가 평탄화된 컬렉션을 반환해야 한다는 점에서 다릅니다. 예를 들어, 부서 목록을 가지고 있고 각 부서가 사원 목록을 가지고 있을 때, 전체 부서의 사원 목록을 만들때 flatMap
을 사용할 수 있습니다.
flow
의 flatMap
은 컬렉션의 flatMap
과 유사하게 변환 함수가 평탄화된 flow
를 반환한다고 보면 됩니다. 하지만, flow
원소가 나오는 시간이 flow
마다 다르기 때문에 두 번째 원소에서 만들어진 flow
가 첫 번째 flow
에서 만들어진 원소를 기다릴지, 동시에 처리할지 등 정해야 할 사항들이 있습니다. 이로 인해 flow
에는 flatMap
은 존재하지 않고 flatMapConcat
, flatMapMerge
, flatMapLatest
와 같은 다양한 함수가 존재합니다.
flatMapConcat
flatMapConcat
은 생성된 flow
를 하나씩 처리합니다. 따라서, 두 번째 flow
는 첫 번째 flow
가 완료되었을 때 시작이 가능합니다.
flatMapMerge
flatMapMerge
는 만들어진 flow
를 동시에 처리합니다. 아래 예시를 보면, flatMapConcat
사용 시와 다르게, 생성된 flow
를 하나씩 처리하되, 동시에 처리함을 확인할 수 있습니다.
flatMapMerge
에서 동시 처리 개수는 concurrency
인자를 사용해 동시 처리 flow 개수를 설정할 수 있으며, 인자의 기본값은 16으로 JVM에서 DEFAULT_CONCURRENCY_PROPERTY_NAME
프로퍼티를 사용해 변경할 수 있습니다.
flatMapMerge는 어떨때 사용할까?
flatMapMerge
는 flow
의 각 원소에 대한 데이터를 요청할 때 주로 사용합니다. 예를 들어, 종류를 목록으로 가지고 있고 종류별로 요청을 보내야 할 때 사용합니다.
이때 async
를 통해 요청을 할 수도 있지만, flow
와 함께 flatMapMerge
를 사용하면 아래와 같은 이점을 가집니다.
- 동시성 인자를 제어하고 (같은 시간에 수백 개의 요청을 보내는 걸 피하기 위해) 같은 시간에 얼마만큼의 종류를 처리할지 결정할 수 있습니다.
Flow
를 반환하여 데이터가 생성될 때마다 다음 원소를 보낼 수 있습니다(함수를 사용하는 측면에서 보면 데이터를 즉시 처리할 수 있습니다).
suspend fun getOffers(categories: List<Category>): List<Offer> = coroutineScope {
categories.map { async { api.requestOffers(it) }
.flatMap { it.await() }
}
// 더 나은 방법
suspend fun getOffers(categories: List<Category>): Flow<Offer> = categories
.asFlow()
.flatMapMerge(concurrency = 20) {
suspend { api.requestOffers(it) }.asFlow()
// 또는 flow { emit(api.requestOffers(it))) }
}
flatMapLatest
flatMapLatest
는 새로운 flow
가 나타나면 이전에 처리하던 flow
를 잊어버립니다.
위 예시를 통해 A, B, C 사이에 지연이 없어서 A와 B가 속한 값 없이 1_C, 2_C, 3_C만 출력된 것을 볼 수 있습니다.
시작 flow
의 원소 생성 시 지연이 발생하면 위처럼 결과가 보여지며, 상세 내용은 아래와 같습니다.
- 1200ms 후에
flowFrom("A")
을 사용하는flow
가 생성되어, 2200ms에 1_A를 반환합니다. - 2400ms(1_A 반환 200ms 후)에는 B가 생성되어
flowFrom("A")
에 의한flow
는 닫히고,flowFrom("B")
를 통한 새로운flow
가 생성되어, 3400ms에 1_B를 반환합니다. - 3600ms(1_B 반환 200ms 후)에는 C가 생성되어
flowFrom("B")
에 의한flow
는 닫히고,flowFrom("C")
를 통한 새로운flow
가 생성되어, 4600ms에 1_C를 반환합니다. - 그 후에는 해당
flow
가 닫힐 일은 없어서 1000ms마다 다음 번 값이 수집됩니다.
Flow 예외
flow
를 만들거나 처리하는 도중 예외를 발생하면, 예외로 종료될 수 있습니다. 이러한 예외를 다루는 몇 가지 방법에 대해 소개하도록 하겠습니다.
수집기에 대해 try and catch
collect
최종 연산자에서 예외를 catch
하면 더 이상 다음 원소를 emit하지 않습니다.
예외 투명성
emitter는 catch
연산자를 사용하여 예외 투명성을 보존하고 예외 핸들러를 캡슐화할 수 있습니다. catch
연산자 본문에서는 예외를 분석하고 어떤 예외인지에 따라 다른 방식으로 대응할 수 있습니다.
throw
를 통해 다시 예외를 던질 수 있습니다.- 예외는 emit의 값으로써
catch
바깥으로 넘겨질 수 있습니다. - 예외에 대해 무시, 로깅, 다른 로직 수행을 진행할 수 있습니다.
catch
중간 연산자는, 예외 투명성에 의무가 있어 오직 upstream 예외만 처리합니다.
즉,flow
에서 예외는 아래로 흐를 때 잡기 때문에,catch
함수의 위치 기준으로 아래쪽 체이닝에서 발생한 예외는 잡을 수 없습니다.
마찬가지 이유로, collect
연산자는 최종 연산자라 downstream이 되기 때문에 collect
에서 발생한 예외 역시 처리할 수 없습니다.
이 경우에는 아래와 같이 onEach
를 활용해 선언적으로 예외를 잡을 수 있습니다.
Flow 완료
flow
가 완료되면, 어떠한 액션을 수행할 수 있어야 합니다. 액션을 수행하는 방법에는 명령형 또는 선언형 방법이 있습니다.
finally 블록
선언적 핸들링
onCompletion
의 장점은null
이 가능한Throwable
파라미터를 통해flow
가 정상 종료인지 예외로 인한 종료인지 여부를 파악할 수 있다는 점입니다.
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
Launching flow(launchIn)
collect
는 flow
가 완료될 때까지 코루틴을 중단하는 중단 연산입니다. collect
를 launch
빌더로 래핑(wrapping)하면 flow
를 다른 코루틴에서 처리할 수 있습니다.
flow
의 확장 함수인launchIn
을 사용하면, 유일한 인자로 스코프를 받아collect
를 새로운 코루틴에서 시작할 수 있습니다.
참고 자료