我的目标是展示 Spring Kafka 如何提供对原始 Kafka 生产者和消费者 API 的抽象,该 API 易于使用且具有 Spring 背景的人熟悉。
示例场景很简单,我有一个生成消息的系统和另一个处理消息的系统
首先,我使用原始的 Kafka Producer 和 Consumer API 来实现这个场景。如果您想查看代码,可以在我的 github 存储库 中找到它。
下面设置了一个 KafkaProducer 实例,用于向 Kafka 主题发送消息:
KafkaProducer<String, WorkUnit> producer = new KafkaProducer<>(kafkaProps, stringKeySerializer(), workUnitJsonSerializer());
我使用了 KafkaProducer 构造函数的变体,它采用自定义 Serializer 将域对象转换为 json 表示形式。
一旦 KafkaProducer 的实例可用,它就可以用于向 Kafka 集群发送消息,这里我使用了同步版本的发送器,它等待响应返回。
ProducerRecord<String, WorkUnit> record = new ProducerRecord<>("workunits", workUnit.getId(), workUnit); RecordMetadata recordMetadata = this.workUnitProducer.send(record).get();
在 Consumer 端,我们创建了一个 KafkaConsumer,它包含一个 Deserializer 构造函数的变体,它知道如何读取 json 消息并将其转换为域实例:
KafkaConsumer<String, WorkUnit> consumer
= new KafkaConsumer<>(props, stringKeyDeserializer()
, workUnitJsonValueDeserializer());
一旦 KafkaConsumer 的实例可用,就可以放置一个侦听器循环来读取一批记录,处理它们并等待更多记录通过:
consumer.subscribe("workunits); try { while (true) { ConsumerRecords<String, WorkUnit> records = this.consumer.poll(100); for (ConsumerRecord<String, WorkUnit> record : records) { log.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } finally { this.consumer.close(); }
我在 my github repo 中使用 Spring-kafka 进行了实现。
Spring-Kafka 提供了一个 KafkaTemplate 类作为 KafkaProducer 的包装器来向 Kafka 主题发送消息:
@Bean public ProducerFactory<String, WorkUnit> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer()); } @Bean public KafkaTemplate<String, WorkUnit> workUnitsKafkaTemplate() { KafkaTemplate<String, WorkUnit> kafkaTemplate = new KafkaTemplate<>(producerFactory()); kafkaTemplate.setDefaultTopic("workunits"); return kafkaTemplate; }
需要注意的一件事是,虽然早些时候我实现了自定义序列化器/反序列化器以将域类型作为 json 发送,然后将其转换回来,但 Spring-Kafka 为 json 提供了开箱即用的序列化器/反序列化器。
并使用 KafkaTemplate 发送消息:
SendResult<String, WorkUnit> sendResult = workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get(); RecordMetadata recordMetadata = sendResult.getRecordMetadata(); LOGGER.info("topic = {}, partition = {}, offset = {}, workUnit = {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);
消费者部分是使用监听器模式实现的,任何为 RabbitMQ/ActiveMQ 实现监听器的人都应该熟悉这种模式。首先是设置侦听器容器的配置:
@Bean public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConcurrency(1); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConsumerFactory<String, WorkUnit> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer()); }
以及响应容器读取的消息的服务:
@Service public class WorkUnitsConsumer { private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class); @KafkaListener(topics = "workunits") public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}", topic, partition, offset, workUnit); } }
在这里,像原始消费者一样设置监听器循环的所有复杂性都被避免了,并且被监听器容器很好地隐藏了。
我已经了解了很多关于设置批量大小、确认变化、不同 API 签名的内部细节。我的目的只是演示一个使用原始 Kafka API 的常见用例,并展示 Spring-Kafka 包装器如何简化它。
如果您有兴趣进一步探索,可以在此处获取原始生产者消费者示例,在此处获取Spring Kafka示例
标签2: Java教程地址:https://www.cundage.com/article/jcg-spring-kafka-producerconsumer-sample.html