Spring 5(和 Boot 2,当它在几个星期后到达时)是一场革命。不是“基于 XML 的注释”或“基于注释的 Java 类”类型的革命。它确实是一个革命性的框架,可以编写全新类别的应用程序。近年来,我对这个框架有点畏惧。 “Spring Cloud 是简化 Spring Boot 使用的框架,是简化 Spring 使用的框架,是简化企业开发的框架。” start.spring.io(也称为“start…dot spring…dot I…O”)列出了 120 个不同的模块 (!),您可以将它们添加到您的服务中。如今,Spring 成为了一个庞大的伞形项目,我可以想象为什么有些人(仍然!)更喜欢 Java EE(或现在的任何称呼)。
但是 Spring 5 带来了响应式革命。它不再只是阻塞 servlet API 和各种 Web 框架的包装器。在 Project Reactor 之上的 Spring 5 允许编写高性能、极快和可扩展的服务器,完全避免了 servlet 堆栈。该死的,CLASSPATH 上没有 Jetty,甚至没有 servlet API!在 Spring 5 web-flux 的核心,我们会发现 Netty,这是一个用于编写异步客户端和服务器的低级框架。最后,Spring 成为响应式框架家族中的一等公民。 Java 开发人员可以在不离开舒适区并使用 https://doc.akka.io/docs/akka-http/current/ 或 https://www.playframework 的情况下实现快速服务.com/. Spring 5 是一个完全响应式的现代工具,用于构建高度可扩展和有弹性的应用程序。尽管如此,控制器、bean、依赖注入等基本原理都是相同的。而且,升级路径很顺畅,我们可以逐步添加功能,而不是学习全新的、陌生的框架。说的够多了,让我们写一些代码。
在本文中,我们将编写一个简单的无头应用程序,用于在 ElasticSearch 中大量索引文档。我们的目标是仅使用少数线程实现数千个并发连接,即使服务器变慢也是如此。但是,与例如Spring Data MongoDB、Spring Data ElasticSearch 本身不支持非阻塞存储库。好吧,后者似乎甚至不再维护,当前版本已有 3 年历史。许多文章以 Spring 5 + MongoDB 为目标,其存储库返回非阻塞流(来自 RxJava 的 Flux
或 Flowable
)。这个会更高级一点。
ElasticSearch 6 Java API 使用 RESTful 接口并使用非阻塞 HTTP 客户端实现。不幸的是,它使用回调而不是像 CompletableFuture
这样理智的东西。因此,让我们自己构建客户端适配器。
使用 Fluxes 和 Monos 的 ElasticSearch 客户端
本文的源代码可在 reactive-elastic-search
分支上的 github.com/nurkiewicz/elastic-flux 获得。
我们想通过返回 Flux
或 Mono
来构建支持 Project Reactor 的 ElasticSearch Java 客户端。当然,如果底层流是完全异步的并且不消耗线程,我们将获得最大的好处。幸运的是,Java API 就是这样。首先,让我们将 ElasticSearch 的客户端设置为 Spring bean:
import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; @Bean RestHighLevelClient restHighLevelClient() { return new RestHighLevelClient( RestClient .builder(new HttpHost("localhost", 9200)) .setRequestConfigCallback(config -> config .setConnectTimeout(5_000) .setConnectionRequestTimeout(5_000) .setSocketTimeout(5_000) ) .setMaxRetryTimeoutMillis(5_000)); }
在现实生活中,我们显然会将大部分内容参数化。我们将索引简单的 JSON 文档,暂时,它们的内容并不重要:
@Value class Doc { private final String username; private final String json; }
我们将编写的代码包装 RestHighLevelClient
并通过返回 Mono<IndexResponse>
使其变得更加高级。 Mono
与 CompletableFuture
非常相似,但有两个例外:
CompletableFuture
不同,Mono
可以正常完成而不会发出任何值第二个区别对我来说总是有点误导。在 RxJava 2.x 中有两种不同的类型:Single
(总是以值或错误完成)和 Maybe
(如 Mono
)。可惜 Reactor 没有做出这种区分。没关系,适配器层是什么样子的?普通 Elastic 的 API 如下所示:
client.indexAsync(indexRequest, new ActionListener() { @Override public void onResponse(IndexResponse indexResponse) { //got response } @Override public void onFailure(Exception e) { //got error } });
您可以看到这是怎么回事:回调地狱。与其将自定义 ActionListener
公开为该逻辑的参数,不如将其包装在 Mono
中:
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; private Mono<IndexResponse> indexDoc(Doc doc) { return Mono.create(sink -> { IndexRequest indexRequest = new IndexRequest("people", "person", doc.getUsername()); indexRequest.source(doc.getJson(), XContentType.JSON); client.indexAsync(indexRequest, new ActionListener<IndexResponse>() { @Override public void onResponse(IndexResponse indexResponse) { sink.success(indexResponse); } @Override public void onFailure(Exception e) { sink.error(e); } }); }); }
我们必须创建 IndexRequest
包装 JSON 文档并通过 RESTful API 发送它。但这不是重点。我们正在使用 Mono.create()
方法,它有一些缺点,但稍后会详细介绍。 Mono
是惰性的,所以仅仅调用 indexDoc()
是不够的,没有向 ElasticSearch 发出 HTTP 请求。然而,每当有人订阅这个单元素源时,create()
中的逻辑就会被执行。关键行是 sink.success()
和 sink.error()
。它们将来自 ElasticSearch(来自后台、异步线程)的结果传播到流中。如何在实践中使用这种方法?很简单!
Doc doc = //... indexDoc(doc) .subscribe( indexResponse -> log.info("Got response") );
当然,反应式流处理的真正力量来自组合多个流。但我们迈出了第一步:将基于回调的异步 API 转换为通用流。如果您(不)幸运地使用 MongoDB,它在存储库中内置支持响应式类型,如 Mono
或 Flux
。 Cassandra 和 Redis 也是如此。在下一篇文章中,我们将学习如何生成一些假数据并同时对其进行索引。
标签2: Java教程地址:https://www.cundage.com/article/jcg-spring-reactor-elasticsearch-callbacks-reactive-streams.html