教育行业A股IPO第一股(股票代码 003032)

全国咨询/投诉热线:400-618-4000

Kafka中的HW、LEO等分别代表什么?

更新时间:2023年10月19日11时37分 来源:传智教育 浏览次数:

好口碑IT培训

  在Apache Kafka中,HW(High Watermark)和 LEO(Log End Offset)是与分区的复制和消息传递相关的两个关键概念。

  1.HW(High Watermark):

  High Watermark是一个分区的消息复制进度的指示器。它表示了已经成功复制到所有副本的消息的位置。HW之前的所有消息都被认为是已提交的消息,这意味着消费者可以安全地消费这些消息。HW通常是消费者组维护的偏移量的参考点。

  2.LEO(Log End Offset):

  Log End Offset表示一个分区中消息日志的最后一个位置,即下一条消息要写入的位置。LEO是动态变化的,因为消息不断被追加到分区。它表示了分区中的最新消息位置。

  接下来笔者用一段具体的示例代码,来演示下如何使用Java和Kafka Consumer API来获取分区的HW和LEO:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaHWLEOExample {
    public static void main(String[] args) {
        // 设置Kafka消费者的配置
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建Kafka消费者
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 指定要订阅的主题
        String topic = "my-topic";
        consumer.subscribe(Collections.singletonList(topic));

        // 获取分区信息
        PartitionInfo partitionInfo = consumer.partitionsFor(topic).get(0);
        int partition = partitionInfo.partition();

        // 在消费者循环中获取HW和LEO
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (TopicPartition topicPartition : records.partitions()) {
                long hw = consumer.position(topicPartition); // 获取HW
                long leo = consumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition); // 获取LEO
                System.out.println("Partition " + topicPartition.partition() + ": HW = " + hw + ", LEO = " + leo);
            }
        }
    }
}

  上面的代码创建了一个Kafka消费者,并订阅了一个主题。在消费者循环中,我们使用position()方法来获取分区的HW,并使用endOffsets()方法来获取分区的LEO。这可以帮助我们监视分区的消息复制进度和消息日志的结束位置。

0 分享到:
和我们在线交谈!