목표
- Flow API를 이용한 반응형 데이터 스트림 처리 방법 학습
- StateFlow와 SharedFlow의 사용법 및 적용 사례
- 실시간 데이터 처리를 위한 코드 예제 실습
플로우(Flow)
플로우란 비동기적으로 계산해야 할 값의 스트림을 나타냅니다. Flow 인터페이스 자체는 원소들을 모으는 역할을 하며, 플로우의 끝에 도달할 때까지 각 값을 처리합니다.
Flow의 유일한 멤버 함수는 collect로, 그 외 함수들은 확장 함수로 정의되어 있습니다. iterator만 멤버 함수로 가지고 있는 Iterable이나 Sequence와 비슷하다고 볼 수 있습니다.
그래서 Flow가 뭐야?
예를 들어 여러 개의 값을 반환하는 함수가 필요하다고 가정합니다. 한 번에 모든 값을 만들때는 List나 Set과 같은 컬렉션을 사용합니다.
fun allUsers(): List<User> =
api.getAllUsers().map { it.toUser() }
이 때 List와 Set은 모든 원소의 계산이 완료된 컬렉션입니다. 따라서, 각 값을 계산하는 과정에 시간이 소요된다면 이들 원소들이 채워지도록 모든 값이 생성되길 기다려야 합니다.
모든 원소를 한 번에 필요한 것이 아니라면, 원소를 하나씩 계산하고 원소가 나오자마자 바로 얻는 것이 좋습니다. 이 때, Sequence를 사용해볼 수 있습니다.
sequence는 복잡한 결과값을 계산하는 등의 CPU 집약적인 연산 또는 블로킹 연산일 때 필요할 때마다 값을 계산하는 플로우를 나타내기 적절합니다.
다만, forEach와 같은 sequence의 최종 연산은 중단 함수가 아니기 때문에, 시퀀스 빌더 내부에 중단점이 있다면 값을 기다리는 스레드가 블로킹 됩니다. 따라서, sequence 빌더의 스코프에서는 SequenceScope의 리시버에서 호출되는 함수(yield와 yieldAll) 외에 다른 중단 함수를 사용할 수 없습니다.
Flow vs Sequence
fun allUsersSequence(api: UserApi): Sequence<User> = sequence {
var page = 0
do {
val users = api.takePage(page++) // 중단함수라고 가정
// 컴파일 에러 발생
yieldAll(users)
} while(!users.isNullOrEmpty())
}
시퀀스는 앞서 이야기 했던 것처럼 시퀀스 빌더 내에 중단점이 있다면 값을 기다리는 스레드가 블로킹되므로 위와 같은 상황일 때 사용하기 적합하지 않으며, 아래와 같은 상황일 때 유용합니다.
- 데이터 소스의 개수가 많거나(또는 무한정) 원소가 무거운 경우
- 원소를 필요할 때만 계산하거나 읽는 지연 연산을 하게 되는 상황
시퀀스 사용으로 인해 스레드가 블로킹되면 예기치 않은 상황을 유발할 수 있으므로, 아래와 같은 상황일 때에는 sequence 대신 flow를 써야 합니다.
위 상황은, sequence의 최종 연산이 중단함수가 아니고, 시퀀스 빌더 내 스레드 블로킹되는 함수가 있어 main 스레드가 블로킹된 상황입니다. 이때, launch 코루틴 역시 동일한 main 스레드를 사용해야 하기 때문에, launch 코루틴까지 함께 블로킹되었습니다.
이럴 때에는, Sequence 대신 Flow를 사용해야 합니다. 플로우를 사용하면 코루틴이 연산을 수행하는 데 필요한 기능을 전부 사용할 수 있습니다. 플로우는 코루틴을 사용해야 하는 데이터 스트림으로 사용되어야 합니다.
플로우의 빌더와 연산은 중단 함수이며, 구조화된 동시성과 적절한 예외 처리를 지원합니다.
플로우의 특징
Flow에서 제공하는 최종 연산(예: collect)은 스레드를 블로킹하지 않고, 코루틴을 중단시킵니다. 플로우는 코루틴 컨텍스트를 활용하고 예외를 처리하는 등의 코루틴 기능 역시 제공합니다. 다른 코루틴처럼 플로우 처리는 취소 가능하며, 구조화된 동시성도 갖습니다.
참고로, flow 빌더는 중단 함수가 아니며 어떠한 스코프도 필요하지 않습니다.
플로우의 최종 연산은 중단 가능하며, 연산이 실행될 때 부모 코루틴과의 관계가 정립됩니다.
플로우 명명법
모든 플로우(flow)는 몇 가지 요소로 구성됩니다.
- 플로우는 어딘가에서 시작되어야 합니다. 플로우 빌더, 다른 객체에서의 변환, 또는 헬퍼 함수로부터 시작됩니다.
- 플로우의 마지막 연산은 최종 연산이라 불리며, 중단 가능하거나 스코프를 필요로 하는 유일한 연산이라는 점에서 중요합니다.
- 시작 연산과 최종 연산 사이에 플로우를 변경하는 중간 연산을 가질 수 있습니다.
플로우 실 사용 예
플로우가 사용되는 전형적인 예는 아래와 같습니다.
- 웹소켓이나 RSocket 알림과 같이 서버가 보낸 이벤트를 통해 전달된 메시지를 받는 경우
- 텍스트 입력 또는 클릭과 같은 사용자 액션이 감지된 경우
- 센서 또는 위치나 지도와 같은 기기의 정보 변경을 받는 경우
- 데이터베이스의 변경을 감지하는 경우
플로우는 이 외에도 동시성 처리를 위해 유용하게 사용될 수 있습니다.
suspend fun getOffers(sellers: List<Seller>): List<Offer> = coroutineScope {
sellers
.map { seller -> async { api.requestOffers(seller.id) }}
.flatMap { it.await() }
}
예를 들어 위와 같이 외부를 여러 번 호출하는 비동기 async 작업이 있을 때 아래와 같은 단점이 있을 수 있습니다.
- 너무 많은 요청을 한 번에 비동기로 보내게 되면, 해당 요청을 받을 서버에게도 영향을 줄 수 있습니다.
- 알아서 처리양을 제한해서 응답해줄 수도 있지만, 사용자 측면에서 제어하고 싶다면 플로우를 사용해 아래와 같이 제한할 수 있습니다.
suspend fun getOffers(sellers: List<Seller>): List<Offer> = sellers
.asFlow()
// 동시성 호출의 수를 20으로 제어함
.flatMapMerge(concurrency = 20) { seller ->
suspend { api.requestOffers(seller.id) }.asFlow()
}
컬렉션 대신 플로우로 처리하면 동시 처리, 컨텍스트, 예외를 비롯한 많은 것을 조절할 수 있습니다.
참고 자료
'PROGRAMMING LANGUAGE > KOTLIN' 카테고리의 다른 글
[Kotlin Coroutine] Asynchronous Flow (5) | 2024.07.20 |
---|---|
[Effective Kotlin] 5장 객체 생성 (0) | 2024.07.17 |
[Effective Kotlin] 4장 추상화 설계 (0) | 2024.07.09 |
[Kotlin Coroutine] 코루틴과 멀티스레딩 환경에서의 최적화 (0) | 2024.07.04 |
[Effective Kotlin] 3장 재사용성 (0) | 2024.06.30 |