본문 바로가기
카프카

카프카 톺아보기2 - CLI

by pius712 2024. 5. 30.

카프카는 운영을 위한 커맨드 라인툴을 많이 제공한다.

카프카를 운영하는 입장은 아니지만, 로컬에서 카프카를 세팅할때 필요한 명령어들을 조금 알아보려고한다.

브로커 설정

도커 컴포즈를 사용하여 카프카 세팅을 하는 경우, 아래와 같은 방식을 통해 설정할 수 있다.

이번 포스팅에서는 커맨드라인 툴을 설명할 예정이라 추후에 다뤄보려고 한다.

  • docker-compose 파일의 environment 프로퍼티로 설정
  • server.properties 로 설정

토픽 관련 - kafka-topics.sh

위 쉘 스크립트를 통해서는 토픽의 생성, 정보 세팅, 정보 변경 등을 수행할 수 있다.

옵션

  • —create : 토픽의 생성. 별다른 설정을 주지 않으면, 브로커의 값을 따르게 된다.

docker 명령어를 통해서 실행

docker exec -it kafka kafka-topics.sh --create \\
	--bootstrap-server localhost:9092 \\
	--topic coupon_create

도커 컨테이너 진입하여 실행

kafka-topics.sh --create \\
	--bootstrap-server localhost:9092 \\
	--topic coupon_create

다른 설정 값 추가

kafka-topics.sh --create \\
	--bootstrap-server localhost:9092 \\
	--partition 3
	--replication-factor 2
	--topic coupon_create
  • —describe: 토픽에 대한 정보를 볼 수 있다.
# kafka-topics.sh --bootstrap-server localhost:9092 \\ 
			--topic test2 \\
			--describe
Topic: test2    PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
Topic: test2    Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

다른 옵션 추가

  • —partitions 숫자
  • —replication-factors 숫자
  • —config: 토픽 설정

자세한 옵션의 경우 kafka-topic.sh --help 를 통해 확인할 수 있다.

kafka-topics.sh --create \\
	--bootstrap-server localhost:9092 \\
	--partitions 10 --replication-factor 1
	--topic test3
	--config retention.ms=123123123
  • —alter : 토픽에 설정된 설정 값을 변경할 수 있다. 파티션 수 조정, replication factor 변경, 토픽의 설정 변경을 할 수 있다.
kafka-topics.sh --bootstrap-server localhost:9092 \\
	--topic test2 \\
	--alter --partitions 2
	
// 변경된 값 확인
kafka-topics.sh --bootstrap-server localhost:9092 --topic test2 --describe
Topic: test2    PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
Topic: test2    Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
Topic: test2    Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001

카프카 설정 - kafka-configs.sh

해당 쉘 스크립트의 경우, topic, client, broker 의 설정을 변경, 추가할 수 있다.

kafka-topics.sh 의 경우 topic 의 설정을 변경했다면, 해당 쉘 스크립트는 더 광범위한 설정을 변경할 수 있다.

브로커 설정 변경 예시

// broker 의 id 를 알기 위해 조회
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

//
kafka-configs.sh --bootstrap-server localhost:9092 \\
		 --broker 1001 \\
		 --alter \\
		 --add-config message.max.bytes=2000000
	

참고로 bootstrap-server 의 경우, 카프카 클러스터로 카프카 클라이언트가 연결될 때 사용할 브로커이고,

broker 옵션의 아이디는 변경할 broker 이다.

컨슈머 그룹 설정 - kafka-consumer-groups.sh

기본적으로 consumer group은 쉘 스크립트를 통해 생성하지 않더라도, consumer 가 메시지를 consume 할 때, 컨슈머 그룹 이름을 지정해주면 해당 이름으로 컨슈머 그룹이 생성된다.

아래의 명령어를 수행하면, 컨슈머 그룹의 리스트를 확인, 상태를 확인할 수 있다.

아래의 경우, console-consumer로 메시지 조회시 임시로 생성된 컨슈머 그룹이다

// 리스트 조회
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \\
		--list
// 리스트 조회 결과
console-consumer-53704

// 상태조회 
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \\
		--group test --describe

// 상태조회 결과 
GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                            HOST            CLIENT-ID
console-consumer-53704 test            0          -               9               -               consumer-console-consumer-53704-1-2593d1e1-0910-48e3-8ffd-3cc16e8aa939 /127.0.0.1      consumer-console-consumer-53704-1[
		

그 외에도 특정 컨슈머 그룹의 offset 에 대한 수정을 할 수 있는데, 이를 수정하면 해당 컨슈머 그룹은 설정된 offset 부터 다시 메시지를 컨슘하게 된다.

로직이 잘못되어서 수정배포가 나가서 다시 컨슘해야하는 경우에 사용할 수 있음.

  • —to-earliest : 가장 처음 오프셋부터 컨슘
  • —to-latest: 가장 마지막 오프셋부터 컨슘
  • —to-offset 오프셋번호: 특정 오프셋번호부터 컨슘
  • 그 외..
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \\
		-- group group_1
		-- topic test
		-- reset-offsets (오프셋 리셋 설정)

테스트 용도의 producer, consumer

kafka-console-consumer.sh

특정 토픽의 메시지를 읽어 console 에 프린트하는 console consumer 하나가 실행된다.

아래처럼 명령어 수행시, 메세지를 consume 하면 console 에 메시지가 찍히게 된다.

kafka-console-consumer.sh --bootstrap-server localhost:9092 \\
		--topic my-topic

옵션

  • —from-beginning : 토픽의 첫 메시지부터 consume
kafka-console-consumer.sh --bootstrap-server localhost:9092 \\
		--topic my-topic --from-beginning
  • —property
  • —max-message: 최대로 소비할 메시지 수 지정. 해당 개수만큼만 consume 하고 종료
kafka-console-consumer.sh --bootstrap-server localhost:9092 \\
		--topic my-topic --max-message 10
  • —partition: 특정 파티션의 메시지만 consume
kafka-console-consumer.sh --bootstrap-server localhost:9092 \\
		--topic my-topic --partition 1

kafka-console-producer.sh

아래의 명령어를 수행시 특정 토픽에 message 를 보낼 수 있다.

kafka-console-producer.sh --bootstrap-server localhost:9092 \\
	--topic my-topic
// 아래부터 메시지 + enter => 메시지 전송
> hello
> world