목표
- 채널의 기념 개념과 사용법, 채널을 이용한 데이터 전송 이해
- 셀렉트 표현식의 개념, 셀렉트를 이용한 비동기 작업 처리 이해
채널
채널은 송신자와 수신자의 수에 제한이 없으며, 채널을 통해 전송된 모든 값은 한 번만 받을 수 있습니다.
채널은 누군가가 물품을 기부를 하면 아름다운 가게에 해당 물품이 전시되고, 그 물품은 여러 사람들 중 한 사람에게만 판매될 수 있다는 점에서 아름다운 가게와 비슷하다고 볼 수 있습니다.
Channel은 아래와 같이 SendChannel와 ReceiveChannel를 구현한 인터페이스입니다.
- SendChannel: 원소를 보내거나 채널을 닫는 용도로 사용
- ReceiveChannel: 원소를 받을 때 사용
SendChannel 인터페이스의 send 함수와 ReceiveChannel 인터페이스의 receive 함수는 모두 중단 함수로 구현되도록 작성되어 있습니다. 따라서, 채널에 원소를 보내거나 채널을 통해 원소를 받는 행위는 모두 중단 함수여야 합니다.
- receive 함수 호출 시, 채널에 원소가 없다면 코루틴은 원소가 들어올 때까지 중단됩니다.
- send 함수 호출 시, 채널의 용량이 가득 차 빈 공간이 없다면 중단됩니다.
만약 중단 함수가 아닌 함수로 send하거나 receive하려면 trySend, tryReceive 함수를 사용할 수 있습니다. 두 연산 모두 성공/실패에 대한 정보를 담은 ChannelResult를 즉시 반환하며 용량이 제한적인 채널에서만 사용이 가능합니다.
채널은 위와 같이 여러 생성자와 소비자를 가질 수 있으며, 보통 채널의 양 끝에 각각 하나씩 생성자와 소비자가 있는 경우가 일반적입니다.
채널에 데이터를 전송하고 데이터를 받는 행위는 아래와 같이 예시를 들어볼 수 있습니다.
- 보통 수신자는 송신자가 데이터를 얼만큼 보낼지 알 수 없기 때문에, 채널이 닫힐 때까지 원소를 받도록 구성합니다.
- 채널이 닫힐 때까지 원소를 받도록 구성하는 방법은 위 예시처럼 for 루프를 활용하거나 consumeEach 함수를 사용하면 됩니다.
- 수신자는 송신자가 채널을 닫으면 그 때부터 수신을 중단하기 때문에 위에서 channel.close()를 통해 명시적으로 채널을 종료했습니다.
위 예시는 정상적으로 채널이 동작하는 것 같지만, 송신자가 채널을 닫기 전에 오류가 발생하여 코루틴이 종료될 경우 문제가 생길 수 있습니다.
- 제대로 채널이 닫히지 않아 수신자가 데이터를 무한히 기다리게 될 수 있음
produce
이로 인해, 코루틴에서는 CoroutineScope의 확장함수로 produce 함수를 제공하며, produce 함수는 코루틴이 성공/실패 여부와 상관없이 코루틴이 종료되면 채널을 닫습니다.
위 예시에서 볼 수 있듯이 명시적으로 close 함수를 호출하지 않아도 정상적으로 채널 수신 로직이 완료되는 것을 확인할 수 있습니다.
produce 함수는 어떻게 채널을 닫을까?
produce 함수를 통해 어떻게 채널이 닫히는지 알기 위해 앞선 예제에서 send 하는 부분에 예외를 던지도록 예제를 수정하였습니다.
- 앞선 Channel의 경우 BufferedChannel 구현체가 동작하며, 이 때 close 함수가 호출되면 해당 함수에서 closeOrCancelImpl 함수를 호출하게 되며, 이 함수를 통해 채널을 닫습니다.
produce 함수는 빌더로 시작된 코루틴이 어떻게 종료되든 상관없이(끝나거나, 중단되거나, 취소되거나) 채널을 닫습니다.
따라서, close를 반드시 호출하게 되며, 채널을 만드는 가장 인기 있는 방법입니다.
채널 타입
채널은 설정한 용량 크기에 따라 4가지로 구분할 수 있습니다.
- 무제한(Unlimited): 제한이 없는 용량 버퍼를 가진 Channel.UNLIMITED로 설정된 채널로 send가 중단되지 않음
- 버퍼(Buffered): 특정 용량 크기 또는 Channel.BUFFERED(기본값은 64이며, JVM의 kotlinx.coroutines.channels.defaultBuffer 설정을 통해 오버라이딩 가능)로 설정된 채널
- 랑데뷰(Rendezvous): 용량이 0이거나 Channel.RENDEZVOUS로 설정된 채널로 송신자와 수신자가 만날 때만 원소를 교환함
- 융합(Conflated): 버퍼 크기가 1인 Channel.CONFLATED를 가진 채널로 새로운 원소가 이전 원소를 대체하여 가져가지 못한 원소는 모두 유실됨
버퍼 오버플로우 정책
채널의 버퍼가 가득 찼을 때 어떻게 정책을 가져갈지 커스텀하게 설정을 할 수 있습니다. 이때 사용하는 파라미터는 onBufferedOverflow이여 아래 정책 중 하나를 선택할 수 있습니다.
- SUSPEND(기본 옵션): 버퍼가 가득 찼을 때 send 메시지가 중단됨
- DROP_OLDEST: 버퍼가 가득 차면, 가장 오래된 원소가 제거됨
- DROP_LATEST: 버퍼가 가득 차면, 가장 최근 원소가 제거됨
앞서 봤던 채널 타입 중 하나인 Channel.CONFLATED는 capacity를 1로 설정하고 onBufferedOverflow를 DROP_OLDEST로 설정한 것임을 Channel 로직에서 확인할 수 있습니다.
참고
produce 함수는 onBufferOverflow를 설정할 수 없으므로, 오버플로우 옵션을 변경하려면 Channel 함수를 통해 채널을 직접 정의해야 합니다.
팬아웃(Fan-out)
여러 개의 소비자(코루틴)이 하나의 채널로부터 데이터를 받아올 수도 있습니다. 이 경우에는 consumeEach를 통해 데이터를 수신하지 않고 for 루프를 통해 데이터를 받아와야 합니다.
- consumeEach는 여러 개의 소비자가 사용하기에는 안전하지 않습니다.
- 위 예시에서 볼 수 있듯이, 여러 소비자가 있을 경우 원소는 공평하게 배분됩니다.
- 0, 1, 2번 순서로 돌아가면서 원소를 수신
팬인(Fan-in)
여러 개의 생성자(코루틴)이 하나의 채널로 원소를 전송할 수 있습니다. 아래 예시는 두 개의 생성자로부터 받은 데이터를 하나의 소비자가 수신하는 것입니다.
앞선 예시는 동일한 채널에 명시적으로 여러 송신자가 데이터를 보냈지만, 만약 각 송신자가 서로 다른 채널에 데이터를 보내고 이를 하나의 채널로 합쳐야 한다면 아래 예시처럼 produce 함수를 통해 여러 채널을 합치는 fanIn 함수를 사용해볼 수 있습니다.
파이프라인
하나의 채널에서 받은 원소를 다른 채널로 전송하는 것을 파이프라인이라고 부릅니다.
채널은 언제 사용할까?
채널은 서로 다른 코루틴이 통신할 때 유용합니다. 충돌이 발생하지 않고, 공평함을 보장할 수 있습니다.
- 채널을 사용하는 전형적인 예는 데이터가 한 쪽에서 생성되고 다른 쪽에서 데이터를 처리하는 것입니다.
셀렉트
select란, 여러 중단 함수들을 기다릴 때, 가장 먼저 완료되는 코루틴의 결과를 선택할 수 있게 해주는 함수입니다.
select 함수는 kotlin docs가 마지막으로 업데이트된 2024/08/15까지도 여전히 실험 기능입니다. select가 필요한 경우가 드물기 때문에 안정화되는 것은 어려울 수 있습니다.
select는 일반적인 코루틴에서도 사용할 수 있지만, 채널에서도 사용이 가능합니다. select 표현식에서 사용하는 주된 함수는 아래와 같습니다.
- onReceive: 채널이 값을 가지고 있을 때 선택됩니다. (receive 메소드처럼) 값을 받은 뒤 람다식의 인자로 사용합니다. onReceive가 선택되었을 때 select는 람다식의 결과값을 반환합니다.
- onReceiveCatching: 채널이 값을 가지고 있거나 닫혔을 때 선택됩니다. 값을 나타내거나 채널이 닫혔다는 것을 알려주는 ChannelResult를 받으며 이 값을 람다식의 인자로 사용합ㄴ디ㅏ.
- onSend: 채널의 버퍼에 공간이 있을 때 선택됩니다. (send 메소드처럼) 채널에 값을 보낸 뒤, 채널의 참조값으로 람다식을 수행합니다. onSend가 선택되면 select는 Unit을 반환합니다.
onReceive
- 위 예시는 500ms마다 "Fizz"를 송신하는 fizz와 1000ms마다 "Buzz!"를 송신하는 buzz를 select할 채널로 지정합니다.
- fizz와 buzz 중에 먼저 오는 것을 하나 선택하게 됩니다.
onReceiveCatching
onReceive는 채널이 닫히면 예외를 던지면서 select에 실패하게 됩니다. 이럴 때 사용할 수 있는 함수가 onReceiveCatching이며, 채널이 닫혔을 때에도 처리가 가능합니다.
onSend
onSend 구문은 biased nature of selection(편향적인 selection)에서 사용하면 좋습니다.
- 위 예시는 기본 채널의 소비자가 받을 수 없을 때 다른 쪽의 소비자(side)가 받도록 한 예시입니다.
궁금증
왜 아래 두 케이스는 결과에 차이가 있을까?
- capacity가 3인데 delay를 먼저 주면 4개를 send 하는 이유? (stackoverflow에 질문 남김)
- send를 하기 전에 println() 구문을 호출했기 때문에 마치 먼저 send한 것처럼 보이는 것입니다.
- send와 println() 구문의 순서를 바꾸면 동일하게 3개까지 보낸 후에 receive 한 뒤 send 하는 로그가 출력됨을 확인할 수 있습니다.
- consumeWith은 왜 여러 코루틴이 수신자일 때 사용하기 적합하지 않을까?
- fanIn 함수가 진짜 존재하나? (kotlin docs에 안 보임)
- 어떨 때 trySend나 tryReceive 사용해야 할까?
참고 자료
'PROGRAMMING LANGUAGE > KOTLIN' 카테고리의 다른 글
[Effective Kotlin] 8장 효율적인 컬렉션 처리 (0) | 2024.08.10 |
---|---|
[Effective Kotlin] 7장 비용 줄이기 (0) | 2024.08.03 |
[Effective Kotlin] 6장 클래스 설계 (0) | 2024.07.28 |
[Kotlin Coroutine] 코루틴 단위 테스트 (0) | 2024.07.27 |
[Kotlin Coroutine] Asynchronous Flow (5) | 2024.07.20 |