Spring Kafka 生产者/消费者示例

位置:首页>文章>详情   分类: Java教程 > 编程技术   阅读(178)   2023-10-21 07:14:57

我的目标是展示 Spring Kafka 如何提供对原始 Kafka 生产者和消费者 API 的抽象,该 API 易于使用且具有 Spring 背景的人熟悉。

示例场景

示例场景很简单,我有一个生成消息的系统和另一个处理消息的系统

使用 Raw Kafka Producer/Consumer API 的实现

首先,我使用原始的 Kafka Producer 和 Consumer API 来实现这个场景。如果您想查看代码,可以在我的 github 存储库 中找到它。

制作人

下面设置了一个 KafkaProducer 实例,用于向 Kafka 主题发送消息:

KafkaProducer<String, WorkUnit> producer 
    = new KafkaProducer<>(kafkaProps, stringKeySerializer(), workUnitJsonSerializer());

我使用了 KafkaProducer 构造函数的变体,它采用自定义 Serializer 将域对象转换为 json 表示形式。

一旦 KafkaProducer 的实例可用,它就可以用于向 Kafka 集群发送消息,这里我使用了同步版本的发送器,它等待响应返回。

ProducerRecord<String, WorkUnit> record 
                = new ProducerRecord<>("workunits", workUnit.getId(), workUnit);

RecordMetadata recordMetadata = this.workUnitProducer.send(record).get();

消费者

在 Consumer 端,我们创建了一个 KafkaConsumer,它包含一个 Deserializer 构造函数的变体,它知道如何读取 json 消息并将其转换为域实例:

KafkaConsumer<String, WorkUnit> consumer
= new KafkaConsumer<>(props, stringKeyDeserializer()
, workUnitJsonValueDeserializer());

一旦 KafkaConsumer 的实例可用,就可以放置一个侦听器循环来读取一批记录,处理它们并等待更多记录通过:

consumer.subscribe("workunits);

try {
    while (true) {
        ConsumerRecords<String, WorkUnit> records = this.consumer.poll(100);
        for (ConsumerRecord<String, WorkUnit> record : records) {
            log.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());

        }
    }
} finally {
    this.consumer.close();
}

使用Spring Kafka实现

我在 my github repo 中使用 Spring-kafka 进行了实现。

制作人

Spring-Kafka 提供了一个 KafkaTemplate 类作为 KafkaProducer 的包装器来向 Kafka 主题发送消息:

@Bean
public ProducerFactory<String, WorkUnit> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer());
}

@Bean
public KafkaTemplate<String, WorkUnit> workUnitsKafkaTemplate() {
    KafkaTemplate<String, WorkUnit> kafkaTemplate =  new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic("workunits");
    return kafkaTemplate;
}

需要注意的一件事是,虽然早些时候我实现了自定义序列化器/反序列化器以将域类型作为 json 发送,然后将其转换回来,但 Spring-Kafka 为 json 提供了开箱即用的序列化器/反序列化器。

并使用 KafkaTemplate 发送消息:

SendResult<String, WorkUnit> sendResult = 
    workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();

RecordMetadata recordMetadata = sendResult.getRecordMetadata();

LOGGER.info("topic = {}, partition = {}, offset = {}, workUnit = {}",
        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);

消费者

消费者部分是使用监听器模式实现的,任何为 RabbitMQ/ActiveMQ 实现监听器的人都应该熟悉这种模式。首先是设置侦听器容器的配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, WorkUnit> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());
}

以及响应容器读取的消息的服务:

@Service
public class WorkUnitsConsumer {
    private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);

    @KafkaListener(topics = "workunits")
    public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}",
                topic, partition, offset, workUnit);
    }
}

在这里,像原始消费者一样设置监听器循环的所有复杂性都被避免了,并且被监听器容器很好地隐藏了。

总结

我已经了解了很多关于设置批量大小、确认变化、不同 API 签名的内部细节。我的目的只是演示一个使用原始 Kafka API 的常见用例,并展示 Spring-Kafka 包装器如何简化它。

如果您有兴趣进一步探索,可以在此处获取原始生产者消费者示例,在此处获取Spring Kafka示例

标签2: Java教程
地址:https://www.cundage.com/article/jcg-spring-kafka-producerconsumer-sample.html

相关阅读

Java HashSet 教程展示了如何使用 Java HashSet 集合。 Java哈希集 HashSet 是一个不包含重复元素的集合。此类为基本操作(添加、删除、包含和大小)提供恒定时间性...
SpringApplicationBuilder 教程展示了如何使用 SpringApplicationBuilder 创建一个简单的 Spring Boot 应用程序。 春天 是用于创建企业应...
通道是继 buffers 之后 java.nio 的第二个主要新增内容,我们在之前的教程中已经详细了解了这一点。通道提供与 I/O 服务的直接连接。 通道是一种在字节缓冲区和通道另一端的实体(通...
课程大纲 Elasticsearch 是一个基于 Lucene 的搜索引擎。它提供了一个分布式的、支持多租户的全文搜索引擎,带有 HTTP Web 界面和无模式的 JSON 文档。 Elasti...
解析器是强大的工具,使用 ANTLR 可以编写可用于多种不同语言的各种解析器。 在这个完整的教程中,我们将: 解释基础:什么是解析器,它可以用来做什么 查看如何设置 ANTLR 以便在 Java...
Java 是用于开发各种桌面应用程序、Web 应用程序和移动应用程序的最流行的编程语言之一。以下文章将帮助您快速熟悉 Java 语言,并迈向 API 和云开发等更复杂的概念。 1. Java语言...
Java中的继承是指子类继承或获取父类的所有非私有属性和行为的能力。继承是面向对象编程的四大支柱之一,用于提高层次结构中类之间的代码可重用性。 在本教程中,我们将了解 Java 支持的继承类型,...
Java Message Service 是一种支持正式通信的 API,称为 网络上计算机之间的消息传递。 JMS 为支持 Java 程序的标准消息协议和消息服务提供了一个通用接口。 JMS 提...
Java 项目中的一项常见任务是将日期格式化或解析为字符串,反之亦然。解析日期意味着你有一个代表日期的字符串,例如“2017-08-3”,你想把它转换成一个代表 Java 中日期的对象,例如Ja...
之前,我介绍了spring 3 + hibernate 集成 示例和struts 2 hello world 示例。在本教程中,我将讨论在将 spring 框架与 struts 与 hibern...