惯用并发:flatMap() 与 parallel() – RxJava FAQ

位置:首页>文章>详情   分类: Java教程 > 编程技术   阅读(391)   2024-01-27 07:14:57

简单、高效、安全的并发是RxJava的设计原则之一。然而,具有讽刺意味的是,它可能是这个图书馆最容易被误解的方面之一。让我们举一个简单的例子:假设我们有一堆 UUID,我们必须为其中的每一个执行一组任务。第一个问题是对每个 UUID 执行 I/O 密集型操作,例如从数据库加载一个对象:

Flowable<UUID> ids = Flowable
        .fromCallable(UUID::randomUUID)
        .repeat()
        .take(100);
 
ids.subscribe(id -> slowLoadBy(id));

首先,为了测试,我生成了 100 个随机 UUID。然后对于每个 UUID,我想使用以下方法加载一条记录:

Person slowLoadBy(UUID id) {
    //...
}

slowLoadBy() 的实现是无关紧要的,只要记住它很慢并且会阻塞。使用 subscribe() 调用 slowLoadBy() 有很多缺点:

  • subscribe() 在设计上是单线程的,没有办法绕过它。每个UUID按顺序加载
  • 当您调用subscribe() 时,您不能进一步转换Person 对象。这是一个终端操作

一种更健壮,甚至更糟糕的方法是 map() 每个 UUID

Flowable<Person> people = ids
        .map(id -> slowLoadBy(id));  //BROKEN

这是非常可读的,但不幸的是坏了。运营商,就像订阅者一样,是单线程的。这意味着在任何给定时间只能映射一个 UUID,这里也不允许并发。更糟糕的是,我们正在从上游继承线程/工作者。这有几个缺点。如果上游使用某些专用调度程序生成事件,我们将从该调度程序劫持线程。例如,许多运算符,如 interval(),透明地使用 Schedulers.computation() 线程池。我们突然开始在一个完全不适合的池上执行 I/O 密集型操作。此外,我们通过这个阻塞的顺序步骤减慢了整个管道。非常非常糟糕。

您可能听说过此 subscribeOn() 运算符以及它如何实现并发。确实如此,但在应用时必须非常小心。以下示例(再次)错误

import io.reactivex.schedulers.Schedulers;
 
 
Flowable<Person> people = ids
        .subscribeOn(Schedulers.io())
        .map(id -> slowLoadBy(id)); //BROKEN

上面的代码片段仍然是错误的。 subscribeOn()(和 observeOn())几乎不会在不引入任何并发性的情况下将执行切换到不同的工作程序(线程)。流仍然按顺序处理所有事件,但在不同的线程上。换句话说——我们现在不是在从上游继承的线程上顺序消费事件,而是在 io() 线程上顺序消费它们。那么这个神话般的 flatMap() 运算符呢?

flatMap() 运算符来拯救

flatMap() 运算符通过将事件流拆分为子流来实现并发。但首先,还有一个错误的例子:

Flowable<Person> asyncLoadBy(UUID id) {
    return Flowable.fromCallable(() -> slowLoadBy(id));
}
 
Flowable<Person> people = ids
        .subscribeOn(Schedulers.io())
        .flatMap(id -> asyncLoadBy(id)); //BROKEN

哦天哪,这还是坏了flatMap() 运算符在逻辑上做了两件事:

  • 在每个上游事件上应用转换 (id -> asyncLoadBy(id)) – 这会生成 Flowable<Flowable<Person>>。这是有道理的,对于每个上游 UUID,我们都会得到一个 Flowable<Person>,所以我们最终会得到一个 Person 对象流
  • 然后 flatMap() 尝试一次订阅所有 这些内部子流。每当任何子流发出 Person 事件时,它都会作为外部 Flowable 的结果透明地传递。

从技术上讲,flatMap() 仅创建和订阅前 128 个(默认情况下,可选的 maxConcurrency 参数)子流。此外,当最后一个子流完成时,Person 的外部流也将完成。现在,这到底是为什么坏了?除非明确要求,否则 RxJava 不会引入任何线程池。例如这段代码仍然阻塞:

log.info("Setup");
Flowable<String> blocking = Flowable
        .fromCallable(() -> {
            log.info("Starting");
            TimeUnit.SECONDS.sleep(1);
            log.info("Done");
            return "Hello, world!";
        });
log.info("Created");
blocking.subscribe(s -> log.info("Received {}", s));
log.info("Done");

仔细查看输出,尤其是涉及的事件和线程的顺序:

19:57:28.847 | INFO  | main | Setup
19:57:28.943 | INFO  | main | Created
19:57:28.949 | INFO  | main | Starting
19:57:29.954 | INFO  | main | Done
19:57:29.955 | INFO  | main | Received Hello, world!
19:57:29.957 | INFO  | main | Done

没有任何并发性,没有额外的线程。仅仅将阻塞代码包装在 Flowable 中并不能神奇地增加并发性。你必须明确地使用……subscribeOn()

log.info("Setup");
Flowable<String> blocking = Flowable
        .fromCallable(() -> {
            log.info("Starting");
            TimeUnit.SECONDS.sleep(1);
            log.info("Done");
            return "Hello, world!";
        })
        .subscribeOn(Schedulers.io());
log.info("Created");
blocking.subscribe(s -> log.info("Received {}", s));
log.info("Done");

这次的输出更有希望:

19:59:10.547 | INFO  | main | Setup
19:59:10.653 | INFO  | main | Created
19:59:10.662 | INFO  | main | Done
19:59:10.664 | INFO  | RxCachedThreadScheduler-1 | Starting
19:59:11.668 | INFO  | RxCachedThreadScheduler-1 | Done
19:59:11.669 | INFO  | RxCachedThreadScheduler-1 | Received Hello, world!

但是我们上次确实使用了subscribeOn(),这是怎么回事?好吧,在外部流级别上的 subscribeOn() 基本上表示所有事件都应该在这个流中的不同线程上按顺序处理。我们并没有说要同时运行多个子流。因为所有子流都是阻塞的,当 RxJava 尝试订阅所有子流时,它实际上是一个接一个地顺序订阅。 asyncLoadBy() 并不是真正的异步,因此当 flatMap() 运算符尝试订阅它时它会阻塞。修复很容易。通常你会将 subscribeOn() 放在 asyncLoadBy() 中,但出于教育目的,我将它直接放在主管道中:

Flowable<Person> people = ids
    .flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io()));

现在它就像一个魅力!默认情况下,RxJava 将获取前 128 个上游事件(UUID),将它们转换为子流并订阅所有这些事件。如果子流是异步的和高度可并行的(例如网络调用),我们会得到 128 个 asyncLoadBy() 并发调用。并发级别 (128) 可通过 maxConcurrency 参数配置:

Flowable<Person> people = ids
    .flatMap(id ->
                asyncLoadBy(id).subscribeOn(Schedulers.io()),
                10  //maxConcurrency
    );

这是很多工作,你不觉得吗?并发性不应该更具声明性吗?我们不再处理 Executor 和 futures,但是,这种方法似乎仍然很容易出错。它不能像 Java 8 流中的 parallel() 一样简单吗?

输入ParallelFlowable

让我们首先再次查看我们的示例,并通过添加 filter() 使其更加复杂:

Flowable<Person> people = ids
        .map(this::slowLoadBy)     //BROKEN
        .filter(this::hasLowRisk); //BROKEN

其中 hasLowRisk() 是一个slow 谓词:

boolean hasLowRisk(Person p) {
    //slow...
}

我们已经知道解决这个问题的惯用方法是使用 flatMap() 两次:

Flowable<Person> people = ids
        .flatMap(id -> asyncLoadBy(id).subscribeOn(io()))
        .flatMap(p -> asyncHasLowRisk(p).subscribeOn(io()));

asyncHasLowRisk() 相当晦涩——它要么在谓词通过时返回单元素流,要么在谓词失败时返回空流。这就是使用 filter() 模拟 flatMap() 的方式。我们能做得更好吗?从 RxJava 2.0.5 开始,有一个新的运算符叫做……parallel()!令人惊讶的是,由于许多误解和误用,在 RxJava 成为 1.0 之前,同名运算符 被删除了。 2.x 中的 parallel() 似乎最终以安全和声明的方式解决了惯用的并发问题。首先,让我们看一些漂亮的代码!

Flowable<Person> people = ids
        .parallel(10)
        .runOn(Schedulers.io())
        .map(this::slowLoadBy)
        .filter(this::hasLowRisk)
        .sequential();

就这样! parallel()sequential() 之间的代码块并行运行。我们有什么在这里?首先,新的 parallel() 运算符将 Flowable<UUID> 转换为 ParallelFlowable<UUID> ,它的 API 比 Flowable 小得多。你很快就会明白为什么。可选的 int 参数(在我们的示例中为 10)定义并发性,或者(如文档所述)创建了多少并发“rails”。因此,对于我们来说,我们将单个 Flowable<Person> 拆分为 10 个并发的独立轨道(想想:线程)。来自 UUID 的原始流的事件被拆分 (modulo 10) 到不同的轨道,即彼此独立的子流。将它们视为将上游事件发送到 10 个独立的线程中。但首先我们必须定义这些线程来自何处——使用方便的 runOn() 运算符。这比您无法控制并发级别的 Java 8 流上的 parallel() 要好得多。

此时我们有一个ParallelFlowable。当一个事件出现在上游 (UUID) 中时,它会被委托给 10 个并发的独立管道之一。 Pipeline 提供了有限的运算符子集,可以安全地并发运行,例如map()filter(),还有 reduce()。没有 buffer()take() 等,因为当同时在许多子流上调用时,它们的语义不清楚。我们的阻塞 slowLoadBy()hasLowRisk() 仍然按顺序调用,但只在单个“轨道”内。因为我们现在有 10 个并发“rails”,所以我们不费吹灰之力就有效地将它们并行化了。

当事件到达子流(“rail”)的末尾时,它们会遇到 sequential() 运算符。此运算符将 ParallelFlowable 变回 Flowable。只要我们的映射器和过滤器是线程安全的,parallel()/sequential() 对就可以提供非常简单的并行化流的方法。一个小警告——您将不可避免地收到重新排序的消息。顺序 map()filter() 总是保持顺序(像大多数运算符一样)。但是一旦你在 parallel() 块中运行它们,顺序就会丢失。这允许更大的并发性,但你必须记住这一点。

您应该使用 parallel() 而不是嵌套的 flatMap() 来并行化您的代码吗?这取决于你,但 parallel() 似乎更容易阅读和掌握。

标签2: Java教程
地址:https://www.cundage.com/article/jcg-idiomatic-concurrency-flatmap-vs-parallel-rxjava-faq.html

相关阅读

Java HashSet 教程展示了如何使用 Java HashSet 集合。 Java哈希集 HashSet 是一个不包含重复元素的集合。此类为基本操作(添加、删除、包含和大小)提供恒定时间性...
SpringApplicationBuilder 教程展示了如何使用 SpringApplicationBuilder 创建一个简单的 Spring Boot 应用程序。 春天 是用于创建企业应...
通道是继 buffers 之后 java.nio 的第二个主要新增内容,我们在之前的教程中已经详细了解了这一点。通道提供与 I/O 服务的直接连接。 通道是一种在字节缓冲区和通道另一端的实体(通...
课程大纲 Elasticsearch 是一个基于 Lucene 的搜索引擎。它提供了一个分布式的、支持多租户的全文搜索引擎,带有 HTTP Web 界面和无模式的 JSON 文档。 Elasti...
解析器是强大的工具,使用 ANTLR 可以编写可用于多种不同语言的各种解析器。 在这个完整的教程中,我们将: 解释基础:什么是解析器,它可以用来做什么 查看如何设置 ANTLR 以便在 Java...
Java中的继承是指子类继承或获取父类的所有非私有属性和行为的能力。继承是面向对象编程的四大支柱之一,用于提高层次结构中类之间的代码可重用性。 在本教程中,我们将了解 Java 支持的继承类型,...
Java 是用于开发各种桌面应用程序、Web 应用程序和移动应用程序的最流行的编程语言之一。以下文章将帮助您快速熟悉 Java 语言,并迈向 API 和云开发等更复杂的概念。 1. Java语言...
Java 项目中的一项常见任务是将日期格式化或解析为字符串,反之亦然。解析日期意味着你有一个代表日期的字符串,例如“2017-08-3”,你想把它转换成一个代表 Java 中日期的对象,例如Ja...
Java Message Service 是一种支持正式通信的 API,称为 网络上计算机之间的消息传递。 JMS 为支持 Java 程序的标准消息协议和消息服务提供了一个通用接口。 JMS 提...
之前,我介绍了spring 3 + hibernate 集成 示例和struts 2 hello world 示例。在本教程中,我将讨论在将 spring 框架与 struts 与 hibern...