본문 바로가기
카프카

카프카 톺아보기 3-1. 컨슈머 개념

by pius712 2024. 5. 30.

커밋

커밋이란 컨슈머가 자신이 어디까지 메시지를 소비했는지 메시지의 오프셋(offset)을 기록하는 행위이다.

참고로 offset 을 커밋하는 방식은 여러가지가 있을 수 있다.

offset 을 자동으로 커밋할 수도 있고 수동으로 커밋할 수도 있다. 또한 커밋에 대한 결과를 동기적으로 받을 수도 있고 비동기로 받을 수도 있다.

아래 그림을 보면 poll 을 통해 1~500 개의 메시지를 가져온다. 그리고 offset 500을 커밋하게 된다.

이렇게 되면, 해당 파티션과 연결된 컨슈머 그룹의 컨슈머는 다음에는 501 오프셋을 가진 메시지를 소비하게 된다.

auto commit

자동으로 커밋을 수행하는 경우, auto.commit.interval.ms 설정에 의해서, 메시지들을 poll 한 후에 해당 시간이 흐른뒤에 로직이 수행을 마치는것과 관계 없이 자동으로 커밋하게 된다.

auto commit 으로 인한 문제점

auto commit 은 마치 자동으로 모든 것을 커밋해주기 때문에 문제가 없어보일 수 있다.

하지만 auto commit 으로 인해서 특정 메시지를 소비하지 못하는 상황이 발생할 수 있다.

일반적으로 poll 을 통해서 가져오는 레코드는 1개가 아니라 기본 500개로 설정되어 있는데, 아래 상황을 보자.

컨슈머가 300번까지 처리하고 컨슈머가 장애가 나서 서버가 재시작될 수 있다. 하지만 이미 500번까지의 메시지를 소비했기 때문에, 다음부터는 501 메시지를 읽게된다.

따라서, 301~499 까지의 메시지를 사실상 처리하지 못하게 된다.

명시적 commit

수동으로 커밋하는 경우, 명시적으로 커밋 로직을 작성해줘야 한다.

아래와 같이 빈을 설정해준다.

@Bean
fun listenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
    
    return factory
}

그리고 consumer listener 쪽에 아래처럼 수동으로 acknowledge 메서드를 호출한다.

@Component
class TestConsumer {
    @KafkaListener(topics = ["test"], groupId = "group_1")
    fun listen(userId: Long, acknowledgment: Acknowledgment) {
        println("Received Message: $userId")
        acknowledgment.acknowledge()
    }
}

컨슈머 그룹

컨슈머 그룹이란 특정 토픽의 데이터를 소비할 때, 동일한 로직을 공유하는 컨슈머 집합을 의미한다.

일반적으로 토픽의 파티션 개수와 동일한 수의 컨슈머를 구성하게 된다.

주의할 점은, 파티션은 하나의 컨슈머 그룹내의 컨슈머 1개만 매칭이 되기 때문에 파티션의 개수 < 컨슈머의 개수 일때, 매핑되지 않는 컨슈머는 유휴상태(idle) 상태라서 메시지를 소비하지 않는다.

그렇다면 컨슈머 그룹이라는 개념이 왜 필요할까? 많은 이유가 있겠지만 MSA 환경에서 특정 이벤트에 대해서 다른 서버들이 처리하게 되는데 이를 구분지어준다. 이를 통해 컨슈머 그룹간의 커플링을 해소와 SPOF 를 방지할 수 있게 해준다.

 

예를들어, 주문 이벤트가 발생한다고 가정해보자. 이때, 결제 서비스와 재고 서비스가 이에 따른 로직을 처리해야한다고 할 때 해당 이벤트를 하나의 컨슈머가 받게되면 어떻게 될까? 결제와 재고 서비스 로직이 하나의 컨슈머 내에 엉키게 되고 두 서비스 로직의 커플링이 증가하게 된다. 또한 해당 컨슈머 장애시 결제도 안되고 재고차감도 안되는 문제가 발생하게 된다.

이때, 두 서비스가 각자 컨슈머 그룹을 통해서 소비하게 되면, 위 문제가 해결되게 된다.

리밸런싱

컨슈머 그룹 내의 컨슈머가 추가되거나 컨슈머가 응답하지 않는 장애상태인 경우, 파티션에 매핑된 컨슈머를 조정하게 되는데 이를 리밸런싱이라고 한다.

예를들어, 컨슈머를 배포하는 경우 기존의 컨슈머가 중단되고 새로운 컨슈머가 동작할 때도 리밸런싱이 일어나게 된다.

heartbeat(하트비트)

그렇다면 컨슈머가 정상 동작하는지 어떻게 알까? 이것을 확인하기 위한 것이 바로 하트비트(heartbeat) 이다.

consumer 의 옵션을 통해 하트비트의 간격을 설정할 수 있다.

  • heartbeat.interval.ms : 하트비트를 보내는 간격 설정
  • session.timeout.ms : 해당 시간동안 하트비트를 보내지 않으면 컨슈머가 정상동작하지 않는다고 간주하고 리밸런싱 발생.