Apache Kafka 공부 기록 (Part 3)
Table of Contents
Java Consumer
Java Consumer with Graceful Shutdown
Java Consumer with Incremental Cooperative Rebalance
Java Consumer
ConsumerDemo.java
파일
package io.conduktor.demos.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerDemo {
private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class.getSimpleName());
public static void main(String[] args) {
log.info("Hello World!");
String groupId = "joshua-app";
String topic = "demo_java";
// Create Consumer Properties.
Properties properties = new Properties();
// Connect to Localhost.
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// Set Consumer Properties.
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("group.id", groupId);
properties.setProperty("auto.offset.reset", "earliest");
// Create the Consumer.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Subscribe to a Topic.
consumer.subscribe(Arrays.asList(topic));
// Poll for Data.
while (true) {
log.info("Polling...");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record: records) {
log.info("Key: " +record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
}
}
Takeaways
1. Kafka Consumer 개념
- Kafka에서 Consumer는 브로커로부터 메시지를 읽어오는 역할
- Consumer Group에 속해 있어야 하며, 메시지는 그룹 내의 각 Consumer에게 분산 소비됨
2. Properties 설정
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("group.id", groupId);
properties.setProperty("auto.offset.reset", "earliest");
- bootstrap.servers: Kafka 브로커의 주소
- Deserializer: Consumer가
byte[]
를 Java 객체로 바꾸는 방법 - group.id: COnsumer Group을 구분짓는 ID
- auto.offset.reset:
“earliest”
(가장 처음부터 읽음),“latest”
(최신 메시지부터 읽음),“none”
(offset 없을 시 에러)
3. KafkaConsumer 생성
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
KafkaConsumer<K, V>
는 Generic Type으로 key/value의 데이터 타입을 명시함- 여기에서는 둘 다 String임
4. Topic Subscription
consumer.subscribe(Arrays.asList(topic));
- 하나 이상의 topic에 구독 가능 (
Arrays.asList()
로 리스트 전달) - Kafka는 내부적으로 rebalancing을 통해 partition을 group 내에 분배함
5. Polling Loop
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record: records) {
log.info("Key: " +record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
- poll(): Kafka에서 데이터를 읽는 API
- 시간 내에 데이터가 없어도 리턴됨 (non-blocking 형태)
- ConsumerRecord는 단일 메시지임
- offset: Kafka는 메시지를 저장하고 있고, 읽은 위치(offset)를 기억함
Java Consumer with Graceful Shutdown
ConsumerDemoWithShutdown.java 파일
package io.conduktor.demos.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
;
public class ConsumerDemoWithShutdown {
private static final Logger log = LoggerFactory.getLogger(ConsumerDemoWithShutdown.class.getSimpleName());
public static void main(String[] args) {
log.info("Hello World!");
String groupId = "joshua-app";
String topic = "demo_java";
// Create Consumer Properties.
Properties properties = new Properties();
// Connect to Localhost.
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// Set Consumer Properties.
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("group.id", groupId);
properties.setProperty("auto.offset.reset", "earliest");
// Create the Consumer.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Get a Reference to the Main Thread.
final Thread mainThread = Thread.currentThread();
// Adding the Shutdown Hook.
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
consumer.wakeup();
// Join the Main Thread to Allow the Execution of the Code in the Main Thread.
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
// Subscribe to a Topic.
consumer.subscribe(Arrays.asList(topic));
// Poll for Data.
while (true) {
log.info("Polling...");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
log.info("Key: " + record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
} catch (WakeupException e) {
log.info("Consumer is starting to shuw down...");
} catch (Exception e) {
log.error("Unexpected exception in the consumer", e);
} finally {
consumer.close(); // Close the Consumer. This will also commit offsets.
log.info("The Consumer is Now Gracefully Shutdown");
}
}
}
Takeaways
1. Graceful Shutdown이란?
Kafka Consumer가 Infinite Loop으로 메시지를 계속 처리하다가, 시스템이 종료될 때 강제로 중단되면 다음과 같은 문제가 생길 수 있다.
- 마지막으로 읽은 메시지의 offset이 commit되지 않아 다시 읽게 됨
- 리소스가 정리되지 않음 (소켓, 연결 정보 등)
- 로그나 모니터링이 비정상 종료로 남게 됨
따라서 Graceful Shutdown 처리가 중요하다. 이를 위해 Kafka에서는 consumer.wakeup()
을 사용한다.
2. Runtime.getRuntime().addShutdownHook(…)
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
addShutdownHook
: JVM 종료 신호(Ctrl+C, SIGTERM)를 감지했을 때 실행할 로직을 등록함consumer.wakeup()
: KafkaConsumer의poll()
Method를 강제로 깨워서 종료하도록 유도mainThread.join()
: 메인 쓰레드가 끝날 때까지 기다림 → shutdown 순서 보장 (main 메서드가 너무 빨리 끝나는 걸 방지하기 위해)
3. 예외 처리 구조
try {
consumer.subscribe(...);
while (true) {
consumer.poll(...);
}
} catch (WakeupException e) {
log.info("Consumer is starting to shut down...");
} catch (Exception e) {
log.error("Unexpected exception in the consumer", e);
} finally {
consumer.close();
log.info("The Consumer is Now Gracefully Shutdown");
}
- WakeupException:
wakeup()
호출 시 발생하는 예외. 정상 종료 신호로 사용 - finally: 무조건 실행되는 블록 (리소스 정리용)
- consumer.close(): offset commit + 연결 정리 포함된 종료 Method
4. 전체 흐름 요약
main() 실행
└─ KafkaConsumer 생성
└─ JVM 종료 감지 (ShutdownHook 등록)
└─ consumer.wakeup() 호출
└─ poll() 중단 → WakeupException 발생
└─ finally 블록 → consumer.close() 실행 → 안전 종료
Java Consumer with Incremental Cooperative Rebalance
ConsumerDemoCooperative.java
파일
package io.conduktor.demos.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
;
public class ConsumerDemoCooperative {
private static final Logger log = LoggerFactory.getLogger(ConsumerDemoCooperative.class.getSimpleName());
public static void main(String[] args) {
log.info("Hello World!");
String groupId = "joshua-app";
String topic = "demo_java";
// Create Consumer Properties.
Properties properties = new Properties();
// Connect to Localhost.
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// Set Consumer Properties.
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("group.id", groupId);
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("partition.assignment.strategy", CooperativeStickyAssignor.class.getName());
// Create the Consumer.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Get a Reference to the Main Thread.
final Thread mainThread = Thread.currentThread();
// Adding the Shutdown Hook.
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
consumer.wakeup();
// Join the Main Thread to Allow the Execution of the Code in the Main Thread.
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
// Subscribe to a Topic.
consumer.subscribe(Arrays.asList(topic));
// Poll for Data.
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
log.info("Key: " + record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset: " + record.offset());
}
}
} catch (WakeupException e) {
log.info("Consumer is starting to shuw down...");
} catch (Exception e) {
log.error("Unexpected exception in the consumer", e);
} finally {
consumer.close(); // Close the Consumer. This will also commit offsets.
log.info("The Consumer is Now Gracefully Shutdown");
}
}
}
Takeaways
1. Cooperative Sticky Assignment 전략
- Kafka 클러스터 내 여러 Consumer가 있을 때 더 안정적이고 효율적인 파티션 재할당을 목표로 함
2. Cooperative Partition Assignment란?
기존의 Kafka 기본 파티션 재할당 전략은 rebalance 중 모든 Consumers가 Subscription을 일시적으로 멈추고, 다시 전체 파티션을 재할당하는 방식. 하지만 이런 방식은 다음과 같은 문제점 존재:
- 잠시라도 모든 Consumer가 멈춤 (processing gap 발생)
- 불필요하게 많은 파티션 이동
- Consumer가 많을수록 downtime 증가
따라서 Kafka는 CooperativeStickyAssignor 전략을 도입함. 이 전략은 “필요한 파티션만 부분적으로 재할당”하기 때문에 다음과 같은 장점 존재:
- Consumer가 부분적인 파티션 이동만 발생
- 재할당 시간 감소, Consumer downtime 최소화
- Sticky 전략과 함께 사용되어 파티션 이동 최소화
3. Cooperative Sticky Assignor 설정
properties.setProperty("partition.assignment.strategy", CooperativeStickyAssignor.class.getName());
partition.assignment.strategy
: Consumer 그룹 내 파티션을 어떻게 나눌지 지정CooperativeStickyAssignor
: 새로운 파티션 할당 전략. 최소한의 변경으로 할당 변경- 기본값: 기존 Kafka의
RangeAssignor
또는StickyAssignor
- Sticky는 이전 파티션을 최대한 유지, Cooperative은 기존 파티션을 유지하며 천천히 이동(revocation).
3. 전체 흐름 요약
main() 실행
└─ KafkaConsumer 생성
└─ partition.assignment.strategy = CooperativeStickyAssignor 설정
└─ topic 구독
└─ polling 수행
└─ JVM 종료 시 consumer.wakeup() 호출
└─ WakeupException → shutdown → offset commit
4. 언제 CooperativeStickyAssignor를 써야 할까?
- Consumer 수가 많은 환경: 적극 추천
- 실시간 처리 시스템: 강력 추천 (downtime을 줄이기 위해)
- Consumer가 자주 죽거나 생겨남: 추천
- 작은 테스트 환경: 기본 전략도 충분하므로 필수 X