出色的 Spring Kafka 项目提供的一个简洁功能,除了比 raw Kafka Producer and Consumer 更易于使用的抽象之外,是一种在测试中使用 Kafka 的方法。它通过提供一个嵌入式版本的 Kafka 来实现这一点,该版本可以非常容易地设置和拆除。
项目需要包含此支持的所有内容是“spring-kafka-test”模块,对于 gradle 构建方式如下:
testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT"
请注意,我使用的是该项目的快照版本,因为它支持 Kafka 0.10+。
有了这种依赖性,就可以使用 JUnit 的 @ClassRule 在测试中启动嵌入式 Kafka:
@ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(2, true, 2, "messages");
这将启动一个具有 2 个代理的 Kafka 集群,一个名为“消息”的主题使用 2 个分区,类规则将确保 Kafka 集群在测试运行之前启动,然后在测试结束时关闭。
以下是使用嵌入式 Kafka 集群的 Raw Kafka 生产者/消费者的示例,嵌入式 Kafka 可用于检索 Kafka 生产者/消费者所需的属性:
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka); KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); producer.send(new ProducerRecord<>("messages", 0, 0, "message0")).get(); producer.send(new ProducerRecord<>("messages", 0, 1, "message1")).get(); producer.send(new ProducerRecord<>("messages", 1, 2, "message2")).get(); producer.send(new ProducerRecord<>("messages", 1, 3, "message3")).get(); Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka); consumerProps.put("auto.offset.reset", "earliest"); final CountDownLatch latch = new CountDownLatch(4); ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(() -> { KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps); kafkaConsumer.subscribe(Collections.singletonList("messages")); try { while (true) { ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<Integer, String> record : records) { LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); latch.countDown(); } } } finally { kafkaConsumer.close(); } }); assertThat(latch.await(90, TimeUnit.SECONDS)).isTrue();
此处提供了更全面的测试
标签2: Java教程地址:https://www.cundage.com/article/jcg-using-kafka-junit.html