Flux.merge 使用说明书

merge

public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source,
                                int concurrency)
Merge data from  Publisher sequences emitted by the passed  Publisher into an interleaved merged sequence. Unlike  concat, inner sources are subscribed to eagerly (but at most concurrency sources are subscribed to at the same time).

将来自传入的 Publisher 发出的序列中的数据合并为一个交错的合并序列。与 concat 不同,内部源是被迫早期订阅的(但同时最多只能订阅指定数量的源)。

Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

请注意,merge 是专门为异步源或有限源设计的。在处理无限源且该源并未在专用的 Scheduler 上发布时,必须将该源隔离到其自己的 Scheduler 中,因为如果不这样做,merge 将会试图在订阅其他源之前耗尽该源的所有数据。

Type Parameters:

T - the merged type

Parameters:

source - a  Publisher of  Publisher sources to merge
concurrency - the request produced to the main source thus limiting concurrent merge backlog

Returns:

a merged Flux

类型参数:

T - 合并后的类型。

参数说明:

source - 一个 Publisher,其发出多个要合并的 Publisher 源。
concurrency - 发给主源的请求量,用于限制并发合并的积压。

返回值:

返回一个合并后的 Flux。

public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency) 

是 Reactor 提供的用于将多个 Publisher 合并成一个 Flux 的方法。

它可以从一个上游 Publisher 发射的 Publisher 序列中获取元素,并合并这些 Publisher 所发射的数据到一个 Flux 流中。

通过 concurrency 参数,可以限制并发的合并数量。

1. 方法介绍

  • merge(Publisher<? extends Publisher<? extends T>> source, int concurrency): 该方法接受一个发射 Publisher 的 Publisher 作为数据源,然后将这些 Publisher 合并成一个 Flux,最终发射这些内部 Publisher 中的数据流。并发参数控制了最多有多少个内部 Publisher 同时进行数据发射。

2. 参数

  • source: 这是一个 Publisher,它发射多个 Publisher(这些 Publisher 又各自发射元素)。
  • concurrency: 表示同时并发处理的 Publisher 的最大数量。它限制了最多有多少个 Publisher 可以并发发射数据。

3. 返回值

  • Flux<T>: 返回一个包含所有合并数据的 Flux。这个 Flux 发射的元素来自 source 提供的多个 Publisher 中的数据,并以并发方式处理这些 Publisher。

4. 示例代码

合并多个 Publisher,限制并发数
java

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxMergeConcurrencyExample {
    public static void main(String[] args) {
        // 创建多个 Mono
        Mono<String> publisher1 = Mono.just("A");
        Mono<String> publisher2 = Mono.just("B");
        Mono<String> publisher3 = Mono.just("C");

        // 将这些 Mono 放入一个 Flux 中
        Flux<Mono<String>> publishers = Flux.just(publisher1, publisher2, publisher3);

        // 使用 merge 方法,将 Flux<Mono<String>> 合并成 Flux<String>,并限制并发为 2
        Flux<String> mergedFlux = Flux.merge(publishers, 2);

        // 订阅并打印合并后的结果
        mergedFlux.subscribe(System.out::println);
    }
}

输出结果:

A
B
C

在这个例子中,我们有三个 Mono(publisher1、publisher2 和 publisher3),这些 Mono 各自发射一个字符串。我们将这些 Mono 包装在一个 Flux 中,并使用 merge() 方法合并它们。concurrency 参数设为 2,限制了最多两个 Publisher 同时处理。

5. 并发控制的重要性

  • concurrency 参数: 控制并发的 Publisher 数量非常重要,尤其是在处理 IO 密集型任务(例如网络请求、数据库查询)时。通过限制并发数量,可以避免同时过多的任务消耗系统资源。如果你设置 concurrency 为一个较高的值,可能会导致资源耗尽或性能下降;设置为较低的值,则会限制系统吞吐量。

示例:设置较低的并发数

将并发数设置为 1,表示按顺序处理每个 Publisher:

java

Flux<String> mergedFlux = Flux.merge(publishers, 1);

在这种情况下,publisher1 完成后,才会处理 publisher2,依次类推。这种方式避免了并发执行,所有 Publisher 将按顺序执行。

6. 异步处理

Flux.merge() 是异步处理的,它会并发处理多个 Publisher,并将所有发射的数据合并成一个流。如果 Publisher 是异步的(例如网络请求或异步任务),合并后的 Flux 将会保持异步行为,尽快发射数据。

7. 其他相关方法

  • Flux.merge(Publisher<T>... sources): 用于合并多个直接提供的 Publisher。
  • Flux.mergeSequential(): 和 merge() 不同的是,mergeSequential() 会按照 Publisher 的发射顺序来合并,而不是并发处理。
  • Flux.concat(): concat() 和 merge() 类似,但是它会按顺序串行处理所有 Publisher,而不是并发处理。

8. 使用场景

  • API 请求的批量处理: 在处理多个外部 API 请求时,可以使用 merge() 来同时发起多个请求,并发处理返回结果。
  • 异步任务的调度: 在调度多个异步任务时,可以通过限制并发的 Publisher 数量,来控制系统的负载。

9. 注意事项

  • 资源管理: 当使用高并发数处理大量 Publisher 时,可能会占用大量的系统资源,特别是在处理 IO 密集型任务时,需要适当调整 concurrency 参数以平衡性能和资源消耗。
  • 错误处理: 合并过程中,如果其中一个 Publisher 发射了错误,整个合并的 Flux 可能会立即终止。可以通过 onErrorResume() 或 onErrorContinue() 等操作符来处理这种情况。

10. 总结

  • Flux.merge(Publisher<? extends Publisher<? extends T>> source, int concurrency) 是一个强大的方法,它允许我们并发地合并来自多个 Publisher 的数据流。通过 concurrency 参数,我们可以精细控制并发的数量,以提高系统性能或限制资源使用。
  • 适用于异步任务、大量数据流合并等场景,同时也需要注意资源消耗和错误处理。

merge

public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source,
                                int concurrency,
                                int prefetch)
Merge data from  Publisher sequences emitted by the passed  Publisher into an interleaved merged sequence. Unlike  concat, inner sources are subscribed to eagerly (but at most concurrency sources are subscribed to at the same time).

将来自传入的 Publisher 发出的多个序列中的数据合并为一个交错的合并序列。与 concat 不同,内部源会被立即订阅(但最多同时订阅指定数量的源)。

Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

请注意,merge 是专门为异步源或有限源设计的。在处理一个无限源时,如果该源并未在专用的 Scheduler 上发布,则必须将该源隔离到其自己的 Scheduler 中,因为如果不这样做,merge 将尝试在订阅其他源之前先耗尽该源的所有数据。

Type Parameters:

T - the merged type

Parameters:

source - a  Publisher of  Publisher sources to merge
concurrency - the request produced to the main source thus limiting concurrent merge backlog
prefetch - the inner source request size

Returns:

a merged  Flux

类型参数:

T - 合并后的类型。

参数说明:

source - 一个 Publisher,其发出多个要合并的 Publisher 源。
concurrency - 发给主源的请求量,用于限制并发合并的积压。
prefetch - 内部源的请求大小。

返回值:

返回一个合并后的 Flux。

public static <T> Flux<T> merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch)

是 Reactor 提供的用于将多个 Publisher 合并为一个 Flux 的方法。

它与 merge(Publisher<? extends Publisher<? extends T>> source, int concurrency) 类似,但多了一个 prefetch 参数,控制在每个 Publisher 中预取(预读取)的数据量。

这种设计可以在流量控制和并发性能之间进行更细致的调节。

1. 方法介绍

  • merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch): 该方法从一个发射 Publisher 的 Publisher 中合并内部发射的多个 Publisher 的数据流,并使用 concurrency 控制最大并发数量,使用 prefetch 控制每个内部 Publisher 在合并过程中预取的数据量。

2. 参数

  • source: 这是一个 Publisher,它会发射多个 Publisher。这些 Publisher 各自会发射一系列元素。
  • concurrency: 并发的最大数量,表示同时可以处理的 Publisher 的最大数量。这个参数控制合并操作中最多有多少个 Publisher 可以同时并行发射数据。
  • prefetch: 每个内部 Publisher 的预取量,表示在订阅时从每个 Publisher 中最多预取多少个数据项。这可以帮助平衡响应时间和内存占用量。

3. 返回值

  • Flux<T>: 返回一个包含所有合并数据的 Flux,这个 Flux 发射的数据来自 source 中的多个 Publisher 的合并。

4. 示例代码

合并多个 Publisher,并设置并发数和预取量
java

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxMergePrefetchExample {
    public static void main(String[] args) {
        // 创建多个 Mono
        Mono<String> publisher1 = Mono.just("A");
        Mono<String> publisher2 = Mono.just("B");
        Mono<String> publisher3 = Mono.just("C");

        // 将这些 Mono 放入一个 Flux 中
        Flux<Mono<String>> publishers = Flux.just(publisher1, publisher2, publisher3);

        // 使用 merge 方法,设置并发为 2,预取量为 1
        Flux<String> mergedFlux = Flux.merge(publishers, 2, 1);

        // 订阅并打印合并后的结果
        mergedFlux.subscribe(System.out::println);
    }
}

输出结果:

A
B
C

在这个例子中,我们创建了三个 Mono,将它们封装在一个 Flux 中,使用 merge() 方法合并它们,并且设定了最大并发数为 2,预取量为 1。concurrency 控制并发数,而 prefetch 控制每次从 Publisher 中预读取多少数据。

5. 参数作用

  • concurrency: 决定了最多可以同时处理多少个 Publisher。在高并发场景中,这个参数可以防止一次性发起太多任务,进而过度占用系统资源。
  • prefetch: 控制了从每个 Publisher 中最多可以预取的数据量。预取的数据会保存在内存中,等到被处理时再发射给下游。prefetch 可以用来优化性能:
  • 较大的 prefetch 适合高吞吐量的场景,减少请求数据的频率。
  • 较小的 prefetch 适合低延迟的场景,确保每次只获取少量数据,从而保持响应快速。

预取的重要性

prefetch 影响的是上游 Publisher 如何从下游请求数据。较大的预取量意味着下游可以一次性获取更多的数据,这样可以减少流控(Flow Control)请求的频率。但是,如果 prefetch 设置过大,可能会导致内存占用过高。反之,预取量较小则可以降低内存占用,但可能会导致频繁的数据请求。

6. 使用场景

  • API 请求: 当从多个 API 请求数据时,concurrency 可以控制同时发起的请求数量,prefetch 可以控制每次请求获取的批次大小。
  • 批量处理任务: 当处理需要分批次获取数据的任务时,prefetch 可以控制批次的大小,从而平衡性能和资源消耗。

7. 相关方法

  • Flux.merge(Publisher<T>... sources): 合并多个 Publisher,不带并发和预取设置。
  • Flux.merge(Publisher<? extends Publisher<? extends T>> source, int concurrency): 带并发控制的合并,适合并发处理数据流,但不控制每个 Publisher 的预取量。
  • Flux.concat(): 按顺序串行处理所有 Publisher,不进行并发合并。

8. 示例:动态控制并发和预取

在处理需要动态数据的场景下,concurrency 和 prefetch 的控制可以为系统提供更灵活的解决方案。比如你希望每次只处理两个数据流,但每个数据流只预取3个元素:

java

Flux<String> mergedFlux = Flux.merge(publishers, 2, 3);

这样,在并发处理的情况下,你能保证最多有两个数据流并行发射,每个数据流最多预取3个元素。

9. 总结

  • Flux.merge(Publisher<? extends Publisher<? extends T>> source, int concurrency, int prefetch) 提供了一种灵活的方式来控制多个数据流的合并,同时通过 concurrency 和 prefetch 参数调整并发度和内存消耗。
  • 它适用于需要处理大量异步数据源的场景,通过限制并发数量和预取量来确保系统资源的合理利用,并优化性能。

merge

public static <I> Flux<I> merge(Iterable<? extends Publisher<? extends I>> sources)
Merge data from  Publisher sequences contained in an  Iterable into an interleaved merged sequence. Unlike  concat, inner sources are subscribed to eagerly. A new  Iterator will be created for each subscriber.

将来自 Iterable 中的多个 Publisher 序列的数据合并为一个交错的合并序列。与 concat 不同,内部源会被立即订阅。每个订阅者将为每个源创建一个新的 Iterator。

Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

请注意,merge 是专门为异步源或有限源设计的。在处理一个无限源时,如果该源并未在专用的 Scheduler 上发布,则必须将该源隔离到其自己的 Scheduler 中,因为如果不这样做,merge 将尝试在订阅其他源之前先耗尽该源的所有数据。

Type Parameters:

I - The source type of the data sequence

Parameters:

sources - the  Iterable of sources to merge (will be lazily iterated on subscribe)

Returns:

a merged  Flux

类型参数:

I - 数据序列的源类型。

参数说明:

sources - 要合并的源的 Iterable(将在订阅时进行懒惰迭代)。

返回值:

返回一个合并后的 Flux。

public static <I> Flux<I> merge(Iterable<? extends Publisher<? extends I>> sources) 

是 Reactor 中用于合并多个 Publisher 的静态方法。

与其他 merge 方法类似,这个方法也将多个 Publisher 合并为一个 Flux,不过它接受的是一个 Iterable 类型的 Publisher 集合,允许从一个 Iterable 集合中动态合并多个异步流。

1. 方法介绍

  • merge(Iterable<? extends Publisher<? extends I>> sources): 这个方法将多个异步数据流 Publisher 合并成一个 Flux,并行处理每个 Publisher 中发射的数据项,不保证它们发射的顺序,合并后的 Flux 按照最早可用的数据项顺序发射。

2. 参数

  • sources: 这是一个 Iterable,其中的元素是 Publisher,即每个 Publisher 是一个异步的数据流。它可能是 Flux、Mono 等类型,这些 Publisher 的数据将被并行合并。

3. 返回值

  • Flux<I>: 返回一个 Flux,其中包含来自 sources 中所有 Publisher 的数据流。这些 Publisher 的数据被并行处理并合并到一个 Flux 中发射。

4. 示例代码

假设我们有多个 Mono 类型的 Publisher,可以将它们放入一个 List(实现了 Iterable 接口)中,并使用 merge 方法将它们合并为一个 Flux:

java

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.List;

public class FluxMergeIterableExample {
    public static void main(String[] args) {
        // 创建多个 Mono
        Mono<String> publisher1 = Mono.just("A");
        Mono<String> publisher2 = Mono.just("B");
        Mono<String> publisher3 = Mono.just("C");

        // 将这些 Mono 放入一个 List 中
        List<Mono<String>> publishers = Arrays.asList(publisher1, publisher2, publisher3);

        // 使用 merge 方法将 List 中的 Publisher 合并成一个 Flux
        Flux<String> mergedFlux = Flux.merge(publishers);

        // 订阅并打印合并后的结果
        mergedFlux.subscribe(System.out::println);
    }
}

输出结果:

A
B
C

在这个例子中,三个 Mono 分别发射了 "A"、"B" 和 "C",我们将它们放入一个 List 中,并通过 Flux.merge 将它们合并成一个 Flux,合并后的数据按顺序发射出来。

5. 使用场景

  • 动态数据源: 当你有一组动态数据源(Publisher),需要在运行时合并它们,可以使用 Iterable 来处理这些动态生成的 Publisher。
  • 批量异步操作: 当需要处理一批异步操作,每个操作返回一个 Publisher,可以将这些 Publisher 放入 Iterable 集合中,然后通过 merge 并行处理它们。

6. 特点

  • 并行处理: 该方法并行处理 sources 中的每个 Publisher,它们可以同时发射数据,并合并到一个流中。
  • 无序发射: merge 方法不保证发射的顺序,因此,最终合并的 Flux 中数据项的顺序可能与各 Publisher 发射的顺序不同。这与 concat 方法不同,concat 是严格按顺序串行处理每个 Publisher。

7. 相关方法

  • Flux.concat(Iterable<? extends Publisher<? extends I>> sources): 按顺序串联多个 Publisher,确保合并后的数据流中元素的顺序与各个 Publisher 中的顺序一致。适用于希望严格按顺序处理多个数据流的场景。
  • Flux.merge(Publisher<T>... sources): 这是合并多个 Publisher 的另一个变体,接受可变参数的 Publisher。
  • Flux.merge(Iterable<? extends Publisher<? extends I>> sources, int concurrency): 限制并发数量的 merge 方法,concurrency 参数控制可以并行处理的 Publisher 的最大数量。

8. 预期的行为

在 Reactor 中使用 merge 时,每个 Publisher 都会被并行处理,意味着下游订阅者会立即收到第一个可用的数据项,而不需要等待其他 Publisher 完成。因此,当使用多个 Publisher 时,merge 能够最大化地利用并行性。

9. 总结

  • Flux.merge(Iterable<? extends Publisher<? extends I>> sources) 为多个 Publisher 的数据流提供了并行合并的能力,适用于动态或批量处理异步操作的场景。
  • 它不保证顺序,会并行处理所有 Publisher 的数据流,因此如果对顺序有严格要求,可能需要使用其他方法如 concat。
  • 通过 Iterable 接口的实现,灵活性更强,可以从集合或其他结构中动态获取 Publisher 并合并处理。

merge

@SafeVarargs
public static <I> Flux<I> merge(Publisher<? extends I>... sources)
Merge data from  Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike  concat, sources are subscribed to eagerly.

将来自数组或可变参数中的多个 Publisher 序列的数据合并为一个交错的合并序列。与 concat 不同,源会被立即订阅。

Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

请注意,merge 是专门为异步源或有限源设计的。在处理一个无限源时,如果该源并未在专用的 Scheduler 上发布,则必须将该源隔离到其自己的 Scheduler 中,因为如果不这样做,merge 将尝试在订阅其他源之前先耗尽该源的所有数据。

Type Parameters:

I - The source type of the data sequence

Parameters:

sources - the array of  Publisher sources to merge

Returns:

a merged  Flux

类型参数:

I - 数据序列的源类型。

参数说明:

sources - 要合并的 Publisher 源的数组。

返回值:

返回一个合并后的 Flux。

@SafeVarargs public static <I> Flux<I> merge(Publisher<? extends I>... sources) 

是 Reactor 框架中用于将多个 Publisher 合并成一个 Flux 的静态方法。通过可变参数 sources,你可以传入任意多个 Publisher,并将它们的元素合并成一个异步流,进行并行处理。

1. 方法介绍

  • merge(Publisher<? extends I>... sources): 该方法可以将多个 Publisher(例如 Flux、Mono)的流合并为一个 Flux,以并行的方式从每个 Publisher 获取数据,并按它们生成的顺序进行发射。该方法不保证顺序。

2. 参数

  • sources: 这是一个可变参数,表示多个 Publisher(如 Flux 或 Mono),可以是任意数量的 Publisher 实例。

3. 返回值

  • Flux<I>: 返回一个 Flux,这个 Flux 会从所有输入的 Publisher 中合并它们的数据,并将数据发射给下游的订阅者。

4. @SafeVarargs 注解

@SafeVarargs 注解的作用是抑制编译器对可能不安全的可变参数数组的警告,特别是当泛型类型作为可变参数时。由于可变参数可能引入类型安全问题,使用这个注解可以告诉编译器在这种情况下不需要发出警告。

5. 示例代码

合并多个 Mono
java

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxMergeExample {
    public static void main(String[] args) {
        // 创建多个 Mono
        Mono<String> mono1 = Mono.just("A");
        Mono<String> mono2 = Mono.just("B");
        Mono<String> mono3 = Mono.just("C");

        // 使用 merge 方法合并多个 Mono
        Flux<String> mergedFlux = Flux.merge(mono1, mono2, mono3);

        // 订阅并打印合并后的结果
        mergedFlux.subscribe(System.out::println);
    }
}

输出结果:

A
B
C

在这个示例中,我们创建了三个 Mono,通过 Flux.merge(mono1, mono2, mono3) 将它们合并成一个 Flux,并订阅了合并后的 Flux。结果中会并行发射 Mono 中的数据,最终输出是 "A", "B", "C"。

6. 使用场景

  • 并行处理多个异步任务: 例如当需要从多个异步数据源(如 API 或数据库)中获取数据时,可以使用 merge 并行处理这些任务,并将结果合并成一个流。
  • 合并多个 Flux 或 Mono: 当你有多个数据流需要合并时,可以使用 merge 将它们的结果并行合并为一个 Flux。这特别适合多个操作结果需要同时处理的情况。

7. 特点

  • 并行处理: 所有传入的 Publisher(例如 Flux 或 Mono)将会被并行处理,它们的数据项会立即合并并发射给下游,不等待其他 Publisher 完成。这使得 merge 是并发处理的一个有效方法。
  • 无序发射: 由于 merge 是并行处理的,合并后的 Flux 发射的数据顺序不一定与各个 Publisher 的顺序一致。换句话说,它不会严格按顺序发射每个 Publisher 的数据。

8. 与 concat 的区别

merge 方法与 concat 方法最大的区别是顺序性:

  • merge: 并行处理所有 Publisher,并且无序发射数据。
  • concat: 串行处理多个 Publisher,即严格按顺序等待一个 Publisher 完成后再处理下一个,因此是有序的。

例如,使用 Flux.concat(mono1, mono2, mono3) 将保证 "A"、"B"、"C" 的顺序,而 Flux.merge 则不保证顺序。

9. 相关方法

  • Flux.merge(Iterable<? extends Publisher<? extends I>> sources): 接受一个 Iterable 类型的 Publisher 集合,将它们合并为一个 Flux,与可变参数形式的 merge 方法类似。
  • Flux.merge(Publisher<? extends Publisher<? extends T>> source, int concurrency): 接受一个发射 Publisher 的 Publisher,并设置并发度来限制同时处理的 Publisher 的数量。

10. 性能优化

  • 控制并发: 如果你需要对并发数量进行控制,可以使用 merge 的其他重载版本,例如 merge(Publisher<? extends Publisher<? extends T>> source, int concurrency),其中 concurrency 参数控制同时处理的 Publisher 数量。这样你可以防止系统资源被过度占用。
  • 流量控制: 对于大规模数据流的合并,可以结合使用 prefetch 参数来控制每个 Publisher 的数据预取量,从而优化内存和性能。

11. 总结

  • Flux.merge(Publisher<? extends I>... sources) 是一个非常灵活且并发友好的方法,允许你将多个 Publisher 合并为一个 Flux。
  • 它不保证顺序,会并行处理每个 Publisher,因此适合高效的异步并行任务。
  • 通过 @SafeVarargs 注解消除了泛型可变参数的警告。
  • 如果你需要顺序处理,可以考虑使用 Flux.concat,否则 merge 提供了高性能的并行数据合并功能。

merge

@SafeVarargs
public static <I> Flux<I> merge(int prefetch,
                                             Publisher<? extends I>... sources)
Merge data from  Publisher sequences contained in an array / vararg into an interleaved merged sequence. Unlike  concat, sources are subscribed to eagerly.

将来自数组或可变参数中的多个 Publisher 序列的数据合并为一个交错的合并序列。与 concat 不同,源会被立即订阅。

Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

请注意,merge 是专门为异步源或有限源设计的。在处理一个无限源时,如果该源并未在专用的 Scheduler 上发布,则必须将该源隔离到其自己的 Scheduler 中,因为如果不这样做,merge 将尝试在订阅其他源之前先耗尽该源的所有数据。

Type Parameters:

I - The source type of the data sequence

Parameters:

sources - the array of  Publisher sources to merge
prefetch - the inner source request size

Returns:

a fresh Reactive  Flux publisher ready to be subscribed

类型参数:

I - 数据序列的源类型。

参数说明:

sources - 要合并的 Publisher 源的数组。
prefetch - 内部源的请求大小。

返回值:

返回一个新的 Reactive Flux 发布者,准备好进行订阅。

@SafeVarargs public static <I> Flux<I> merge(int prefetch, Publisher<? extends I>... sources) 

是 Reactor 中的一个方法,用于合并多个 Publisher,并允许你指定 prefetch 参数来控制每个 Publisher 中的预取数据量。

1. 方法介绍

  • merge(int prefetch, Publisher<? extends I>... sources): 该方法将多个 Publisher(如 Flux、Mono)合并为一个并行发射数据的 Flux,并且允许通过 prefetch 参数控制每个 Publisher 预取的数据量。预取指的是在请求下游订阅之前,提前从上游请求的数据数量,这样可以提高吞吐量,减少上下游交互的延迟。

2. 参数

  • prefetch: 这是一个整数,指定从每个 Publisher 预取的数据量。它定义了从源 Publisher 中提前获取并缓存的数据的数量。在并行合并的情况下,合理设置 prefetch 值有助于提升性能。
  • sources: 这是一个可变参数,表示多个 Publisher(如 Flux 或 Mono)。这些 Publisher 的数据会被并行合并为一个 Flux,并按最早可用的数据发射给下游。

3. 返回值

  • Flux<I>: 返回一个 Flux,它将从所有输入的 Publisher 中并行地获取数据,并将合并后的数据发射给下游的订阅者。

4. @SafeVarargs 注解

@SafeVarargs 注解是为了避免在编译时对使用可变参数泛型时发出的 "堆污染" 警告。由于该方法使用了可变参数的泛型,@SafeVarargs 注解告诉编译器这种使用是安全的,不需要发出警告。

5. 示例代码

假设我们有多个 Mono,并希望使用 merge 方法进行合并,同时通过 prefetch 控制预取的数据量:

java

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxMergeWithPrefetchExample {
    public static void main(String[] args) {
        // 创建多个 Mono
        Mono<String> mono1 = Mono.just("A");
        Mono<String> mono2 = Mono.just("B");
        Mono<String> mono3 = Mono.just("C");

        // 使用 merge 方法合并多个 Mono,指定 prefetch 为 1
        Flux<String> mergedFlux = Flux.merge(1, mono1, mono2, mono3);

        // 订阅并打印合并后的结果
        mergedFlux.subscribe(System.out::println);
    }
}

输出结果:

A
B
C

在这个例子中,我们创建了三个 Mono,通过 Flux.merge(1, mono1, mono2, mono3) 将它们合并。这里的 prefetch 值为 1,表示每个 Publisher 最多一次只预取一个数据项。

6. 使用场景

  • 高效处理多个异步流: 当你需要合并多个异步流,并希望通过调整预取量来优化吞吐量时,可以使用这个方法。例如,当下游消费速度较慢时,可以设置较小的 prefetch 值,以减少内存占用;而当下游消费速度较快时,可以增加 prefetch 值以提高性能。
  • 网络请求: 在处理多个异步网络请求时,合理设置 prefetch 可以优化带宽使用和请求响应时间。

7. 特点

  • 并行处理: 该方法与其他 merge 方法一样,是并行处理的。它会从多个 Publisher 中并行获取数据,并立即发射最早可用的数据。
  • 控制预取: 通过 prefetch 参数,可以控制从每个 Publisher 中提前请求的数据量。合理的 prefetch 设置能够平衡内存使用与处理速度之间的关系。

8. 与其他 merge 方法的区别

  • merge(Publisher<? extends I>... sources): 该方法直接合并多个 Publisher,但没有提供控制预取量的能力。
  • merge(int prefetch, Publisher<? extends I>... sources): 在前者的基础上,增加了对 prefetch 的控制,允许更灵活的资源管理。

9. 性能调优

prefetch 参数是性能调优的关键点之一,特别是在处理大量数据时。

  • 小的 prefetch 值: 适合需要减少内存占用的情况,特别是在每个 Publisher 发射大量数据时,较小的 prefetch 值能控制数据的积累速度,避免过多的数据积压在内存中。
  • 大的 prefetch 值: 适合下游消费速度较快的情况,大的 prefetch 值能减少上游和下游之间的交互次数,进而提高吞吐量。

10. 相关方法

  • Flux.merge(Publisher<? e
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值