简单、高效、安全的并发是RxJava的设计原则之一。然而,具有讽刺意味的是,它可能是这个图书馆最容易被误解的方面之一。让我们举一个简单的例子:假设我们有一堆 UUID
,我们必须为其中的每一个执行一组任务。第一个问题是对每个 UUID
执行 I/O 密集型操作,例如从数据库加载一个对象:
Flowable<UUID> ids = Flowable .fromCallable(UUID::randomUUID) .repeat() .take(100); ids.subscribe(id -> slowLoadBy(id));
首先,为了测试,我生成了 100 个随机 UUID。然后对于每个 UUID,我想使用以下方法加载一条记录:
Person slowLoadBy(UUID id) { //... }
slowLoadBy()
的实现是无关紧要的,只要记住它很慢并且会阻塞。使用 subscribe()
调用 slowLoadBy()
有很多缺点:
subscribe()
在设计上是单线程的,没有办法绕过它。每个UUID
按顺序加载subscribe()
时,您不能进一步转换Person
对象。这是一个终端操作一种更健壮,甚至更糟糕的方法是 map()
每个 UUID
:
Flowable<Person> people = ids .map(id -> slowLoadBy(id)); //BROKEN
这是非常可读的,但不幸的是坏了。运营商,就像订阅者一样,是单线程的。这意味着在任何给定时间只能映射一个 UUID
,这里也不允许并发。更糟糕的是,我们正在从上游继承线程/工作者。这有几个缺点。如果上游使用某些专用调度程序生成事件,我们将从该调度程序劫持线程。例如,许多运算符,如 interval()
,透明地使用 Schedulers.computation()
线程池。我们突然开始在一个完全不适合的池上执行 I/O 密集型操作。此外,我们通过这个阻塞的顺序步骤减慢了整个管道。非常非常糟糕。
您可能听说过此 subscribeOn()
运算符以及它如何实现并发。确实如此,但在应用时必须非常小心。以下示例(再次)错误:
import io.reactivex.schedulers.Schedulers; Flowable<Person> people = ids .subscribeOn(Schedulers.io()) .map(id -> slowLoadBy(id)); //BROKEN
上面的代码片段仍然是错误的。 subscribeOn()
(和 observeOn()
)几乎不会在不引入任何并发性的情况下将执行切换到不同的工作程序(线程)。流仍然按顺序处理所有事件,但在不同的线程上。换句话说——我们现在不是在从上游继承的线程上顺序消费事件,而是在 io()
线程上顺序消费它们。那么这个神话般的 flatMap()
运算符呢?
flatMap()
运算符来拯救
flatMap()
运算符通过将事件流拆分为子流来实现并发。但首先,还有一个错误的例子:
Flowable<Person> asyncLoadBy(UUID id) { return Flowable.fromCallable(() -> slowLoadBy(id)); } Flowable<Person> people = ids .subscribeOn(Schedulers.io()) .flatMap(id -> asyncLoadBy(id)); //BROKEN
哦天哪,这还是坏了! flatMap()
运算符在逻辑上做了两件事:
id -> asyncLoadBy(id)
) – 这会生成 Flowable<Flowable<Person>>
。这是有道理的,对于每个上游 UUID
,我们都会得到一个 Flowable<Person>
,所以我们最终会得到一个 Person
对象流flatMap()
尝试一次订阅所有 这些内部子流。每当任何子流发出 Person
事件时,它都会作为外部 Flowable
的结果透明地传递。从技术上讲,flatMap()
仅创建和订阅前 128 个(默认情况下,可选的 maxConcurrency
参数)子流。此外,当最后一个子流完成时,Person
的外部流也将完成。现在,这到底是为什么坏了?除非明确要求,否则 RxJava 不会引入任何线程池。例如这段代码仍然阻塞:
log.info("Setup"); Flowable<String> blocking = Flowable .fromCallable(() -> { log.info("Starting"); TimeUnit.SECONDS.sleep(1); log.info("Done"); return "Hello, world!"; }); log.info("Created"); blocking.subscribe(s -> log.info("Received {}", s)); log.info("Done");
仔细查看输出,尤其是涉及的事件和线程的顺序:
19:57:28.847 | INFO | main | Setup 19:57:28.943 | INFO | main | Created 19:57:28.949 | INFO | main | Starting 19:57:29.954 | INFO | main | Done 19:57:29.955 | INFO | main | Received Hello, world! 19:57:29.957 | INFO | main | Done
没有任何并发性,没有额外的线程。仅仅将阻塞代码包装在 Flowable
中并不能神奇地增加并发性。你必须明确地使用……subscribeOn()
:
log.info("Setup"); Flowable<String> blocking = Flowable .fromCallable(() -> { log.info("Starting"); TimeUnit.SECONDS.sleep(1); log.info("Done"); return "Hello, world!"; }) .subscribeOn(Schedulers.io()); log.info("Created"); blocking.subscribe(s -> log.info("Received {}", s)); log.info("Done");
这次的输出更有希望:
19:59:10.547 | INFO | main | Setup 19:59:10.653 | INFO | main | Created 19:59:10.662 | INFO | main | Done 19:59:10.664 | INFO | RxCachedThreadScheduler-1 | Starting 19:59:11.668 | INFO | RxCachedThreadScheduler-1 | Done 19:59:11.669 | INFO | RxCachedThreadScheduler-1 | Received Hello, world!
但是我们上次确实使用了subscribeOn()
,这是怎么回事?好吧,在外部流级别上的 subscribeOn()
基本上表示所有事件都应该在这个流中的不同线程上按顺序处理。我们并没有说要同时运行多个子流。因为所有子流都是阻塞的,当 RxJava 尝试订阅所有子流时,它实际上是一个接一个地顺序订阅。 asyncLoadBy()
并不是真正的异步,因此当 flatMap()
运算符尝试订阅它时它会阻塞。修复很容易。通常你会将 subscribeOn()
放在 asyncLoadBy()
中,但出于教育目的,我将它直接放在主管道中:
Flowable<Person> people = ids .flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io()));
现在它就像一个魅力!默认情况下,RxJava 将获取前 128 个上游事件(UUID
),将它们转换为子流并订阅所有这些事件。如果子流是异步的和高度可并行的(例如网络调用),我们会得到 128 个 asyncLoadBy()
并发调用。并发级别 (128) 可通过 maxConcurrency
参数配置:
Flowable<Person> people = ids .flatMap(id -> asyncLoadBy(id).subscribeOn(Schedulers.io()), 10 //maxConcurrency );
这是很多工作,你不觉得吗?并发性不应该更具声明性吗?我们不再处理 Executor
和 futures,但是,这种方法似乎仍然很容易出错。它不能像 Java 8 流中的 parallel()
一样简单吗?
输入ParallelFlowable
让我们首先再次查看我们的示例,并通过添加 filter()
使其更加复杂:
Flowable<Person> people = ids .map(this::slowLoadBy) //BROKEN .filter(this::hasLowRisk); //BROKEN
其中 hasLowRisk()
是一个slow 谓词:
boolean hasLowRisk(Person p) { //slow... }
我们已经知道解决这个问题的惯用方法是使用 flatMap()
两次:
Flowable<Person> people = ids .flatMap(id -> asyncLoadBy(id).subscribeOn(io())) .flatMap(p -> asyncHasLowRisk(p).subscribeOn(io()));
asyncHasLowRisk()
相当晦涩——它要么在谓词通过时返回单元素流,要么在谓词失败时返回空流。这就是使用 filter()
模拟 flatMap()
的方式。我们能做得更好吗?从 RxJava 2.0.5 开始,有一个新的运算符叫做……parallel()
!令人惊讶的是,由于许多误解和误用,在 RxJava 成为 1.0 之前,同名运算符 被删除了。 2.x 中的 parallel()
似乎最终以安全和声明的方式解决了惯用的并发问题。首先,让我们看一些漂亮的代码!
Flowable<Person> people = ids .parallel(10) .runOn(Schedulers.io()) .map(this::slowLoadBy) .filter(this::hasLowRisk) .sequential();
就这样! parallel()
和 sequential()
之间的代码块并行运行。我们有什么在这里?首先,新的 parallel()
运算符将 Flowable<UUID>
转换为 ParallelFlowable<UUID>
,它的 API 比 Flowable 小得多。你很快就会明白为什么。可选的 int
参数(在我们的示例中为 10
)定义并发性,或者(如文档所述)创建了多少并发“rails”。因此,对于我们来说,我们将单个 Flowable<Person>
拆分为 10 个并发的独立轨道(想想:线程)。来自 UUID
的原始流的事件被拆分 (modulo 10
) 到不同的轨道,即彼此独立的子流。将它们视为将上游事件发送到 10 个独立的线程中。但首先我们必须定义这些线程来自何处——使用方便的 runOn()
运算符。这比您无法控制并发级别的 Java 8 流上的 parallel()
要好得多。
此时我们有一个ParallelFlowable
。当一个事件出现在上游 (UUID
) 中时,它会被委托给 10 个并发的独立管道之一。 Pipeline 提供了有限的运算符子集,可以安全地并发运行,例如map()
和 filter()
,还有 reduce()
。没有 buffer()
、take()
等,因为当同时在许多子流上调用时,它们的语义不清楚。我们的阻塞 slowLoadBy()
和 hasLowRisk()
仍然按顺序调用,但只在单个“轨道”内。因为我们现在有 10 个并发“rails”,所以我们不费吹灰之力就有效地将它们并行化了。
当事件到达子流(“rail”)的末尾时,它们会遇到 sequential()
运算符。此运算符将 ParallelFlowable
变回 Flowable
。只要我们的映射器和过滤器是线程安全的,parallel()
/sequential()
对就可以提供非常简单的并行化流的方法。一个小警告——您将不可避免地收到重新排序的消息。顺序 map()
和 filter()
总是保持顺序(像大多数运算符一样)。但是一旦你在 parallel()
块中运行它们,顺序就会丢失。这允许更大的并发性,但你必须记住这一点。
您应该使用 parallel()
而不是嵌套的 flatMap()
来并行化您的代码吗?这取决于你,但 parallel()
似乎更容易阅读和掌握。
标签2: Java教程地址:https://www.cundage.com/article/jcg-idiomatic-concurrency-flatmap-vs-parallel-rxjava-faq.html