PROGRAMMING LANGUAGE/KOTLIN

[Kotlin Coroutine] Asynchronous Flow

EARTH_ROOPRETELCHAM 2024. 7. 20. 23:50
728x90
반응형

목표

  • 코틀린 공식 문서의 Asynchronous Flow 학습

Asynchronous Flow

중단 함수는 비동기적으로 하나의 값을 반환합니다.
비동기적으로 계산된 여러 가지 값들을 반환하고 싶을 때는 어떻게 해야 할까요?
이를 위해 등장한 것이 Kotlin Flow입니다.

여러 값들을 표현하는 방법

코틀린에서는 collections을 이용해서 여러 값들을 표현할 수 있습니다.

forEach

collections의 값들을 표현할 때에는 아래와 같이 간단하게 forEach를 사용할 수 있습니다.

forEach를 활용한 컬렉션의 아이템 반환

Sequence

만약, CPU를 소비하는 로직을 포함한다면 sequence를 활용하기도 합니다. 아래 코드는 앞선 forEach와 결과는 동일하지만, CPU 연산처럼 동작하기 위해 100ms 대기하는 로직을 포함하여 프로그램 수행 시간이 300ms 넘게 소요되는 것을 확인할 수 있습니다. 

 

sequence를 활용한 컬렉션의 아이템 반환(300ms 이상 소요)
sequence는 스레드를 블록하여 해당 작업이 모두 완료되어야 스레드를 넘김

suspend function

앞선 예시의 simple 함수의 경우, 메인 스레드를 블락했기 때문에, 시간이 300ms가 걸린 것입니다. 따라서 아래와 같이 중단 함수를 활용하면, 스레드를 블락시키지 않고 훨씬 빠르게 동작시킬 수 있습니다.

일시 중단 함수를 활용한 컬렉션의 아이템 반환(100ms 가량 걸림)

Flows

List<Int>라는 반환 타입은 모든 값을 한 번에 반환함을 의미합니다. 동기적으로 계산하기 위해 Sequence<Int>를 쓰는 것처럼, 비동기적으로 계산되는 값들의 스트림을 표현하기 위해 Flow<Int> 타입을 쓸 수 있습니다.

비동기적으로 계산되는 스트림으로 컬렉션 아이템 반환

위 예시를 통해 알 수 있는 점은 아래와 같습니다.

Flow와 FlowCollector가 제공하는 collect, emit 함수

  • 메인 스레드를 블록하지 않고 동작함
  • Flow<Int> 생성 함수는 flow 함수를 호출하며, flow { ... } 내부는 suspend 블록
  • suspend 블록을 호출하는 simple 함수는 suspend 지시자가 붙어있지 않음
  • Flow와 FlowCollector에 의해 제공된 collect, emit 함수를 통해 값이 수집되고 방출됩니다.

Flows는 cold stream

Flowssequence와 유사한 cold stream으로, flow 빌더 내부의 코드는 flow가 수집(collect)되기 전까지는 실행되지 않습니다.

  • 이것이 flow를 반환하는 함수가 suspend 지시자가 붙지 않은 주된 이유입니다.

Flows는 cold stream

위 예시에서 simple() 메소드는 자기 자신을 바로 반환하고 아무것도 기다리지 않습니다.

  • flow는 collect가 될 때마다 실행되기 때문에, Flow started는 collect가 호출될때마다 매번 출력됩니다.
더보기

핫 데이터 소스와 콜드 데이터 소스

콜드
컬렉션(List, Set) Sequence, Stream
Channel Flow, RxJava 스트림

핫 데이터 스트림은 열정적이라 데이터를 소비하는 것과 무관하게 원소를 생성하는 반면, 콜드 데이터 스트림은 게을러서 요청이 있을 때만 작업을 수행하며 아무것도 저장하지 않습니다.

 

(핫 데이터 스트림인) List와 (콜드 데이터 스트림인) sequence를 사용할 때 그 차이를 확실하게 느낄 수 있습니다.

list와 sequence 비교

 위 예시를 보면, 핫 데이터 스트림인 list는 원소가 필요하지 않아도 무조건 실행되는 것을 확인할 수 있으며, 콜드 데이터 스트림인 sequence는 원소가 필요하지 않아 아무런 작업도 진행하지 않았음을 확인할 수 있습니다.

콜드 데이터 스트림 일 시키기

위처럼 콜드 데이터 스트림의 원소를 필요로 하는 코드가 수행되어야만, 해당 원소를 가져오기 위한 로직이 수행됨을 확인할 수 있습니다. 

 

콜드 데이터 스트림은 아래와 같은 특징을 가집니다.

  • 무한할 수 있습니다.
  • 최소한의 연산만 수행합니다.
  • (중간에 생성되는 값들을 보관할 필요가 없으므로) 메모리를 적게 사용합니다.

Flow 취소

Flow의 취소는 일반적인 코루틴의 취소 정책을 따릅니다. 보통, flow collection은 취소 가능한 중단 함수(예: delay)에서 중단된 경우 취소될 수 있습니다.

flow 취소

위는 withTimeoutOrNull 블록 내에서 수행되는 flow가 시간초과로 인해 취소되는 예시입니다.

  • 250ms만에 취소되었기 때문에, flow 블록이 2까지 출력한 뒤 취소되었습니다.

Flow builders

지금까지 살펴본 flow { ... } 빌더는 가장 기본적인 flow 빌더입니다. flow 빌더로 제공되고 있는 다른 빌더들은 아래와 같습니다.

flowOf(), asFlow()

  • flowOf() 빌더
    • 인자로 들어온 값들을 emit하도록 설계된 빌더
  • asFlow() 확장 함수
    • 여러 컬렉션과 시퀀스는 .asFlow() 확장 함수를 통해 flow로 변환할 수 있음

중간 흐름 연산자(Intermediate flow operators)(중간 연산자)

Flow는 컬렉션이나 시퀀스를 변환하는 방법과 동일하게 연산자를 통해 변환할 수 있습니다. 중간 흐름 연산자는 upstream flow를 받아서 downstream flow를 반환해줍니다. 이러한 연산자들 역시 flow와 마찬가지로 cold이기 때문에 연산자 자체가 중단 함수는 아닙니다. 따라서, 새로운 변환된 flow의 정의를 빠르게 반환해줍니다.

 

Flow의 기본적인 연산자는 mapfilter와 같은 친숙한 이름을 가지고 있습니다. 시퀀스의 연산자와의 주된 차이점flow의 연산자는 내부적으로 중단 함수를 호출할 수 있다는 점입니다. 예를 들어, flowmap 연산자를 이용해 중단 함수에 의해 수행되는 오래 걸리는 연산 결과에 매핑될 수도 있습니다.

flow에 map 연산자 사용하기
map 연산자의 transform 파라미터는 중단 함수임

Transform operator

flow의 변환 연산자 중 가장 주된 연산자는 transform입니다. 이는 map이나 filter와 같은 심플한 변환을 포함해 복잡한 변환까지 사용될 수 있습니다. transform 연산자를 사용하면, 임의의 값을 임의 개수만큼 방출(emit)할 수 있습니다. 

 

아래 예시는 tranform 연산자를 통해 긴 연산을 수행하기 전에 string 값을 하나 방출하는 예시입니다.

transform 연산자

Size-limiting operators

take와 같은 사이즈 제한 중간 연산자는 사이즈 제한에 도달하면 flow 수행을 취소합니다. 코루틴에서 취소는 항상 예외를 던짐으로써 동작하기 때문에, 리소스를 관리하는 기능(예: try ~ finally)이 취소 케이스에서 정상 작동합니다.

flow 취소 시 예외를 던짐으로써 동작한다

터미널 흐름 연산자(Terminal flow operators)(최종 연산자)

flows의 터미널 연산자는 flow를 수집하기 시작하는 일시 중단 함수입니다. collect 연산자가 기본적으로 사용되는 연산자이며, 아래와 같은 터미널 흐름 연산자도 존재합니다.

  • toSet, toList
    • 다양한 컬렉션을 변환해주는 연산자
  • first, single
    • 첫번째 값을 반환하거나, 단일 값을 방출하도록 하는 연산자
  • reduce, fold
    • 값을 누적할 때 사용하는 연산자

reduce 사용 예시

Flows는 순차적이다

flow의 각각의 collection은 여러 flow들에서 동작하는 특별한 연산자들이 사용되지 않는한 순차적으로 수행됩니다. collection은 코루틴에서 터미널 연산자가 호출되면 바로 작동합니다. 기본적으로 새로운 코루틴이 시작되지 않으며, 방출(emit)된 각각의 값은 upstream에서 downstream으로 중간 연산자에 의해 모두 처리된 뒤, 최종 연산자에 전달됩니다.

flow는 순차적으로 실행됨

Flow context

flow의 수집(collect)은 호출 코루틴의 컨텍스트에서 발생합니다. 예를 들어, 아래와 같이 simple flow가 존재할 때 flow의 세부 구현과 상관없이 코드가 수행되는 context에서 수행됩니다. flow의 이런 속성을 컨텍스트 보존이라고 합니다.

flow의 컨텍스트 보존

  • 따라서, 기본적으로 flow { ... } 빌더의 코드는 해당 flowcollect가 호출되는 컨텍스트에서 실행됩니다.
  • 방출(emit)과 수집(collect) 시 사용되는 코루틴이 동일합니다.

flow에서 withContext 사용 시 참고 사항

flow { ... } 내부에 withContext를 추가하여 CPU를 많이 사용하는 연산만 따로 다른 디스패처에서 실행시키려해도, flow { ... } 빌더는 보존된 컨텍스트를 존중해야 하므로, flow 빌더 내에서 따로 지정한 컨텍스트에서 방출(emit)할 수 없습니다.

 

flow에서 withContext 사용 함정

위 상황에서 발생하는 에러를 간략하게 정리하면 아래와 같습니다.

Flow invariant is violated:
java.lang.IllegalStateException: Flow invariant is violated:
...
Please refer to 'flow' documentation or use 'flowOn' instead

flowOn 연산자

앞서 살펴본 에러 메시지에 보면, flow 대신 flowOn을 사용하라는 내용이 있었습니다. flowOn 연산자는 flow 방출(emit) 시, 컨텍스트를 바꿀 수 있게 해줍니다. flow의 컨텍스트를 바꾸는 올바른 방법은 아래와 같습니다.

flowOn 연산자를 통해 flow 방출(emit) 시의 컨텍스트를 바꿈

flowOn 연산자를 통해 컨텍스트를 바꿀 때에는 수집(collect)은 해당 연산자를 호출한 컨텍스트 내에서 실행되고, 방출(emit)에 대해서만 flowOn 연산자에 지정한 컨텍스트에서 수행됨을 확인할 수 있습니다. 로그에서 확인할 수 있듯이 방출(emission)과 수집(collection)은 서로 다른 코루틴에서 수행됩니다. 

flowOn 연산자는 자신의 컨텍스트에서 coroutineDispatcher를 변경해야 할 때, upstream flow를 위해 새로운 코루틴을 생성합니다.

Buffering

서로 다른 코루틴에서 flow의 각각의 파트를 수행하면 flow에서 수집(collect)에 걸리는 전체적인 시간을 줄이는데 도움이 됩니다. 

 

예를 들어 아래처럼 방출(emission)이 100ms씩 걸리고, 수집(collect)에는 300ms이 걸린다고 가정했을 때, 이를 buffering을 하면 속도가 빨라집니다.

buffering 전(왼쪽)/후(오른쪽)

buffer 연산자를 사용하면, 방출(emit) 로직을 수집(collect) 로직과 동시에 수행할 수 있습니다. 이 경우, flow는 순차적으로 실행되지 않습니다. buffer 연산자를 이용하면, 효율적으로 flow를 프로세싱할 수 있게 됩니다.

참고로, 앞서 이야기한 flowOn 연산자CoroutineDispatcher를 변경해야 할 때 동일한 버퍼링 매커니즘을 사용합니다.

Conflation(융합)

flow의 작업 또는 작업 상태 업데이트의 부분적 결과를 나타낼 때 각 값들을 처리할 필요 없이, 가장 최근 값만 처리할 수 있습니다. 이 때 conflate 연산자를 통해 불필요한 중간 값을 건너뛸 있습니다.

conflate 예시

  • 위에서 1, 2, 3이 아닌 1, 3만 로그에 찍힌 이유는 방출(emit)과 수집(collect)이 동시에 이루어지면서 1이 수집되는 동안에 2와 3이 순차적으로 방출되어서 가장 최근에 방출된 3만 collector에 전달되었기 때문입니다.

참고로, conflate() 연산자 호출은 buffer 연산자에 capacity 인자에 CONFLATED(-1을 의미함)을 넣어 호출하는 것과 동일합니다.

conflate 연산자는 buffer 연산자에 CONFLATED 인자를 넣은 것과 같음

최신 값 처리하기

conflation(융합)은 방출(emit)과 수집(collect)이 느려 속도를 빠르게 할 때 사용하며, 방출한 값을 버리는(drop) 식으로 작동합니다. 최신 값을 처리하는 또 다른 방법은 느린 수집기(collector)를 새로운 값이 방출(emit)될 때마다 재시작하는 것입니다. 

새로운 값이 방출됨에 따라 수집기(collector)가 재시작되어 새로운 값을 수집

  • collectLatest는 300ms가 걸리고, 새로운 값이 매 100ms마다 방출(emit)되어 마지막 값만 수집이 완료됨

다중 flow 구성

여러 flow를 하나의 flow로 합치는 방법에는 여러 가지가 있습니다. 

Zip

zipSequence.zip 확장 함수처럼 Kotlin의 기본 라이브러리에 속한 함수입니다. zipflow로부터 쌍(변경되어 새로운 flow로 내보낼 원소)을 생성합니다. zip을 사용할 때에는 쌍을 만드는 방법을 정하는 함수도 필요하며, zip은 두 flow 중 하나의 flow의 zipping이 완료되면 생성되는 flow도 완료되기 때문에, 하나의 flow가 다른 flow보다 원소가 더 많다면 원소가 많은 flow의 남은 원소는 유실됩니다.

zip 예시

  • zip을 사용할 때에는 느린 flow에 수행 시간에 맞추어 새로운 flow가 생성됩니다.

Combine

combinezip처럼 원소들로 쌍을 형성하기 때문에 첫 번째 쌍을 만들기 위해 느린 flow를 기다려야 합니다. 하지만, combine을 사용하면 모든 새로운 원소가 기존 원소를 대체하기 때문에 첫 번째 쌍이 이미 만들어졌다면 다른 플로우의 이전 원소와 함께 새로운 쌍이 만들어집니다.

combine은 두 데이터 소스의 변화를 능동적으로 감지할 때 주로 사용됩니다.
  • 아래 예시를 보면, zip과 달리 첫 번째 쌍을 미룬 이후, 다른 플로우의 이전 원소(one)와 함께 새로운 쌍(3 -> one)이 만들어짐을 확인할 수 있습니다. 
  • 또한, zip은 하나의 flow가 종료되면 새로운 flow 생성 역시 완료되지만, 이와 달리 combine은 다른 플로우의 이전 원소와 새로운 쌍이 만들어질 수 있어 두 플로우가 모두 닫힐 때까지 원소를 내보내는 것을 확인할 수 있습니다. (3 -> two, 3 -> three)

combine 예시

flow의 평탄화(flatten)

컬렉션에 존재하는 flatMapmap와 비슷하지만 변환 함수가 평탄화된 컬렉션을 반환해야 한다는 점에서 다릅니다. 예를 들어, 부서 목록을 가지고 있고 각 부서가 사원 목록을 가지고 있을 때, 전체 부서의 사원 목록을 만들때 flatMap을 사용할 수 있습니다.

 

flowflatMap은 컬렉션의 flatMap과 유사하게 변환 함수가 평탄화된 flow를 반환한다고 보면 됩니다. 하지만, flow 원소가 나오는 시간이 flow마다 다르기 때문에 두 번째 원소에서 만들어진 flow가 첫 번째 flow에서 만들어진 원소를 기다릴지, 동시에 처리할지 등 정해야 할 사항들이 있습니다. 이로 인해 flow에는 flatMap은 존재하지 않고 flatMapConcat, flatMapMerge, flatMapLatest와 같은 다양한 함수가 존재합니다.

flatMapConcat

flatMapConcat

flatMapConcat생성된 flow를 하나씩 처리합니다. 따라서, 두 번째 flow는 첫 번째 flow가 완료되었을 때 시작이 가능합니다.

flatMapConcat 예시

flatMapMerge

flatMapMerge

flatMapMerge는 만들어진 flow를 동시에 처리합니다. 아래 예시를 보면, flatMapConcat 사용 시와 다르게, 생성된 flow를 하나씩 처리하되, 동시에 처리함을 확인할 수 있습니다.

flatMapMerge 예시

flatMapMerge에서 동시 처리 개수는 concurrency 인자를 사용해 동시 처리 flow 개수를 설정할 수 있으며, 인자의 기본값은 16으로 JVM에서 DEFAULT_CONCURRENCY_PROPERTY_NAME 프로퍼티를 사용해 변경할 수 있습니다.

flatMapMerge에 concurrency 설정하기

flatMapMerge는 어떨때 사용할까?

flatMapMergeflow의 각 원소에 대한 데이터를 요청할 때 주로 사용합니다. 예를 들어, 종류를 목록으로 가지고 있고 종류별로 요청을 보내야 할 때 사용합니다.

이때 async를 통해 요청을 할 수도 있지만, flow와 함께 flatMapMerge를 사용하면 아래와 같은 이점을 가집니다.

  • 동시성 인자를 제어하고 (같은 시간에 수백 개의 요청을 보내는 걸 피하기 위해) 같은 시간에 얼마만큼의 종류를 처리할지 결정할 수 있습니다.
  • Flow를 반환하여 데이터가 생성될 때마다 다음 원소를 보낼 수 있습니다(함수를 사용하는 측면에서 보면 데이터를 즉시 처리할 수 있습니다).
suspend fun getOffers(categories: List<Category>): List<Offer> = coroutineScope {
    categories.map { async { api.requestOffers(it) }
        .flatMap { it.await() }
}

// 더 나은 방법
suspend fun getOffers(categories: List<Category>): Flow<Offer> = categories
    .asFlow()
    .flatMapMerge(concurrency = 20) {
        suspend { api.requestOffers(it) }.asFlow()
        // 또는 flow { emit(api.requestOffers(it))) }
    }

flatMapLatest

flatMapLatest

flatMapLatest는 새로운 flow가 나타나면 이전에 처리하던 flow를 잊어버립니다. 

flatMapLatest 예시

위 예시를 통해 A, B, C 사이에 지연이 없어서 A와 B가 속한 값 없이 1_C, 2_C, 3_C만 출력된 것을 볼 수 있습니다.

시작 flow에 지연이 있는 경우 예시

시작 flow의 원소 생성 시 지연이 발생하면 위처럼 결과가 보여지며, 상세 내용은 아래와 같습니다.

  • 1200ms 후에 flowFrom("A")을 사용하는 flow가 생성되어, 2200ms에 1_A를 반환합니다.
  • 2400ms(1_A 반환 200ms 후)에는 B가 생성되어 flowFrom("A")에 의한 flow는 닫히고, flowFrom("B")를 통한 새로운 flow가 생성되어, 3400ms에 1_B를 반환합니다.
  • 3600ms(1_B 반환 200ms 후)에는 C가 생성되어 flowFrom("B")에 의한 flow는 닫히고, flowFrom("C")를 통한 새로운 flow가 생성되어, 4600ms에 1_C를 반환합니다.
  • 그 후에는 해당 flow가 닫힐 일은 없어서 1000ms마다 다음 번 값이 수집됩니다.

Flow 예외

flow를 만들거나 처리하는 도중 예외를 발생하면, 예외로 종료될 수 있습니다. 이러한 예외를 다루는 몇 가지 방법에 대해 소개하도록 하겠습니다.

수집기에 대해 try and catch

collect 최종 연산자에서 예외를 catch하면 더 이상 다음 원소를 emit하지 않습니다.

collect에 대한 try ~ catch 예제(왼쪽), 중간 연산자에 대한 try ~ catch 예제(오른쪽)

 

예외 투명성

emitter는 catch 연산자를 사용하여 예외 투명성을 보존하고 예외 핸들러를 캡슐화할 수 있습니다. catch 연산자 본문에서는 예외를 분석하고 어떤 예외인지에 따라 다른 방식으로 대응할 수 있습니다.

  • throw를 통해 다시 예외를 던질 수 있습니다.
  • 예외는 emit의 값으로써 catch 바깥으로 넘겨질 수 있습니다.
  • 예외에 대해 무시, 로깅, 다른 로직 수행을 진행할 수 있습니다.

예외에 대해 예외 메시지를 emit하는 예시

catch 중간 연산자는, 예외 투명성에 의무가 있어 오직 upstream 예외만 처리합니다. 
즉, flow에서 예외는 아래로 흐를 때 잡기 때문에, catch 함수의 위치 기준으로 아래쪽 체이닝에서 발생한 예외는 잡을 수 없습니다.

catch 함수 아래에서 발생한 예외는 catch가 잡을 수 없음

마찬가지 이유로, collect 연산자는 최종 연산자라 downstream이 되기 때문에 collect에서 발생한 예외 역시 처리할 수 없습니다.

catch 연산자는 collect 최종 연산자의 예외를 처리할 수 없음

이 경우에는 아래와 같이 onEach를 활용해 선언적으로 예외를 잡을 수 있습니다. 

collect에 있던 check 로직을 중간 연산자로 올려 catch에 의해 예외 잡을 수 있도록 함

Flow 완료

flow가 완료되면, 어떠한 액션을 수행할 수 있어야 합니다. 액션을 수행하는 방법에는 명령형 또는 선언형 방법이 있습니다.

finally 블록

finally 블록 활용

선언적 핸들링

onCompletion 연산자 활용

onCompletion의 장점은 null이 가능한 Throwable 파라미터를 통해 flow가 정상 종료인지 예외로 인한 종료인지 여부를 파악할 수 있다는 점입니다.
fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}

 

Launching flow(launchIn)

collectflow가 완료될 때까지 코루틴을 중단하는 중단 연산입니다. collectlaunch 빌더로 래핑(wrapping)하면 flow를 다른 코루틴에서 처리할 수 있습니다.

flow의 확장 함수인 launchIn을 사용하면, 유일한 인자로 스코프를 받아 collect를 새로운 코루틴에서 시작할 수 있습니다.

launchIn을 사용하면, 새로운 코루틴에서 실행됨

참고 자료

 

728x90
반응형