자주 쓰는 명령어

카프카 컨테이너 bash 실행
(컨테이너 네임을 ref_kafka 로 지정 했다.)
docker-compose exec -it kafka bash

카프카 토픽 리스트 확인 (내부 포트를 9092 로 지정했다.)
kafka-topics.sh --list --bootstrap-server kafka:9092

카프카 토픽 삭제
kafka-topics.sh --delete --bootstrap-server kafka:9092 --topic <토픽명>

카프카 토픽 생성
kafka-topics.sh --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic <토픽명>

카프카 토픽 데이터 생성
kafka-console-producer.sh --bootstrap-server kafka:9092 --topic <토픽명>
프롬프트 데이터 입력 후 엔터

카프카 토픽 데이터 조회 (처음부터 조회) (--from-beginning 을 붙이지 않으면 실시간 조회)
kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic <토픽명> --from-beginning

1. (프로듀서 실행)
kafka-console-producer.sh --bootstrap-server kafka:9092 --topic test
2. (컨슈머 실행)
kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test
(이후 프로듀서 에서 입력한걸 컨슈머 에서 확인 가능)

정리

카프카는 컨슈머에서 브로커에 저장된 메시지를 가져온다.

모든 컨슈머는 하나의 컨슈머 그룹에 속해야 한다

821a9970b7870a17a8770d5bc840b8f264c952e005edf288f6c10db5e918a9c8.png

위와 같이 컨슈머 그룹이 여러개인 경우 서로 다른 컨슈머 그룹 간 옵셋은 서로 영향을 줄 수 없다.

subscribe(), poll(), commit()

4a7c32a131a82e5480681df7e6e15d0a110f8ba6d65354f7cf257d203fcaa8b1.png

  1. subscribe() → 컨슈머가 Consumer Group의 정보응 이용해 브로커에게 구독 요청을 하는 메서드 → 브로커는 해당 요청을 받으면, 해당 Consumer Group 에 대해 리밸런싱 지시
  2. poll() → 주기적으로, 할당 받은 파티션에서 메시지를 가져오는 역할 수행 → 첫 번째 poll() 에서는 메시지를 가져오지 않고 프로커의 메타데이터를 가져오고 그룹 코디네이터와 연결 → 두 번째 poll() 에서 부터 메시지를 가져옴
  3. commit() → 브로커 내부 토픽인 __consumer_offsets 에 특정 토픽의 특정 파티션의 어떤 offset 까지 읽었는지 기록 → 개발자가 직접 manual commit 할 수 있으며, 주기적으로 auto commit 하도록 할 수 있음