Sitemap

Apache Kafka 공부 기록 (Part 3)

21 min readApr 6, 2025
https://www.udemy.com/course/apache-kafka

Table of Contents

Java Consumer
Java Consumer with Graceful Shutdown
Java Consumer with Incremental Cooperative Rebalance

Java Consumer

ConsumerDemo.java 파일

package io.conduktor.demos.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {

private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class.getSimpleName());

public static void main(String[] args) {
log.info("Hello World!");
String groupId = "joshua-app";
String topic = "demo_java";

// Create Consumer Properties.
Properties properties = new Properties();

// Connect to Localhost.
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

// Set Consumer Properties.
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("group.id", groupId);
properties.setProperty("auto.offset.reset", "earliest");

// Create the Consumer.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

// Subscribe to a Topic.
consumer.subscribe(Arrays.asList(topic));

// Poll for Data.
while (true) {

log.info("Polling...");

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

for (ConsumerRecord<String, String> record: records) {
log.info("Key: " +record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}

}

}

Takeaways

1. Kafka Consumer 개념

  • Kafka에서 Consumer는 브로커로부터 메시지를 읽어오는 역할
  • Consumer Group에 속해 있어야 하며, 메시지는 그룹 내의 각 Consumer에게 분산 소비됨

2. Properties 설정

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("group.id", groupId);
properties.setProperty("auto.offset.reset", "earliest");
  • bootstrap.servers: Kafka 브로커의 주소
  • Deserializer: Consumer가 byte[]를 Java 객체로 바꾸는 방법
  • group.id: COnsumer Group을 구분짓는 ID
  • auto.offset.reset: “earliest” (가장 처음부터 읽음), “latest” (최신 메시지부터 읽음), “none” (offset 없을 시 에러)

3. KafkaConsumer 생성

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  • KafkaConsumer<K, V>는 Generic Type으로 key/value의 데이터 타입을 명시함
  • 여기에서는 둘 다 String임

4. Topic Subscription

consumer.subscribe(Arrays.asList(topic));
  • 하나 이상의 topic에 구독 가능 (Arrays.asList()로 리스트 전달)
  • Kafka는 내부적으로 rebalancing을 통해 partition을 group 내에 분배함

5. Polling Loop

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record: records) {
log.info("Key: " +record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
  • poll(): Kafka에서 데이터를 읽는 API
  • 시간 내에 데이터가 없어도 리턴됨 (non-blocking 형태)
  • ConsumerRecord는 단일 메시지임
  • offset: Kafka는 메시지를 저장하고 있고, 읽은 위치(offset)를 기억함

Java Consumer with Graceful Shutdown

ConsumerDemoWithShutdown.java 파일

package io.conduktor.demos.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

;

public class ConsumerDemoWithShutdown {

private static final Logger log = LoggerFactory.getLogger(ConsumerDemoWithShutdown.class.getSimpleName());

public static void main(String[] args) {
log.info("Hello World!");
String groupId = "joshua-app";
String topic = "demo_java";

// Create Consumer Properties.
Properties properties = new Properties();

// Connect to Localhost.
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

// Set Consumer Properties.
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("group.id", groupId);
properties.setProperty("auto.offset.reset", "earliest");

// Create the Consumer.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

// Get a Reference to the Main Thread.
final Thread mainThread = Thread.currentThread();

// Adding the Shutdown Hook.
Runtime.getRuntime().addShutdownHook(new Thread() {

public void run() {
log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
consumer.wakeup();
// Join the Main Thread to Allow the Execution of the Code in the Main Thread.
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

try {
// Subscribe to a Topic.
consumer.subscribe(Arrays.asList(topic));
// Poll for Data.
while (true) {
log.info("Polling...");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
log.info("Key: " + record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
} catch (WakeupException e) {
log.info("Consumer is starting to shuw down...");
} catch (Exception e) {
log.error("Unexpected exception in the consumer", e);
} finally {
consumer.close(); // Close the Consumer. This will also commit offsets.
log.info("The Consumer is Now Gracefully Shutdown");
}

}

}

Takeaways

1. Graceful Shutdown이란?

Kafka Consumer가 Infinite Loop으로 메시지를 계속 처리하다가, 시스템이 종료될 때 강제로 중단되면 다음과 같은 문제가 생길 수 있다.

  • 마지막으로 읽은 메시지의 offset이 commit되지 않아 다시 읽게 됨
  • 리소스가 정리되지 않음 (소켓, 연결 정보 등)
  • 로그나 모니터링이 비정상 종료로 남게 됨

따라서 Graceful Shutdown 처리가 중요하다. 이를 위해 Kafka에서는 consumer.wakeup()을 사용한다.

2. Runtime.getRuntime().addShutdownHook(…)

Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
  • addShutdownHook: JVM 종료 신호(Ctrl+C, SIGTERM)를 감지했을 때 실행할 로직을 등록함
  • consumer.wakeup(): KafkaConsumer의 poll() Method를 강제로 깨워서 종료하도록 유도
  • mainThread.join(): 메인 쓰레드가 끝날 때까지 기다림 → shutdown 순서 보장 (main 메서드가 너무 빨리 끝나는 걸 방지하기 위해)

3. 예외 처리 구조

try {
consumer.subscribe(...);
while (true) {
consumer.poll(...);
}
} catch (WakeupException e) {
log.info("Consumer is starting to shut down...");
} catch (Exception e) {
log.error("Unexpected exception in the consumer", e);
} finally {
consumer.close();
log.info("The Consumer is Now Gracefully Shutdown");
}
  • WakeupException: wakeup() 호출 시 발생하는 예외. 정상 종료 신호로 사용
  • finally: 무조건 실행되는 블록 (리소스 정리용)
  • consumer.close(): offset commit + 연결 정리 포함된 종료 Method

4. 전체 흐름 요약

main() 실행
└─ KafkaConsumer 생성
└─ JVM 종료 감지 (ShutdownHook 등록)
└─ consumer.wakeup() 호출
└─ poll() 중단 → WakeupException 발생
└─ finally 블록 → consumer.close() 실행 → 안전 종료

Java Consumer with Incremental Cooperative Rebalance

ConsumerDemoCooperative.java 파일

package io.conduktor.demos.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

;

public class ConsumerDemoCooperative {

private static final Logger log = LoggerFactory.getLogger(ConsumerDemoCooperative.class.getSimpleName());

public static void main(String[] args) {
log.info("Hello World!");
String groupId = "joshua-app";
String topic = "demo_java";

// Create Consumer Properties.
Properties properties = new Properties();

// Connect to Localhost.
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

// Set Consumer Properties.
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("group.id", groupId);
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("partition.assignment.strategy", CooperativeStickyAssignor.class.getName());

// Create the Consumer.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

// Get a Reference to the Main Thread.
final Thread mainThread = Thread.currentThread();

// Adding the Shutdown Hook.
Runtime.getRuntime().addShutdownHook(new Thread() {

public void run() {
log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
consumer.wakeup();
// Join the Main Thread to Allow the Execution of the Code in the Main Thread.
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

try {
// Subscribe to a Topic.
consumer.subscribe(Arrays.asList(topic));
// Poll for Data.
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
log.info("Key: " + record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
} catch (WakeupException e) {
log.info("Consumer is starting to shuw down...");
} catch (Exception e) {
log.error("Unexpected exception in the consumer", e);
} finally {
consumer.close(); // Close the Consumer. This will also commit offsets.
log.info("The Consumer is Now Gracefully Shutdown");
}

}

}

Takeaways

1. Cooperative Sticky Assignment 전략

  • Kafka 클러스터 내 여러 Consumer가 있을 때 더 안정적이고 효율적인 파티션 재할당을 목표로 함

2. Cooperative Partition Assignment란?

기존의 Kafka 기본 파티션 재할당 전략은 rebalance 중 모든 Consumers가 Subscription을 일시적으로 멈추고, 다시 전체 파티션을 재할당하는 방식. 하지만 이런 방식은 다음과 같은 문제점 존재:

  • 잠시라도 모든 Consumer가 멈춤 (processing gap 발생)
  • 불필요하게 많은 파티션 이동
  • Consumer가 많을수록 downtime 증가

따라서 Kafka는 CooperativeStickyAssignor 전략을 도입함. 이 전략은 “필요한 파티션만 부분적으로 재할당”하기 때문에 다음과 같은 장점 존재:

  • Consumer가 부분적인 파티션 이동만 발생
  • 재할당 시간 감소, Consumer downtime 최소화
  • Sticky 전략과 함께 사용되어 파티션 이동 최소화

3. Cooperative Sticky Assignor 설정

properties.setProperty("partition.assignment.strategy", CooperativeStickyAssignor.class.getName());
  • partition.assignment.strategy: Consumer 그룹 내 파티션을 어떻게 나눌지 지정
  • CooperativeStickyAssignor: 새로운 파티션 할당 전략. 최소한의 변경으로 할당 변경
  • 기본값: 기존 Kafka의 RangeAssignor 또는 StickyAssignor
  • Sticky는 이전 파티션을 최대한 유지, Cooperative은 기존 파티션을 유지하며 천천히 이동(revocation).

3. 전체 흐름 요약

main() 실행
└─ KafkaConsumer 생성
└─ partition.assignment.strategy = CooperativeStickyAssignor 설정
└─ topic 구독
└─ polling 수행
└─ JVM 종료 시 consumer.wakeup() 호출
└─ WakeupException → shutdown → offset commit

4. 언제 CooperativeStickyAssignor를 써야 할까?

  • Consumer 수가 많은 환경: 적극 추천
  • 실시간 처리 시스템: 강력 추천 (downtime을 줄이기 위해)
  • Consumer가 자주 죽거나 생겨남: 추천
  • 작은 테스트 환경: 기본 전략도 충분하므로 필수 X

--

--

Joshua Kim
Joshua Kim

Written by Joshua Kim

Analytics Engineer | 🇰🇷🇺🇸🇹🇼

No responses yet