In the previous article we created a simple adapter from ElasticSearch's API to Reactor's Mono, which looks like this:

```java
import reactor.core.publisher.Mono;

private Mono<IndexResponse> indexDoc(Doc doc) {
    //...
}
```
Now we want to run this method a few million times, at a controlled concurrency level. Essentially, we want to see how our indexing code behaves under load and benchmark it.
First we need some good-looking test data. For that we'll use the handy jFairy library. The document we are going to index is a simple POJO:
```java
@Value
class Doc {
    private final String username;
    private final String json;
}
```
The generation logic is wrapped in a Java class:
```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.codearte.jfairy.Fairy;
import io.codearte.jfairy.producer.person.Address;
import io.codearte.jfairy.producer.person.Person;
import org.apache.commons.lang3.RandomUtils;
import org.springframework.stereotype.Component;

import java.util.Arrays;

@Component
class PersonGenerator {

    private final ObjectMapper objectMapper;
    private final Fairy fairy;

    // Constructor injection; Fairy.create() builds a generator with default settings
    PersonGenerator(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        this.fairy = Fairy.create();
    }

    private Doc generate() throws JsonProcessingException {
        Person person = fairy.person();
        final String username =
                person.getUsername() + RandomUtils.nextInt(1_000_000, 9_000_000);
        final ImmutableMap<String, Object> map = ImmutableMap.<String, Object>builder()
                .put("address", toMap(person.getAddress()))
                .put("firstName", person.getFirstName())
                .put("middleName", person.getMiddleName())
                .put("lastName", person.getLastName())
                .put("email", person.getEmail())
                .put("companyEmail", person.getCompanyEmail())
                .put("username", username)
                .put("password", person.getPassword())
                .put("sex", person.getSex())
                .put("telephoneNumber", person.getTelephoneNumber())
                .put("dateOfBirth", person.getDateOfBirth())
                .put("company", person.getCompany())
                .put("nationalIdentityCardNumber", person.getNationalIdentityCardNumber())
                .put("nationalIdentificationNumber", person.getNationalIdentificationNumber())
                .put("passportNumber", person.getPassportNumber())
                .build();
        final String json = objectMapper.writeValueAsString(map);
        return new Doc(username, json);
    }

    private ImmutableMap<String, Object> toMap(Address address) {
        return ImmutableMap.<String, Object>builder()
                .put("street", address.getStreet())
                .put("streetNumber", address.getStreetNumber())
                .put("apartmentNumber", address.getApartmentNumber())
                .put("postalCode", address.getPostalCode())
                .put("city", address.getCity())
                .put("lines", Arrays.asList(
                        address.getAddressLine1(),
                        address.getAddressLine2()))
                .build();
    }
}
```
This rather boring code actually does something quite cool. Every time we run it, it generates a random but plausible JSON document like this one:
```json
{
  "address": {
    "street": "Ford Street",
    "streetNumber": "32",
    "apartmentNumber": "",
    "postalCode": "63913",
    "city": "San Francisco",
    "lines": [
      "32 Ford Street",
      "San Francisco 63913"
    ]
  },
  "firstName": "Evelyn",
  "middleName": "",
  "lastName": "Pittman",
  "email": "pittman@mail.com",
  "companyEmail": "evelyn.pittman@woodsllc.eu",
  "username": "epittman5795354",
  "password": "VpEfFmzG",
  "sex": "FEMALE",
  "telephoneNumber": "368-005-109",
  "dateOfBirth": "1917-05-14T16:47:06.273Z",
  "company": {
    "name": "Woods LLC",
    "domain": "woodsllc.eu",
    "email": "contact@woodsllc.eu",
    "vatIdentificationNumber": "30-0005081",
    "url": "http://www.woodsllc.eu"
  },
  "nationalIdentityCardNumber": "713-79-5185",
  "nationalIdentificationNumber": "",
  "passportNumber": "jVeyZLSt3"
}
```
Neat! Unfortunately, it is not documented whether jFairy is thread-safe, so just in case, in the real code I used a ThreadLocal. OK, so we have one document, but we need millions! Using a for-loop is so passé. What would you say to an infinite stream of random people?
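As a side note, the ThreadLocal trick mentioned above can be sketched with plain JDK classes. This is a hypothetical illustration, not the article's actual code: SimpleDateFormat stands in for Fairy as a well-known non-thread-safe class (with jFairy the factory would be `Fairy::create`), and the class and method names below are made up.

```java
import java.text.SimpleDateFormat;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalDemo {

    // Each thread lazily creates its own instance on first access,
    // so the non-thread-safe object is never shared between threads.
    private static final ThreadLocal<SimpleDateFormat> FORMAT =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));

    // Runs `tasks` lookups on a pool of `threads` threads and counts
    // how many distinct underlying instances were observed.
    static int distinctInstancesAcross(int threads, int tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Set<SimpleDateFormat> seen = ConcurrentHashMap.newKeySet();
        try {
            for (int i = 0; i < tasks; i++) {
                pool.submit(() -> seen.add(FORMAT.get())).get();
            }
        } finally {
            pool.shutdown();
        }
        // Never more instances than threads: ThreadLocal reuses per-thread copies
        return seen.size();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("distinct instances: " + distinctInstancesAcross(4, 8));
    }
}
```

The price is one instance per thread, which is usually negligible compared to the cost of synchronizing a shared one.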
```java
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

private final Scheduler scheduler =
        Schedulers.newParallel(PersonGenerator.class.getSimpleName());

Mono<Doc> generateOne() {
    return Mono
            .fromCallable(this::generate)
            .subscribeOn(scheduler);
}

Flux<Doc> infinite() {
    return generateOne().repeat();
}
```
generateOne() wraps the blocking generate() method in a Mono<Doc>. Additionally, generate() runs on the parallel scheduler. Why? It turns out that jFairy wasn't fast enough on a single core (lots of random number generation, table lookups, etc.), so I had to parallelize data generation. Normally that shouldn't be an issue. But when generating fake data is slower than your reactive application that touches an external server, it tells you something about the performance of Netty-based Spring WebFlux (!)
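For intuition, the repeat()/take() pair behaves much like the JDK's own lazy Stream.generate()/limit() combination, just without schedulers or backpressure. A minimal, self-contained analogy (the class and method names are made up for illustration):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class InfiniteStreamAnalogy {

    // Returns the first n values pulled from a conceptually infinite source.
    static List<Integer> firstN(int n) {
        AtomicInteger counter = new AtomicInteger();
        // Stream.generate is lazy, like repeat(): elements are produced
        // only as the terminal operation pulls them through limit()
        return Stream.generate(counter::incrementAndGet)
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstN(5));  // [1, 2, 3, 4, 5]
    }
}
```

In both worlds nothing is generated up front; consumers pull as many elements as they need from a source that could, in principle, run forever.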
Having an endless stream of good-looking fake test data, we now want to index it in ElasticSearch.
```java
@PostConstruct
void startIndexing() {
    index(1_000_000, 1_000);
}

private void index(int count, int maxConcurrency) {
    personGenerator
            .infinite()
            .take(count)
            .flatMap(this::indexDocSwallowErrors, maxConcurrency)
            .window(Duration.ofSeconds(1))
            .flatMap(Flux::count)
            .subscribe(winSize -> log.debug("Got {} responses in last second", winSize));
}

private Mono<IndexResponse> indexDocSwallowErrors(Doc doc) {
    return indexDoc(doc)
            .doOnError(e -> log.error("Unable to index {}", doc, e))
            .onErrorResume(e -> Mono.empty());
}
```
When the application starts, it kicks off the indexing of one million documents. Notice how easy it is to tell Reactor (the same goes for RxJava) that it should make up to one thousand concurrent requests to ElasticSearch. Once per second we count how many responses we received:
```
Got 2925 responses in last second
Got 2415 responses in last second
Got 3336 responses in last second
Got 2199 responses in last second
Got 1861 responses in last second
```
Not bad! Especially considering that there were up to one thousand concurrent HTTP requests while our application barely peaked at 30 threads (!) All right, it was localhost <-> localhost, guilty as charged! But how do we know all of this? Logging is fine, but it's the twenty-first century, we can do better! Monitoring will be the subject of the next installment.
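To build intuition for what the maxConcurrency argument of flatMap() guarantees, here is a plain-JDK sketch (this is not how Reactor implements it) that caps in-flight tasks with a Semaphore; all names are hypothetical:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedConcurrencyDemo {

    // Runs `tasks` simulated requests but allows at most `maxConcurrency`
    // of them to be in flight at once; returns the peak observed in-flight count.
    static int maxObservedInFlight(int tasks, int maxConcurrency) throws Exception {
        Semaphore permits = new Semaphore(maxConcurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(32);
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    permits.acquire();                      // wait for a free slot
                    try {
                        int now = inFlight.incrementAndGet();
                        maxSeen.accumulateAndGet(now, Math::max);
                        Thread.sleep(5);                    // simulated I/O call
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();                  // free the slot
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        done.await();
        pool.shutdown();
        return maxSeen.get();
    }

    public static void main(String[] args) throws Exception {
        // Peak in-flight count never exceeds the configured limit of 10
        System.out.println(maxObservedInFlight(100, 10));
    }
}
```

The difference is that flatMap() achieves the same bound without blocking any threads: rather than parking workers on a semaphore, it simply does not subscribe to the next inner Mono until one of the outstanding ones completes.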
The source code is available in the reactive-elastic-search branch at github.com/nurkiewicz/elastic-flux.