아파치 카프카 버전 0.11 이전까지는 명령줄 프로그램으로만 가능했던 관리 기능이 0.11부터 추가된 AdminClient를 통해 프로그램적으로 가능해졌다. AdminClient를 활용하면 토픽 목록 조회, 생성, 삭제, 클러스터 상세 정보 확인, 컨슈머 그룹 관리 등 다양한 기능을 이용할 수 있다.
비동기적이고 최종적 일관성 가지는 API
AdminClient는 비동기적(asynchronous)으로 작동한다. AdminClient 내 각 요청은 클러스터 컨트롤러로 전송된 뒤 바로 1개 이상의 Future 객체를 리턴하는 식으로 동작한다. AdminClient는 Future 객체를 Result 객체 안에 감싸며, Result 객체는 작업이 끝날 때까지 대기하거나 작업 결과에 대해 쓰이는 작업을 수행하는 헬퍼 메소드를 가지고 있다.
카프카 컨트롤러부터 브로커로 메타데이터 전파는 비동기적으로 이루어지며, AdminClient API가 리턴하는 Future 객체들은 컨트롤러의 상태가 완전히 업데이트된 시점에 완료된 것으로 간주한다. 따라서, 완료 시점에 모든 브로커가 전부 다 새로운 상태에 대해 알고 있지 못할 수 있다. 최종적으로는 모든 브로커가 새로운 상태에 대해 알게되겠지만, 그 시점이 언제인지는 아무도 보장할 수 없다(최종적 일관성(eventual consistency)).
수평 구조
모든 어드민 작업은 KafkaAdminClient에 구현된 아파치 카프카 프로토콜을 사용한다. 여기는 객체 간 의존 관계나 네임스페이스 같은게 없다. 이로 인해 인터페이스가 좀 크긴 하지만, 어드민 작업을 하고자 JavaDoc 문서를 통해 필요한 메소드를 찾을 때 한눈에 필요한 메소드를 찾을 수 있다(여기에 없는 기능이라면, 아직 구현 전이라고 보면 됨).
수행되는 브로커는 작업 종류에 따라 다름
클러스터의 상태를 변경하는 작업(create, delete, alter)은 컨트롤러에 의해 수행된다. 하지만, 클러스터 상태를 읽기만 하는 작업(list, describe)은 어느 브로커에서나 수행될 수 있다. 이로 인해 읽기만 하는 작업은 클라이언트 입장에서 가장 부하가 적어보이는 브로커에서 이루어진다.
클러스터 상태 변경은 컨트롤러에서 이루어지다보니, 상태 변경 직후 바로 해당 내용을 조회하려고 하면 아직 변경 내용이 반영되지 않은 브로커를 통해 응답이 올 수 있어서 예상치 못한 결과가 나올 수 있다.
AdminClient 사용법
AdminClient를 사용하기 위해서는 AdminClient을 생성, 설정 및 닫기를 할 수 있어야 한다.
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
// admin 작업 수행
admin.close(Duration.ofSeconds(30));
AdminClient를 생성(create)할 때는 Properties 객체를 인수를 받는다. 이 객체에 반드시 있어야 하는 설정은 연결하고자 하는 클러스터 URI(연결할 브로커 목록을 쉼표로 구분한 내용) 하나 뿐이다. admin 작업을 다 끝냈을 때는 close()를 호출하여 연결을 닫을 수 있고, close() 메소드의 인자로는 timeout 정보를 넣어서 아직 끝나지 않은 admin 작업이 있다면 해당 시간만큼은 기다려준다. timeout 설정을 하지 않은채로 종료할 경우 진행중인 admin 작업 완료를 무한정 기다린다.
client.dns.lookup
apache kafka 3.1
AdminClient를 생성할 때 연결 대상으로 잡은 클러스터 URI 정보는 호스트명을 기준으로 연결을 검증하고, 해석, 생성한다. 이로 인해 아래와 같은 상황에서 제대로 연결이 되지 않을 수 있어서 client.dns.lookup이라는 설정이 아파치 카프카 2.1.0에 추가되었다.
DNS 별칭을 사용하는 경우
브로커들 각각이 서로 다른 DNS로 구성되어 있을 때, 모든 브로커들을 부트스트랩 서버 설정에 일일이 지정하는 것보다 모든 브로커들을 가리킬 하나의 DNS alias를 설정할 수 있다.
이때, SASL을 사용해서 인증을 하려고 하면, DNS alias를 기준으로 인증을 시도하는데 서버의 보안 주체는 각 브로커의 실제 DNS이다보니 호스트명이 일치하지 않아 악의적인 사용자의 중간자 공격일 수 있다고 판단해 SASL은 인증을 거부하고 연결이 실패한다.
이때는 client.dns.lookup=resolve_canonical_bootstrap_servers_only 설정을 잡아주면 DNS 별칭을 펼쳐서 해당 별칭에 포함된 모든 브로커 이름을 부트스트랩 서버 목록에 넣어준 것과 동일하게 작동하게 된다.
다수의 IP 주소로 연결되는 DNS 이름을 사용하는 경우
기본적으로 카프카 클러스터는 해석된 첫 번째 호스트명으로 연결을 시도한다.
다수의 IP 주소로 연결된 DNS 이름으로 부트스트랩 서버 설정을 진행할 경우, 해석된 IP 주소가 사용 불능이면 브로커가 멀쩡하게 작동해도 클라이언트는 연결에 실패할 수 있다.
이로 인해 클라이언트가 로드 밸런싱 계층의 고가용성을 활용할 수 있도록 client.dns.lookup=use_all_dns_ips를 활용할 수 있다.
request.timeout.ms
애플리케이션이 AdminClient 응답을 기다릴 수 있는 시간의 최대값을 정의한다. 이 시간에는 클라이언트가 재시도 가능한 에러일 때 재시도 하는 시간까지 포함되어 있다. 기본값은 120초이다.
토픽 관리 기능
토픽 존재하는지 확인 후 없으면 생성
카프카 핵심 가이드 5.3 필수적인 토픽 관리 기능 내 코드 예제describeTopics() JavaDoc
❷ DescribeTopicResult에 대해 values() 메소드를 호출하면, DescribeTopicResult 갖고 있는 key는 토픽명, value는 토픽에 대한 상세 정보를 가진 Future 객체로 하는 Map을 반환하고, 이때 get(TOPIC_NAME)을 하면 Map에서 해당하는 토픽의 Future 객체를 조회하게 된다. 이후 get()을 한 번 더 호출함으로써 Future 객체의 응답을 받아올 때까지 블로킹된다.
이때, 토픽이 존재하지 않거나 서버가 제대로 요청 처리하지 못할 경우 대응할 수 있도록 try로 감싸져있다.
❸ 토픽이 존재하면 토픽에 속한 파티션 목록을 담은 TopicDescription을 ❷번을 통해 받아온다.
❹ AdminClient의 result 객체는 카프카가 에러 응답을 보낼 경우 ExecutionException을 발생시킨다. 자세한 에러 원인을 보기 위해서는 ExecutionException의 cause를 확인해야 한다.
createTopics() JavaDoc
❺ 여기까지 로직이 흘러오면 토픽이 없는 상태인 것이므로, 토픽을 생성한다. 토픽 생성 메소드 역시 Future 객체를 포함한 객체를 반환한다.
❻ 토픽 생성 후 Future 객체 응답을 기다려서(get() 호출) ❸에서 토픽 존재시 확인했던 파티션 목록을 확인한다.
Future 객체를 블로킹하지 않는 방법
앞선 예시는 Future 객체에 블로킹 방식으로 작동하는 get()을 호출한다. 일반적으로 어드민 작업은 드물고, 작업이 성공하거나 타임아웃날 때까지 기다리는 것이 보통 부담되지 않기 때문이다. 하지만, 많은 어드민 요청을 처리할 것으로 예상된다면, 카프카 응답이 올 때까지 서버 스레드가 블록되는 것은 방지하고 싶을 수 있다. 이때를 위해 KafkaFuture는 작업 완료 시 수행되는 콜백함수를 지정할 수 있다.
카프카 핵심 가이드 5.3 필수적인 토픽 관리 기능 내 코드 예제
❸ 이전 예시처럼 describeTopics()를 호출하면 응답이 들어있는 Future 객체를 받아온다.
KafkaFuture의 whenComplete() JavaDoc
❹ DescribeTopicsResult에 .values().get(topic) 함수를 호출하면 해당 토픽에 대한 정보를 가진 KafkaFuture<TopicDescription>을 반환한다. KafkaFuture 클래스 내에는 whenComplete() 함수를 제공하는데, 이 함수는 future가 완료되면 파라미터로 전달한 함수를 유발한다.
❺ 이 콜백함수에는 예외 발생 시 예외를 담을 Throwable 객체가 있고, Future가 예외를 발생시키면서 완료되면 이 if문 내로 들어가서 동작한다.
위와 같이 구성이 되면, 카프카 응답이 올 때까지 블로킹되지 않고 그 사이에 다른 요청을 처리할 수 있게 된다.
설정 관리
(토픽 설정 관리는 앞선 예시들과 사용하는 메소드만 다르므로 생략)
컨슈머 그룹 관리
카프카는 다른 메시지 큐와 다르게, 이전에 데이터를 읽어서 처리한 것과 동일한 순서로 데이터를 재처리할 수 있다. 이를 위해 AdminClient는 프로그램적으로 컨슈머 그룹과 이 그룹들이 커밋한 오프셋을 조회하고 수정하는 메소드를 제공한다.
listConsumerGroupOffsets() 메소드를 활용하면 컨슈머 그룹의 정보들을 조회할 수 있고, listOffsets() 메소드를 통해 토픽 파티션별 마지막으로 커밋된 오프셋 값을 얻어올 수 있다. 이 두 메소드를 활용하면 컨슈머 그룹이 각 파티션별로 LAG이 얼만큼 있는지 판단이 가능하다.
카프카 핵심 가이드 5.5 컨슈머 그룹 관리 내 코드 예제
❶ 컨슈머 그룹이 사용중인 모든 토픽 파티션을 key로 가지고 있고, 각 토픽 파티션에 대해 마지막으로 커밋된 오프셋을 value로 가진 맵을 가져온다. listConsumerGroupOffsets()은 describeConsumerGroups()와 다르게 하나의 컨슈머 그룹만 인자로 넘길 수 있다.
❷ 결과로 받은 각 토픽 파티션에 대해 마지막 오프셋을 받아야하므로, listOffsets() 요청에 담을 파티션으로 추가한다.
❸ 모든 파티션들을 루프를 돌면서 각 파티션별 마지막 커밋된 오프셋과 마지막 오프셋 사이의 차이를 출력하여 LAG을 계산한다.
컨슈머 그룹 수정하는 방법
AdminClient는 컨슈머 그룹을 수정하기 위한 메소드를 가지고 있으며(그룹 삭제, 멤버 제외, 커밋된 오프셋 삭제/변경), 비상 상황일 때 복구를 위한 툴로써 사용한다.
alterConsumerGroupOffsets() JavaDoc
오프셋 토픽의 오프셋 값을 변경하여도 컨슈머 그룹에 변경 여부가 전달되지 않으며, 컨슈머 그룹은 컨슈머가 새로운 파티션을 할당받거나 새로 시작할 때만 오프셋 토픽에 저장된 값을 읽어온다. 따라서, 컨슈머가 모르는 오프셋 변경을 방지하기 위해 카프카에서는 현재 작업이 수행되고 있는 컨슈머 그룹에 대해서는 오프셋 수정을 허용하지 않는다.
테스트
아파치 카프카는 원하는 수만큼 브로커를 설정해서 초기화할 수 있는 MockAdminClient 테스트 클래스를 제공한다. 이 클래스를 사용하면 실제 카프카 클러스터를 돌려서 실제 어드민 작업 수행할 필요 없이 애플리케이션이 정상 작동하는지 확인할 수 있다.
MockAdminClient는 카프카 API의 일부는 아니기 때문에, 언제든 사전 경고 없이 변경될 수 있다. 따라서, 이 클래스를 이용한 테스트의 편리함과 API가 바뀌어서 테스트 깨질 위험 사이에는 약간의 트레이드오프가 있다는 점을 명심해야 한다. (모든 메소드가 목업되어 있지도 않다)
public TopicCreator(AdminClient admin) {
this.admin = admin;
}
// Example of a method that will create a topic if its name starts with "test"
public void maybeCreateTopic(String topicName)
throws ExecutionException, InterruptedException {
Collection<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(topicName, 1, (short) 1));
if (topicName.toLowerCase().startsWith("test")) {
admin.createTopics(topics);
// alter configs just to demonstrate a point
ConfigResource configResource =
new ConfigResource(ConfigResource.Type.TOPIC, topicName);
ConfigEntry compaction =
new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT);
Collection<AlterConfigOp> configOp = new ArrayList<AlterConfigOp>();
configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET));
Map<ConfigResource, Collection<AlterConfigOp>> alterConf =
new HashMap<>();
alterConf.put(configResource, configOp);
admin.incrementalAlterConfigs(alterConf).all().get();
}
}
@Before
public void setUp() {
Node broker = new Node(0, "localhost", 9092);
this.admin = spy(new MockAdminClient(Collections.singletonList(broker), broker));
// without this, the tests will throw
// `java.lang.UnsupportedOperationException: Not implemented yet`
AlterConfigsResult emptyResult = mock(AlterConfigsResult.class);
doReturn(KafkaFuture.completedFuture(null)).when(emptyResult).all();
doReturn(emptyResult).when(admin).incrementalAlterConfigs(any());
}
@Test
public void testCreateTestTopic()
throws ExecutionException, InterruptedException {
TopicCreator tc = new TopicCreator(admin);
tc.maybeCreateTopic("test.is.a.test.topic");
verify(admin, times(1)).createTopics(any());
}
@Test
public void testNotTopic() throws ExecutionException, InterruptedException {
TopicCreator tc = new TopicCreator(admin);
tc.maybeCreateTopic("not.a.test");
verify(admin, never()).createTopics(any());
}
아파치 카프카는 MockAdminClient를 test jar에 담아서 공개하므로, maven 기준 pom.xml에 테스트 디펜던시를 포함해야 MockAdminClient를 활용할 수 있다
컨트롤러
컨트롤러는 일반적인 카프카 브로커의 기능에 더해 파티션 리더를 선출하는 역할을 추가적으로 맡는다.
클러스터에서 가장 먼저 시작되는 브로커는 주키퍼의 /controller에 Ephemeral 노드를 생성함으로써 컨트롤러가 된다. 다른 브로커 역시 시작할 때 해당 위치에 노드 생성을 시도하나 '노드가 이미 존재함' 예외를 받고 컨트롤러 노드가 이미 존재함을 인지하게 된다.
브로커들은 주키퍼의 컨트롤러 노드에 변동이 생겼을 때 알림을 받기 위해 이 노드에 와치를 설정한다. 이렇게 함으로써 클러스터 내 단 한개의 컨트롤러만 있도록 보장한다.
컨트롤러 브로커가 멈추거나 주키퍼와 연결이 끊어지면 Ephemeral 노드가 삭제되며, 이 때 클러스터 내 다른 브로커들은 주키퍼에 설정된 와치를 통해 컨트롤러가 없어졌다는 것을 인지하게 되며 주키퍼에 컨트롤러 노드를 다시 생성하려고 시도한다. 이때 주키퍼에 가장 먼저 새로운 노드를 생성한 브로커가 새로운 컨트롤러 역할을 맡게 된다.
브로커는 새로운 컨트롤러가 선출될 때마다 주키퍼의 조건적 증가(conditional increment) 연산에 의해 증가된 에포크(epoch) 값을 전달받게 된다. 브로커는 현재 컨트롤러의 에포크 값을 알고 있어서, 이전 세대 에포크 값을 가진 컨트롤러의 메시지를 받으면 무시한다(스플릿 브레인(split brain)) 현상 방지).
KRaft: 카프카의 새로운 래프트 기반 컨트롤러
주키퍼 기반 컨트롤러로부터 벗어나서 래프트 기반 컨트롤러 쿼럼으로 옮겨가기 위해 아파치 카프카 2.8에 KRaft 컨트롤러 프리뷰 버전이 등장했다. 3.3부터는 정식 프로덕션 환경에서 사용 가능한 기능으로써 KRaft 기반 컨트롤러가 릴리즈되었다.
컨트롤러를 교체하게 된 이유는 아래와 같다.
컨트롤러가 주키퍼에 메타데이터 쓰는 작업은 동기적이지만, 브로커 메시지를 보내는 작업은 비동기적으로 이루어진다. 또한 주키퍼로부터 업데이트 받는 과정 역시 비동기적이다. 이로 인해 브로커, 컨트롤러, 주키퍼 간 메타데이터 불일치가 발생할 수 있고, 인지하기 어렵다.
컨트롤러가 재시작될 때마다 주키퍼로부터 모든 브로커와 파티션에 대한 메타데이터를 읽어와야 하며, 그 후 이 메타데이터를 모든 브로커로 전송한다. 이 부분은 파티션과 브로커 수가 증가하면 컨트롤러 재시작이 느려질 수 있어 병목이다.
메타데이터 소유권 관련 작업을 일부는 컨트롤러가 하고 일부는 브로커, 주키퍼가 한다(한 곳으로 정리가 안됨).
주키퍼 자체도 하나의 분산 시스템이라 카프카와 마찬가지로 운영을 위해서는 기반 지식이 필요하다.
복제
복제는 카프카 아키텍처의 핵심이다. 카프카는 '분산되고, 분할되고, 복제된 커밋 로그 서비스'로 표현되기도 한다. 개별적인 노드에 필연적으로 장애가 발생할 수 밖에 없는 상황에서 복제는 카프카가 신뢰성과 지속성을 보장하는 방식이다.
리더 레플리카
각 파티션에는 리더 역할을 하는 레플리카가 하나씩 있다. 일관성을 보장하기 위해, 모든 쓰기 요청은 리더 레플리카로 요청된다.
반면, 읽기 요청은 관련 설정을 켰다면, 리더 레플리카나 팔로워로부터 레코드를 읽어올 수 있다.
리더는 각각의 팔로워 레플리카가 최신 상태로 유지되고 있는지 확인하여 동기화가 제대로 되고 있는지 체크한다.
팔로워 레플리카
파티션에 속한 모든 레플리카 중 리더 레플리카를 제외한 나머지를 팔로워 레플리카라고 한다.
별로 설정 없이는 팔로워는 클라이언트 요청을 설정할 수 없으며, 팔로워가 주로 하는 일은 리더 레플리카로 들어온 최근 메시지들을 복제하여 최신 상태를 유지하는 것이다. 이후 파티션의 리더 레플리카에 크래쉬가 나면, 팔로워 중 하나가 파티션의 새 리더 파티션으로 승격된다.
팔로워로부터 읽기 KIP-392에서 팔로워 레플리카로부터 읽기 기능이 추가되었다. 이 기능의 목표는 클라이언트가 리더 레플리카 대신 가장 가까이 있는 인-싱크 레플리카로부터 읽음으로써 네트워크 트래픽 비용을 줄이는 것이다. 이 기능을 사용하려면, client.rack 컨슈머 설정값을 잡아야 하며, 브로커 설정 중 replica.selector.class(기본값: LeaderSelector - 항상 리더로부터 읽음를 잡아 주어야 한다.
클라이언트가 팔로워 레플리카로부터 메시지를 읽어와도 커밋된 메시지만 읽도록 확장되었기 때문에, 팔로워로부터 메시지를 읽어도 기존과 동일한 신뢰성이 보장된다.
아웃-오브-싱크 레플리카와 인-싱크 레플리카
리더 레플리카와의 동기화를 유지하기 위해 팔로워 레플리카들은 리더 레플리카에 읽기 요청을 보낸다. 리더 레플리카는 각 팔로워 레플리카가 마지막으로 요청한 오프셋 값을 확인해 각 팔로워 레플리카가 얼마나 뒤쳐져 있는지 알 수 있다.
만약 팔로워 레플리카가 일정 시간 이상 동기화가 안되면 아웃-오브-싱크 레플리카로 간주하며, 동기화가 풀린 팔로워 레플리카는 리더 레플리카에 장애가 발생했을 때 리더가 승격될 수 없다(인-싱크 레플리카만이 파티션 리더로 선출될 수 있음).
팔로워 레플리카가 동기화 풀린 것으로 판정될 때까지 걸리는 시간은 replica.lag.time.max.mx 설정 매개변수에 의해 결정된다.
선호 리더(preferred leader)
선호 리더란 토픽이 처음 생성되었을 때 리더 레플리카로 선정되었던 레플리카이다. 파티션이 처음 생성되던 시점에서의 리더 레플리카는 모든 브로커에 걸쳐 균등하게 분포되므로 선호라는 표현이 붙었다. 기본적으로 카프카는 auto.leader.rebalance.enable=true 설정이 되어 있어서 선호 리더가 현재 리더가 아니면 현재 리더와 동기화 되고 있을 때 리더 선출을 실행시킴으로써 선호 리더를 현재 리더로 만들어 준다.
선호 리더 찾기 선호 리더를 식별하는 가장 좋은 방법은 파티션 레플리카 목록을 살펴보는 것이다. kafka-topics.sh 툴이 출력하는 파티션과 레플리카 상세 정보를 보면, 목록에 표시된 첫 번째 레플리카가 선호 리더이다.
요청 처리
브로커는 연결을 받는 각 포트별로 acceptor 쓰레드를 하나씩 실행시킨다. acceptor 쓰레드는 연결을 생성하고, 들어온 요청을 network 쓰레드에 넘겨 처리할 수 있도록 한다. network 쓰레드의 수는 직접 설정이 가능하며, processor 쓰레드라고도 불린다. network 쓰레드는 클라이언트 연결을 통해 들어온 요청을 요청 큐에 넣고, 응답 큐에서 응답을 가져다 클라이언트로 보낸다. 클라이언트로 보낼 응답에 지연이 필요할 때에는 퍼거토리(purgatory)에 저장했다가 보낸다.
위 과정을 통해 요청이 요청 큐에 들어오면 I/O 스레드(request handler 쓰레드라고도 불림)가 요청을 처리하느 ㄴ일을 담당한다.
아파치 카프카 내부의 요청 처리
클라이언트는 어디로 요청을 보내는지 어떻게 알까?
카프카 클라이언트는 메타데이터 요청이라 불리는 또 다른 유형의 요청을 사용한다. 모든 브로커는 이 정보를 포함한 메타데이터 캐시를 가지고 있기 때문에 아무 브로커에나 메타데이터 요청을 해도 된다. 클라이언트는 메타데이터 요청을 통해 메타데이터 정보를 캐싱해두었다가, 이 정보를 이용해 각 파티션 리더 역할을 맡는 브로커에 쓰기/읽기 요청을 보낸다.
클라이언트에 캐싱된 메타데이터 정보는 아래 상황일 때 갱신된다.
클라이언트 요청 라우팅
metadata.max.age.ms 설정 매개변수에 설정된 값에 따라 새로운 메타데이터 요청을 보내서 갱신한다.
위 주기가 아니더라도, Not a Leader 에러를 리턴받으면 요청을 재시도하기 전에 메타데이터를 먼저 새로고침한다.
위 에러는 클라이언트가 이미 만료된 정보를 사용중이고, 잘못된 브로커에 요청을 했다는 것을 의미한다.
쓰기 요청
acks 설정 매개변수는 쓰기 작업이 성공한 것으로 간주되기 전 메시지에 대한 응답을 보내야 하는 브로커 수를 가리킨다. 파티션의 리더 레플리카를 가지고 있는 브로커가 쓰기 요청을 받으면 아래와 같은 유효성 검증을 한다.
데이터를 보내고 있는 사용자가 토픽에 대한 쓰기 권한이 있는가?
요청에 지정된 acks가 올바른가? (0, 1, all)
acks가 all로 잡혀있다면, 메시지를 안전하게 쓸 수 있을 만큼 충분한 인-싱크 레플리카가 있는가?
인-싱크 레플리카 수가 설정된 값 아래로 내려가면 새로운 메시지 받지 않도록 브로커 설정 가능
유효성 검증이 끝나면 브로커는 새 메시지를 로컬 디스크에 쓰며, 브로커는 acks 설정에 따라 응답을 내려보낸다. 만약, 0이나 1이 아닌 all로 설정되어 있다면 요청을 퍼거토리(버퍼)에 저장했다가 팔로워 레플리카들이 메시지를 복제한 것을 확인한 후 클라이언트에 응답을 돌려보낸다.
읽기 요청
충분한 데이터가 누적될 때까지 브로커가 응답 유보
클라이언트는 브로커에 토픽, 파티션, 오프셋 목록에 해당하는 메시지들을 보내 달라는 요청을 보낸다. 클라이언트에 캐싱된 메타데이터를 기반으로 알맞은 브로커에 요청을 전송하게 되고, 유효한 요청이면 응답을 한다.
이때, 클라이언트가 오래되어 파티션에 이미 삭제된 메시지나 아직 존재하지 않는 오프셋 메시지를 요청하면 브로커는 에러를 응답한다. 존재하는 오프셋을 기반으로 요청하면, 브로커는 파티션으로부터 클라이언트가 요청에 지정한 크기 한도만큼의 메시지를 읽어서 클라이언트에 보내준다.
브로커가 리턴할 데이터 양의 상한과 클라이언트가 받을 데이터의 하한을 지정함으로써, 데이터를 주고받는 횟수를 최적화하여 오버헤드를 줄일 수 있도록 한다.
컨슈머는 모든 인-싱크 레플리카에 쓰여진 메시지들만 읽을 수 있다.
파티션 리더에 존재하더라도 모든 데이터를 클라이언트가 읽을 수 있는 건 아니다. 대부분의 클라이언트는 모든 인-싱크 레플리카에 쓰여진 메시지들만 읽을 수 있다.
기타 요청
새 브로커는 하위 버전의 요청을 처리할 수 있지만, 그 반대는 아니다. 따라서, 클라이언트를 업데이트하기 전에 브로커들을 먼저 업그레이드 하길 권한다.
물리적 저장소
카프카의 기본 저장 단위는 파티션 레플리카이다. 파티션은 서로 다른 브로커들 사이에 분리될 수 없고, 단일 브로커 내 서로 다른 디스크에 분할 저장도 불가하다. 따라서, 파티션의 크기는 특정 마운트 지점에 사용 가능한 공간에 제한을 받는다.
계층화된 저장소
계층화된 저장소 기능은 카프카 클러스터의 저장소를 로컬과 원격 두 계층으로 나누는 것이다.
로컬 계층: 현재의 카프카 저장소 계층과 똑같이 로컬 세그먼트를 저장하기 위해 카프카 브로커의 로컬 디스크를 사용한다.
원격 계층: 완료된 로그 세그먼트를 저장하기 위해 HDFS나 S3 같은 전용 저장소 시스템을 사용한다.
사용자는 계층별로 서로 다른 보존 정책을 설정할 수 있어서, 로컬 저장소가 상대적으로 보통 비싸기 떄문에 대게 몇 시간 이하로 설정하고 원격 계층의 보존 기한은 그보다 길게 설정한다.
원격 계층에 저장되는 로그 세그먼트들은 조회 시, 브로커로 복원되지 않고 원격 계층에서 바로 클라이언트에 전달(네트워크 경로를 통해 HDFS나 S3에서 데이터 읽음)된다. 따라서, 로컬 계층 읽는 것과 리소스 경합이 없어서 오래된 데이터를 읽을 때 기존 구조보다 더 빠르게 읽어올 수 있다.
계층화된 저장소 기능은 무한한 저장 공간, 더 낮은 비용, 탄력성 뿐 아니라 오래된 데이터와 실시간 데이터 읽는 작업을 분리시키는 기능이 있다.
파일 관리
카프카는 토픽에 대해 보존 기한(retention period)을 설정하고, 해당 기한이 지난 메시지는 지우는 방식을 사용한다. 큰 파일에서 삭제할 메시지를 찾아 지우는 작업은 시간이 오래걸리고, 에러 가능성도 높아서 하나의 파티션을 여러 개의 세그먼트로 분할하여 사용한다. 기본적으로 각 세그먼트는 1GB 데이터 또는 최근 1주일치 데이터 중 적은 쪽만큼 저장한다.
각 세그먼트 한도가 다 차면 세그먼트를 닫고 새 세그먼트를 생성하며, 현재 쓰여지고 있는 세그먼트를 액티브 세그먼트(active segment)라고 한다. 액티브 세그먼트는 어떠한 경우에도 삭제되지 않으므로, 로그 보존 기한이 하루여도, 액티브 세그먼트에 5일치 데이터가 있다면 해당 데이터는 제거되지 않는다.
카프카 브로커는 각 파티션의 모든 세그먼트에 대해 파일 핸들을 열기 때문에(액티브 세그먼트가 아닌 세그먼트도 연다), 사용중인 파일 핸들 수가 매우 높게 유지될 수 있어 운영체제를 튜닝해야 할 수도 있다.
파일 형식
각 세그먼트는 하나의 데이터 파일 형태로 저장되며, 디스크에 저장되는 데이터 형식은 사용자가 프로듀서를 통해 브로커로 보내고, 컨슈머가 받는 메시지 형식과 동일하다. 네트워크를 통해 전달되는 형식과 디스크에 저장되는 형식을 통일함으로써 카프카는 컨슈머에 메시지 전소 시 제로카피 최적화를 달성할 수 있다. 또한, 프로듀서가 압축하여 메시지를 전송했을 때 압축 해제 및 재압축을 하지 않아도 된다.
압착
데이터의 종류에 따라 같은 키에 대해 과거 데이터가 필요할 수도 있고, 불필요할 수도 있다.
key: 고객ID, value: 고객주소 - 가장 마지막 메시지만 의미가 있음(과거 주소는 더 이상 유효하지 않음)
key: 가게번호, value: 상태정보 - 상태가 변할때마다 메시지가 쓰여질 때, 크래시가 발생하면 크래시 발생 직전의 마지막 상태부터 필요함(그 이전 기록은 관심없음)
카프카는 삭제(delete), 압착(compact)라는 두 가지 보존 정책을 허용함으로써 위 사례들을 모두 지원한다.
delete 보존 정책: 지정된 보존 기한보다 오래된 이벤트를 삭제한다.
compact 보존 정책: 토픽에서 각 키의 가장 최근값만 저장하도록 한다. compact 정책은 키가 null인 메시지가 있을 경우 실패하므로 유의해야 한다.
보존 기한과 압착 설정을 동시에 적용하는 delete, compact으로도 설정이 가능하다. 이때는 지정된 보존 기한보다 오래된 메시지들은 키에 대한 가장 최근값이어도 삭제된다.
압착 작동 원리
클린, 더티 영역을 모두 갖는 파티션
세그먼트 내에는 위 사진처럼 2개의 영역으로 나뉜다.
클린(clean): 이전에 압착된 적이 있던 메시지들이 저장된다. 이 영역에는 하나의 키마다 단일 값만 포함되어 있다.
더티(dirty): 마지막 압착 작업 이후 쓰여진 메시지들이 저장된다.
카프카에 압착 기능이 활성화(log.cleaner.enabled)가 되어 있을 때, 각 브로커는 압착 매니저 쓰레드와 함께 다수의 압착 쓰레드를 실행시킨다. 이 쓰레드는 전체 파티션 크기 대비 더티 메시지의 비율이 높은 파티션을 골라서 압착한 뒤 클린 상태로 만든다.
삭제된 이벤트
compact 보존 정책을 사용하면, 키에 대한 가장 최근 메시지는 남아있기 때문에, 시스템에서 특정 키를 완전히 삭제하려면 해당 키에 대해 값을 null로 갖는 메시지를 보내주면 된다. 클리너 쓰레드가 이 메시지를 발견하면, 평소처럼 압착을 한 뒤 null 값을 갖는 메시지만 보존하게 된다. 카프카는 사전에 설정된 기간동안 이 특별한 메시지(톰스톤(tomstone))를 보존하며, 이 메시지를 통해 해당 키가 삭제되었음을 인지할 수 있다.
예를 들어, 컨슈머가 카프카에서 읽어온 데이터의 값이 null이면 RDB에서 해당 키와 관련된 데이터를 다 제거하도록 로직을 구현할 수도 있음
참고로, 카프카 어드민 클라이언트의 deleteRecords() 메소드는 톰스톤과는 다른 방식이다. deleteRecords() 메소드는 오프셋 위치를 변경하는 식으로 움직여 사실상 접근 불가능한 레코드로 만드는 방식으로 실제 데이터 삭제는 이후 클리너 쓰레드에 의해 진행된다. 클리너 쓰레드에 의해 제거가 될 수 있도록 하기 위해 deleteRecords()는 보존 기한이나 압착 설정이 된 토픽에 대해 사용 가능한 메소드이다.
토픽 압착 시점
압착 정책 역시 액티브 세그먼트에는 작동하지 않으며, 닫힌 세그먼트에 한하여 압착된다. 기본적으로 카프카는 토픽 내용물의 50% 이상이 더티 레코드일 때만 압착이 시작된다. 이는 너무 자주 압착하여 토픽의 쓰기/읽기 성능에 영향을 주는 것을 방지하면서 너무 많은 더티 레코드 존재를 막기 위함이다.