
How to create Mono and Flux ?

Reactor is a Java library for creating reactive non-blocking applications on the JVM based on the Reactive Streams Specification.

Working with this library can be difficult at first. This series of articles will guide you through creating, manipulating and managing the execution of the reactive streams that Reactor offers through the Mono and Flux classes.

How a Flux works (credit: https://projectreactor.io/docs/core/release/reference/)

Mono and Flux are both reactive streams. They differ in what they express. A Mono is a stream of 0 to 1 element, whereas a Flux is a stream of 0 to N elements.

This difference in semantics is very useful. For example, a request to an HTTP server expects 0 or 1 response, so using a Flux would be inappropriate in that case. Conversely, computing the result of a mathematical function over an interval yields one result per number in the interval, so a Flux is appropriate there.

Mono and Flux are lazy

Being lazy is one of the properties of a reactive stream. It means that no matter how many function calls you chain on the stream, none of them is executed until you consume it.

For Mono and Flux, the method to use to consume them is .subscribe(…).

Nothing happens until you subscribe to a Flux (or Mono)
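
To make the laziness concrete, here is a minimal sketch, assuming reactor-core is on the classpath. The class name LazyMonoDemo and the counter are illustrative only. It counts how many times a callable runs before and after subscribe() is called.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

final class LazyMonoDemo {

    // Returns the number of calls observed before and after subscribing.
    static int[] countCalls() {
        AtomicInteger calls = new AtomicInteger();

        // Declaring the Mono does not run the callable.
        Mono<Integer> lazy = Mono.fromCallable(calls::incrementAndGet);
        int beforeSubscribe = calls.get();

        // Subscribing triggers the callable (synchronously here, as no scheduler is involved).
        lazy.subscribe();
        int afterSubscribe = calls.get();

        return new int[] {beforeSubscribe, afterSubscribe};
    }

    public static void main(String[] args) {
        int[] counts = countCalls();
        System.out.println("calls before subscribe: " + counts[0]); // 0
        System.out.println("calls after subscribe: " + counts[1]);  // 1
    }
}
```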

Most common ways to create a Flux

There are several ways to create a Flux. The code snippet below presents some of the most common:

// Creates a Flux containing the values 1, 2, 3.
Flux<Integer> integerFlux = Flux.just(1, 2, 3);

// Creates a Flux containing "Hello", "foo" and "bar".
Flux<String> stringFlux = Flux.just("Hello", "foo", "bar");

// Creates a Flux from an already existing Iterable, for example a List.
List<String> stringList = Arrays.asList("Hello", "foo", "bar");
Flux<String> fluxFromList = Flux.fromIterable(stringList);

// It works the same with Java Streams (which are not reactive).
Stream<String> stringStream = stringList.stream();
Flux<String> fluxFromStream = Flux.fromStream(stringStream);

// Creates a flux on a range.
Flux<Integer> rangeFlux = Flux.range(1, 5); // Flux(1, 2, 3, 4, 5)

// Creates a Flux that generates a new value every 100 ms.
// The values are Longs that increment from 0.
Flux<Long> intervalFlux = Flux.interval(Duration.ofMillis(100));

// You can also create a Flux from another one, or from a Mono.
Flux<String> fluxCopy = Flux.from(fluxFromList);
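
Flux.interval is the one factory above that emits on a timer thread rather than on the caller's thread. The sketch below, assuming reactor-core is on the classpath, collects the first three ticks to show their type and starting value. The class name IntervalDemo is illustrative, and block() is used here only so that a small demo can wait for the ticks and return them.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class IntervalDemo {

    // Collects the first three ticks of a 10 ms interval.
    static List<Long> firstTicks() {
        return Flux.interval(Duration.ofMillis(10))
                .take(3)        // interval never completes on its own, so bound it
                .collectList()  // Mono<List<Long>>
                .block();       // demo only: wait for the timer thread to emit
    }

    public static void main(String[] args) {
        System.out.println(firstTicks()); // [0, 1, 2]
    }
}
```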


Most common ways to create a Mono

The creation methods are mostly different for a Mono. Even just(T data) differs slightly: it takes a single value, whereas Flux.just(T... data) accepts several. The code snippet below presents some of the most common:

// Creating a Mono containing "Hello World !".
Mono<String> helloWorld = Mono.just("Hello World !");

// Creating an empty Mono (it completes without emitting any value)
Mono<String> empty = Mono.empty();

// Creating a mono from a Callable
Mono<String> helloWorldCallable = Mono.fromCallable(() -> "Hello World !");
// Same with Java 8 method reference
Mono<User> user = Mono.fromCallable(UserService::fetchAnyUser);

// Creating a mono from a Future
CompletableFuture<String> helloWorldFuture = MyApi.getHelloWorldAsync();
Mono<String> monoFromFuture = Mono.fromFuture(helloWorldFuture);

// Creating a mono from a supplier
Random rnd = new Random();
Mono<Double> monoFromSupplier = Mono.fromSupplier(rnd::nextDouble);

// You can also create a mono from another one.
Mono<Double> monoCopy = Mono.from(monoFromSupplier);
// Or from a Flux.
Mono<Integer> monoFromFlux = Mono.from(Flux.range(1, 10));
// The above Mono contains the first value of the Flux.


As you may have noticed, being a 0–1 element stream, Mono is a perfect fit for creating reactive streams from Futures, Suppliers or even Runnables, since a Java method returns at most one value.
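
The 0 to 1 cardinality can be observed by recording the signals a Mono sends to its subscriber. This is a minimal sketch, assuming reactor-core is on the classpath. The class MonoCardinalityDemo and its signals helper are illustrative names, not part of Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class MonoCardinalityDemo {

    // Subscribes to the given Mono and records every signal it receives.
    // The sources used below emit synchronously, so the log is complete on return.
    static <T> List<String> signals(Mono<T> mono) {
        List<String> log = new ArrayList<>();
        mono.subscribe(
                value -> log.add("value " + value),
                error -> log.add("error " + error.getMessage()),
                () -> log.add("complete"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(signals(Mono.just("Hello")));           // [value Hello, complete]
        System.out.println(signals(Mono.empty()));                 // [complete]
        System.out.println(signals(Mono.from(Flux.range(1, 10)))); // [value 1, complete]
    }
}
```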

Common methods to create Flux or Mono

Mono and Flux share 3 useful methods to create them: create, defer and error.

Error

Mono.error(Throwable t) and Flux.error(Throwable t) create a stream that terminates immediately with the given error. They are very useful for handling errors while working with these reactive streams. We will discuss this subject in the second article.
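
As a quick illustration ahead of that article, the sketch below (assuming reactor-core is on the classpath; ErrorSignalDemo is an illustrative name) subscribes to an error Mono and an error Flux. In both cases only the error callback is invoked.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ErrorSignalDemo {

    // Subscribes to an error Mono and an error Flux and records what each one signals.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        Mono.<String>error(new IllegalStateException("mono failed"))
                .subscribe(
                        value -> log.add("value " + value),
                        error -> log.add("error " + error.getMessage()));

        Flux.<Integer>error(new IllegalArgumentException("flux failed"))
                .subscribe(
                        value -> log.add("value " + value),
                        error -> log.add("error " + error.getMessage()));

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [error mono failed, error flux failed]
    }
}
```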

Defer

Mono.defer(Supplier<? extends Mono<? extends T>>) is similar to Mono.fromCallable(...), but its supplier must return a Mono<T>, whereas fromCallable expects a Callable returning a value of type T. Also, Supplier.get() cannot throw checked exceptions, so if you call a method that declares one, with defer you have to catch it yourself. With fromCallable, Callable.call() may throw, and the Exception is automatically wrapped into a Mono.error(...). Let me show you with a little piece of code:

// A method that always fails. Both examples below give the same result.
Integer getAnyInteger() throws Exception {
    throw new RuntimeException("An error has occurred for no reason.");
}

// Now, comparison between the two methods
void compareMonoCreationMethods() {
    Mono<Integer> fromCallable = Mono.fromCallable(this::getAnyInteger);
    // result -> Mono.error(RuntimeException("An error has occurred for no reason."))

    Mono<Integer> defer = Mono.defer(() -> {
        try {
            Integer res = this.getAnyInteger();
            return Mono.just(res);
        } catch (Exception e) {
            return Mono.error(e);
        }
    });
    // result -> Mono.error(RuntimeException("An error has occurred for no reason."))
}


As you have seen, both ways of creating the Mono give the same result. Even though the defer method is more verbose, it makes the error handling explicit and allows for more flexibility, as you can remap the Exception into another kind of Mono or create a new Exception from it.
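
Here is a minimal sketch of that flexibility, assuming reactor-core is on the classpath. Instead of propagating the failure, the supplier passed to defer remaps it into a fallback value. The names DeferFallbackDemo and integerOrDefault are hypothetical, and getAnyInteger stands in for any method that throws a checked exception.

```java
import reactor.core.publisher.Mono;

final class DeferFallbackDemo {

    // Hypothetical method that always fails with a checked exception.
    static Integer getAnyInteger() throws Exception {
        throw new Exception("no integer available");
    }

    // Remaps the exception into a fallback value instead of an error signal.
    static Mono<Integer> integerOrDefault(int fallback) {
        return Mono.defer(() -> {
            try {
                return Mono.just(getAnyInteger());
            } catch (Exception e) {
                return Mono.just(fallback);
            }
        });
    }

    public static void main(String[] args) {
        integerOrDefault(-1).subscribe(value -> System.out.println("Got " + value)); // Got -1
    }
}
```

Returning Mono.error(new IllegalStateException(...)) from the catch block would follow the same pattern, remapping the exception to another type rather than to a value.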

Create

The create(Consumer<MonoSink<T>> callback) method is lower level than the other methods we saw, because it deals directly with the signals inside the Mono and Flux (Flux.create takes a Consumer<FluxSink<T>> instead). Let’s look at an example for both Mono and Flux.

For the Mono, let’s do the same as above:

Integer getAnyInteger() throws Exception {
    throw new RuntimeException("An error has occurred for no reason.");
}

void monoCreateExample() {
    Mono<Integer> mono = Mono.create(callback -> {
        try { callback.success(this.getAnyInteger()); }
        catch (Exception e) { callback.error(e); }
    });
}
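
Mono.create is also a natural fit for adapting callback-style APIs. The sketch below assumes reactor-core is on the classpath. The fetchGreeting method is a hypothetical callback API invented for this illustration. Each callback is forwarded to the matching signal on the MonoSink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import reactor.core.publisher.Mono;

final class MonoCreateBridgeDemo {

    // Hypothetical callback-style API, standing in for a legacy client.
    static void fetchGreeting(String name, Consumer<String> onSuccess, Consumer<Throwable> onFailure) {
        if (name == null || name.isEmpty()) {
            onFailure.accept(new IllegalArgumentException("name must not be empty"));
        } else {
            onSuccess.accept("Hello " + name);
        }
    }

    // Adapts the callback API to a Mono by forwarding each callback to the sink.
    static Mono<String> greeting(String name) {
        return Mono.create(sink -> fetchGreeting(name, sink::success, sink::error));
    }

    // Subscribes and records the single outcome of the Mono.
    static List<String> outcome(Mono<String> mono) {
        List<String> log = new ArrayList<>();
        mono.subscribe(
                value -> log.add("value: " + value),
                error -> log.add("error: " + error.getMessage()));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(outcome(greeting("Ada"))); // [value: Hello Ada]
        System.out.println(outcome(greeting("")));    // [error: name must not be empty]
    }
}
```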


Here is an example for the Flux.create(…) method.

Flux<Double> flux = Flux.create(emitter -> {
    Random rnd = new Random();
    for (int i = 0; i <= 10; i++) emitter.next(rnd.nextDouble());
    int random = rnd.nextInt(2);
    if (random < 1) emitter.complete();
    else emitter.error(new RuntimeException(
        "Bad luck, you had one chance out of 2 to complete the Flux"
    ));
});
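
A deterministic variant makes the signal sequence easier to follow. This is a sketch, assuming reactor-core is on the classpath. FluxCreateDemo, upperCased and signals are illustrative names. The sink emits one next signal per word, then either complete or error as the single terminal signal.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class FluxCreateDemo {

    // Emits each word in upper case; an empty word ends the stream with an error.
    static Flux<String> upperCased(String... words) {
        return Flux.create(sink -> {
            for (String word : words) {
                if (word.isEmpty()) {
                    sink.error(new IllegalArgumentException("empty word"));
                    return; // stop emitting once a terminal signal has been sent
                }
                sink.next(word.toUpperCase());
            }
            sink.complete();
        });
    }

    // Subscribes and records every signal received, in order.
    static List<String> signals(Flux<String> flux) {
        List<String> log = new ArrayList<>();
        flux.subscribe(
                log::add,
                error -> log.add("error: " + error.getMessage()),
                () -> log.add("complete"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(signals(upperCased("foo", "bar")));     // [FOO, BAR, complete]
        System.out.println(signals(upperCased("foo", "", "bar"))); // [FOO, error: empty word]
    }
}
```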


How to consume a Flux or a Mono ?

Now that we’ve seen how to create a Flux or a Mono, let’s take a look at how to use the values they hold. This is the operation of consuming the stream.

For this example, I think that a piece of code will be sufficient :).

Mono.just("Hello World !").subscribe(
    successValue -> System.out.println(successValue),
    error -> System.err.println(error.getMessage()),
    () -> System.out.println("Mono consumed.")
);
// This will display in the console:
// Hello World !
// Mono consumed.

// In case of error, it would have displayed only:
// **the error message**
// The completion callback does not run when the Mono terminates with an error.

Flux.range(1, 5).subscribe(
    successValue -> System.out.println(successValue),
    error -> System.err.println(error.getMessage()),
    () -> System.out.println("Flux consumed.")
);
// This will display in the console :
// 1
// 2
// 3
// 4
// 5
// Flux consumed.

// Now imagine that when manipulating the values in the Flux, an exception
// is thrown for the value 4.
// The result in the console would be:
// 1
// 2
// 3
// An error has occurred
//
// As you can notice, "Flux consumed." doesn't display because the Flux
// hasn't been fully consumed: the stream stops handling further values
// once an error occurs. The values emitted before the error are still
// delivered first. (Because the error goes to System.err, some consoles
// may show it out of order relative to System.out.)
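
The scenario described in the comments above can be reproduced with a small sketch, assuming reactor-core is on the classpath. The class name FluxErrorOrderDemo is illustrative, and the map operator is used here only to throw for the value 4. Signals are recorded into a single list, so their order does not depend on how the console interleaves output streams.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class FluxErrorOrderDemo {

    // Records, in order, every signal received when the value 4 triggers an exception.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flux.range(1, 5)
                .map(i -> {
                    if (i == 4) {
                        throw new IllegalStateException("An error has occurred");
                    }
                    return i;
                })
                .subscribe(
                        value -> log.add(String.valueOf(value)),
                        error -> log.add("error: " + error.getMessage()),
                        () -> log.add("Flux consumed."));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, error: An error has occurred]
    }
}
```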


In a Flux, data and the operations on them are processed sequentially. For a Flux.range(1, 5), the value 3 will never be displayed before 2, because the processing of 2 is completed before that of 3. We’ll discuss that in detail in the third article.

Bad practices

One word about the methods you should not call on Flux and Mono.

Mono has a block() method that returns the value it holds. Calling it blocks the current Thread, and at that point your program is no longer reactive. You should always look for a way to avoid it, even if that sometimes means refactoring some of your methods.

// This example makes your program synchronous
String helloWorld = Mono.just("Hello World!").block();
System.out.println(helloWorld);
// Even though this case seems silly, it can be tempting
// to do this when making network calls.

// Now let's take a look at how we can do the same thing in
// a non-blocking way.
Mono.just("Hello World!").subscribe(System.out::println);
// That's it.
// In every case, there is a way to use subscribe to pass
// the resulting value of a Mono (or Flux) to the desired
// method, instead of blocking the current Thread.


Flux also has methods that start with block: blockFirst(), blockFirst(Duration timeout), blockLast() and blockLast(Duration timeout). The rationale is the same.

All the Flux methods that return non-asynchronous Java structures cause the same problem. These methods are toStream(...) and toIterable(...).
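
One possible alternative, when you need all the values at once, is collectList(), which returns a Mono<List<T>> and therefore keeps the pipeline reactive. The sketch below assumes reactor-core is on the classpath, and CollectListDemo is an illustrative name. The list is handed to a consumer instead of being pulled out of the Flux.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

final class CollectListDemo {

    // Gathers all values through a Mono<List<Integer>> rather than toIterable().
    static List<Integer> collected() {
        List<Integer> result = new ArrayList<>();
        Flux.range(1, 5)
                .collectList()              // Mono<List<Integer>>, still lazy
                .subscribe(result::addAll); // the consumer receives the complete list
        return result; // already filled, because range emits synchronously
    }

    public static void main(String[] args) {
        System.out.println(collected()); // [1, 2, 3, 4, 5]
    }
}
```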

Conclusion

In this first article, you have learned how to create a Flux or a Mono, and how to consume it in its simplest form.

In the next article, I’ll show you how to manipulate the data they hold. For example, how to add one to every value in a Flux, or how to replace a value with the result of a function call.

Article #2: How to manipulate the data inside Mono and Flux ?
