将 Kafka 与 Junit 结合使用

位置:首页>文章>详情   分类: Java教程 > 编程技术   阅读(303)   2023-09-05 09:14:57

出色的 Spring Kafka 项目提供的一个简洁功能,除了比 raw Kafka Producer and Consumer 更易于使用的抽象之外,是一种在测试中使用 Kafka 的方法。它通过提供一个嵌入式版本的 Kafka 来实现这一点,该版本可以非常容易地设置和拆除。

项目需要包含此支持的所有内容是“spring-kafka-test”模块,对于 gradle 构建方式如下:

testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT"

请注意,我使用的是该项目的快照版本,因为它支持 Kafka 0.10+。

有了这种依赖性,就可以使用 JUnit 的 @ClassRule 在测试中启动嵌入式 Kafka:

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(2, true, 2, "messages");

这将启动一个具有 2 个代理的 Kafka 集群,一个名为“消息”的主题使用 2 个分区,类规则将确保 Kafka 集群在测试运行之前启动,然后在测试结束时关闭。

以下是使用嵌入式 Kafka 集群的 Raw Kafka 生产者/消费者的示例,嵌入式 Kafka 可用于检索 Kafka 生产者/消费者所需的属性:

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>("messages", 0, 0, "message0")).get();
producer.send(new ProducerRecord<>("messages", 0, 1, "message1")).get();
producer.send(new ProducerRecord<>("messages", 1, 2, "message2")).get();
producer.send(new ProducerRecord<>("messages", 1, 3, "message3")).get();


Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
consumerProps.put("auto.offset.reset", "earliest");

final CountDownLatch latch = new CountDownLatch(4);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
    KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
    kafkaConsumer.subscribe(Collections.singletonList("messages"));
    try {
        while (true) {
            ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                latch.countDown();
            }
        }
    } finally {
        kafkaConsumer.close();
    }
});

assertThat(latch.await(90, TimeUnit.SECONDS)).isTrue();

此处提供了更全面的测试

标签2: Java教程
地址:https://www.cundage.com/article/jcg-using-kafka-junit.html

相关阅读

Java HashSet 教程展示了如何使用 Java HashSet 集合。 Java哈希集 HashSet 是一个不包含重复元素的集合。此类为基本操作(添加、删除、包含和大小)提供恒定时间性...
SpringApplicationBuilder 教程展示了如何使用 SpringApplicationBuilder 创建一个简单的 Spring Boot 应用程序。 春天 是用于创建企业应...
通道是继 buffers 之后 java.nio 的第二个主要新增内容,我们在之前的教程中已经详细了解了这一点。通道提供与 I/O 服务的直接连接。 通道是一种在字节缓冲区和通道另一端的实体(通...
课程大纲 Elasticsearch 是一个基于 Lucene 的搜索引擎。它提供了一个分布式的、支持多租户的全文搜索引擎,带有 HTTP Web 界面和无模式的 JSON 文档。 Elasti...
解析器是强大的工具,使用 ANTLR 可以编写可用于多种不同语言的各种解析器。 在这个完整的教程中,我们将: 解释基础:什么是解析器,它可以用来做什么 查看如何设置 ANTLR 以便在 Java...
Java 是用于开发各种桌面应用程序、Web 应用程序和移动应用程序的最流行的编程语言之一。以下文章将帮助您快速熟悉 Java 语言,并迈向 API 和云开发等更复杂的概念。 1. Java语言...
Java中的继承是指子类继承或获取父类的所有非私有属性和行为的能力。继承是面向对象编程的四大支柱之一,用于提高层次结构中类之间的代码可重用性。 在本教程中,我们将了解 Java 支持的继承类型,...
Java Message Service 是一种支持正式通信的 API,称为 网络上计算机之间的消息传递。 JMS 为支持 Java 程序的标准消息协议和消息服务提供了一个通用接口。 JMS 提...
Java 项目中的一项常见任务是将日期格式化或解析为字符串,反之亦然。解析日期意味着你有一个代表日期的字符串,例如“2017-08-3”,你想把它转换成一个代表 Java 中日期的对象,例如Ja...
之前,我介绍了spring 3 + hibernate 集成 示例和struts 2 hello world 示例。在本教程中,我将讨论在将 spring 框架与 struts 与 hibern...