Reactor Core 实现了 Reactive Streams 规范并处理(可能是无限的)数据流。如果您对它感兴趣,请查看它提供的出色的文档。在这里,我假设对 Reactor Core 库 Flux 和 Mono 类型有一些基本的了解,并将介绍 Reactor Core 提供了时间本身的抽象,以便能够测试依赖于通道的函数的时间。
对于 Reactor-core 的某些操作符,时间是一个重要的考虑因素——例如,“间隔”函数的变体在 10 秒的初始“延迟”后每 5 秒发出一个递增的数字:
val flux = Flux .interval(Duration.ofSeconds(10), Duration.ofSeconds(5)) .take(3)
根据正常的时间流逝来测试这样的数据流是很糟糕的,这样的测试大约需要 20 秒才能完成。
Reactor-Core 提供了一种解决方案,一种对时间本身的抽象——基于虚拟时间的调度程序,它提供了一种以确定性方式测试这些类型操作的巧妙方法。
让我以两种方式展示它,一种显式方式应该使基于虚拟时间的调度程序的操作非常清楚,然后是使用 Reactor Core 进行测试的推荐方法。
import org.assertj.core.api.Assertions.assertThat import org.junit.Test import reactor.core.publisher.Flux import reactor.test.scheduler.VirtualTimeScheduler import java.time.Duration import java.util.concurrent.CountDownLatch class VirtualTimeTest { @Test fun testExplicit() { val mutableList = mutableListOf<Long>() val scheduler = VirtualTimeScheduler.getOrSet() val flux = Flux .interval(Duration.ofSeconds(10), Duration.ofSeconds(5), scheduler) .take(3) val latch = CountDownLatch(1) flux.subscribe({ l -> mutableList.add(l) }, { _ -> }, { latch.countDown() }) scheduler.advanceTimeBy(Duration.ofSeconds(10)) assertThat(mutableList).containsExactly(0L) scheduler.advanceTimeBy(Duration.ofSeconds(5)) assertThat(mutableList).containsExactly(0L, 1L) scheduler.advanceTimeBy(Duration.ofSeconds(5)) assertThat(mutableList).containsExactly(0L, 1L, 2L) latch.await() } }
1. 首先,“Flux.interval”函数的调度器被设置为基于虚拟时间的调度器。
2. 数据流预计在 10 秒延迟后每 5 秒发射一次
3. VirtualTimeScheduler 提供了一个“advanceTimeBy”方法来将虚拟时间提前一个 Duration,因此时间首先被提前 10 秒的延迟时间,此时第一个元素(0)预计将被发射
4. 随后两次提前5秒,分别得到1和2。
这是确定性的,测试会很快完成。这个版本的测试虽然丑陋,但它使用一个列表来收集和断言结果,并使用 CountDownLatch 来控制测试何时终止。一种更简洁的测试 Reactor-Core 类型的方法是使用优秀的 StepVerifier 类,使用此类的测试如下所示:
import org.junit.Test import reactor.core.publisher.Flux import reactor.test.StepVerifier import reactor.test.scheduler.VirtualTimeScheduler import java.time.Duration class VirtualTimeTest { @Test fun testWithStepVerifier() { VirtualTimeScheduler.getOrSet() val flux = Flux .interval(Duration.ofSeconds(10), Duration.ofSeconds(5)) .take(3) StepVerifier.withVirtualTime({ flux }) .expectSubscription() .thenAwait(Duration.ofSeconds(10)) .expectNext(0) .thenAwait(Duration.ofSeconds(5)) .expectNext(1) .thenAwait(Duration.ofSeconds(5)) .expectNext(2) .verifyComplete() } }
这个使用 StepVerifier 的新测试读起来很好,每一步都会提前时间并断言当时的预期。
标签2: Java教程地址:https://www.cundage.com/article/jcg-testing-time-based-reactor-core-streams-virtual-time.html