Spring、Reactor 和 ElasticSearch从回调到反应流

位置:首页>文章>详情   分类: Java教程 > 编程技术   阅读(49)   2024-06-06 06:21:36

Spring 5(和 Boot 2,当它在几个星期后到达时)是一场革命。不是“基于 XML 的注释”或“基于注释的 Java 类”类型的革命。它确实是一个革命性的框架,可以编写全新类别的应用程序。近年来,我对这个框架有点畏惧。 “Spring Cloud 是简化 Spring Boot 使用的框架,是简化 Spring 使用的框架,是简化企业开发的框架。” start.spring.io(也称为“start…dot spring…dot I…O”)列出了 120 个不同的模块 (!),您可以将它们添加到您的服务中。如今,Spring 成为了一个庞大的伞形项目,我可以想象为什么有些人(仍然!)更喜欢 Java EE(或现在的任何称呼)。

但是 Spring 5 带来了响应式革命。它不再只是阻塞 servlet API 和各种 Web 框架的包装器。在 Project Reactor 之上的 Spring 5 允许编写高性能、极快和可扩展的服务器,完全避免了 servlet 堆栈。该死的,CLASSPATH 上没有 Jetty,甚至没有 servlet API!在 Spring 5 web-flux 的核心,我们会发现 Netty,这是一个用于编写异步客户端和服务器的低级框架。最后,Spring 成为响应式框架家族中的一等公民。 Java 开发人员可以在不离开舒适区并使用 https://doc.akka.io/docs/akka-http/current/https://www.playframework 的情况下实现快速服务.com/. Spring 5 是一个完全响应式的现代工具,用于构建高度可扩展和有弹性的应用程序。尽管如此,控制器、bean、依赖注入等基本原理都是相同的。而且,升级路径很顺畅,我们可以逐步添加功能,而不是学习全新的、陌生的框架。说的够多了,让我们写一些代码。

在本文中,我们将编写一个简单的无头应用程序,用于在 ElasticSearch 中大量索引文档。我们的目标是仅使用少数线程实现数千个并发连接,即使服务器变慢也是如此。但是,与例如Spring Data MongoDB、Spring Data ElasticSearch 本身不支持非阻塞存储库。好吧,后者似乎甚至不再维护,当前版本已有 3 年历史。许多文章以 Spring 5 + MongoDB 为目标,其存储库返回非阻塞流(来自 RxJava 的 FluxFlowable)。这个会更高级一点。

ElasticSearch 6 Java API 使用 RESTful 接口并使用非阻塞 HTTP 客户端实现。不幸的是,它使用回调而不是像 CompletableFuture 这样理智的东西。因此,让我们自己构建客户端适配器。

使用 Fluxes 和 Monos 的 ElasticSearch 客户端

本文的源代码可在 reactive-elastic-search 分支上的 github.com/nurkiewicz/elastic-flux 获得。

我们想通过返回 FluxMono 来构建支持 Project Reactor 的 ElasticSearch Java 客户端。当然,如果底层流是完全异步的并且不消耗线程,我们将获得最大的好处。幸运的是,Java API 就是这样。首先,让我们将 ElasticSearch 的客户端设置为 Spring bean:

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
 
@Bean
RestHighLevelClient restHighLevelClient() {
    return new RestHighLevelClient(
            RestClient
                    .builder(new HttpHost("localhost", 9200))
                    .setRequestConfigCallback(config -> config
                            .setConnectTimeout(5_000)
                            .setConnectionRequestTimeout(5_000)
                            .setSocketTimeout(5_000)
                    )
                    .setMaxRetryTimeoutMillis(5_000));
}

在现实生活中,我们显然会将大部分内容参数化。我们将索引简单的 JSON 文档,暂时,它们的内容并不重要:

@Value
class Doc {
    private final String username;
    private final String json;
}

我们将编写的代码包装 RestHighLevelClient 并通过返回 Mono<IndexResponse> 使其变得更加高级MonoCompletableFuture 非常相似,但有两个例外:

  • 它是惰性的——只要你不订阅,就不会开始计算
  • CompletableFuture不同,Mono可以正常完成而不会发出任何值

第二个区别对我来说总是有点误导。在 RxJava 2.x 中有两种不同的类型:Single(总是以值或错误完成)和 Maybe(如 Mono)。可惜 Reactor 没有做出这种区分。没关系,适配器层是什么样子的?普通 Elastic 的 API 如下所示:

client.indexAsync(indexRequest, new ActionListener() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        //got response
    }
 
    @Override
    public void onFailure(Exception e) {
        //got error
    }
});

您可以看到这是怎么回事:回调地狱。与其将自定义 ActionListener 公开为该逻辑的参数,不如将其包装在 Mono 中:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
 
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
 
private Mono<IndexResponse> indexDoc(Doc doc) {
    return Mono.create(sink -> {
        IndexRequest indexRequest = new IndexRequest("people", "person", doc.getUsername());
        indexRequest.source(doc.getJson(), XContentType.JSON);
        client.indexAsync(indexRequest, new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                sink.success(indexResponse);
            }
 
            @Override
            public void onFailure(Exception e) {
                sink.error(e);
            }
        });
    });
}

我们必须创建 IndexRequest 包装 JSON 文档并通过 RESTful API 发送它。但这不是重点。我们正在使用 Mono.create() 方法,它有一些缺点,但稍后会详细介绍。 Mono 是惰性的,所以仅仅调用 indexDoc() 是不够的,没有向 ElasticSearch 发出 HTTP 请求。然而,每当有人订阅这个单元素源时,create() 中的逻辑就会被执行。关键行是 sink.success()sink.error()。它们将来自 ElasticSearch(来自后台、异步线程)的结果传播到流中。如何在实践中使用这种方法?很简单!

Doc doc = //...
indexDoc(doc)
        .subscribe(
                indexResponse -> log.info("Got response")
        );

当然,反应式流处理的真正力量来自组合多个流。但我们迈出了第一步:将基于回调的异步 API 转换为通用流。如果您(不)幸运地使用 MongoDB,它在存储库中内置支持响应式类型,如 MonoFluxCassandraRedis 也是如此。在下一篇文章中,我们将学习如何生成一些假数据并同时对其进行索引。

标签2: Java教程
地址:https://www.cundage.com/article/jcg-spring-reactor-elasticsearch-callbacks-reactive-streams.html

相关阅读

Java HashSet 教程展示了如何使用 Java HashSet 集合。 Java哈希集 HashSet 是一个不包含重复元素的集合。此类为基本操作(添加、删除、包含和大小)提供恒定时间性...
SpringApplicationBuilder 教程展示了如何使用 SpringApplicationBuilder 创建一个简单的 Spring Boot 应用程序。 春天 是用于创建企业应...
通道是继 buffers 之后 java.nio 的第二个主要新增内容,我们在之前的教程中已经详细了解了这一点。通道提供与 I/O 服务的直接连接。 通道是一种在字节缓冲区和通道另一端的实体(通...
课程大纲 Elasticsearch 是一个基于 Lucene 的搜索引擎。它提供了一个分布式的、支持多租户的全文搜索引擎,带有 HTTP Web 界面和无模式的 JSON 文档。 Elasti...
解析器是强大的工具,使用 ANTLR 可以编写可用于多种不同语言的各种解析器。 在这个完整的教程中,我们将: 解释基础:什么是解析器,它可以用来做什么 查看如何设置 ANTLR 以便在 Java...
Java 是用于开发各种桌面应用程序、Web 应用程序和移动应用程序的最流行的编程语言之一。以下文章将帮助您快速熟悉 Java 语言,并迈向 API 和云开发等更复杂的概念。 1. Java语言...
Java中的继承是指子类继承或获取父类的所有非私有属性和行为的能力。继承是面向对象编程的四大支柱之一,用于提高层次结构中类之间的代码可重用性。 在本教程中,我们将了解 Java 支持的继承类型,...
Java Message Service 是一种支持正式通信的 API,称为 网络上计算机之间的消息传递。 JMS 为支持 Java 程序的标准消息协议和消息服务提供了一个通用接口。 JMS 提...
之前,我介绍了spring 3 + hibernate 集成 示例和struts 2 hello world 示例。在本教程中,我将讨论在将 spring 框架与 struts 与 hibern...
Java 项目中的一项常见任务是将日期格式化或解析为字符串,反之亦然。解析日期意味着你有一个代表日期的字符串,例如“2017-08-3”,你想把它转换成一个代表 Java 中日期的对象,例如Ja...