Reposted

How to manipulate the data inside Mono and Flux?

Reactor is a Java library for creating reactive non-blocking applications on the JVM based on the Reactive Streams Specification.

This article is the second in a series whose goal is to guide you through the process of creating, manipulating and managing the execution of the Reactive Streams that Reactor offers through the Mono and Flux classes.

In this second article, I’ll show you how values in Mono and Flux can be modified and transformed. We’ll do this through examples.

Applying mathematical operations on your Flux or Mono

Let’s say you want to compute the square of every integer value in the interval 1 to 100. To do this with a Flux, you first create a Flux containing all the integer values from 1 to 100 with Flux.range(1, 100). The first argument is the starting value and the second is the number of elements to emit.

Then, how do you compute the square of each value?

Mono and Flux provide a map method whose signature is, in simplified form, Flux<B> map(Function<A, B> mapper). This means that when you pass map a function that transforms an element of type A into an element of type B, the function is applied to each element of your Flux<A>, producing a Flux<B>. Note that B can be the same type as A. Using this method, we can apply the square function to our Flux. Let me demonstrate:

Flux<Integer> squared = Flux.range(1, 100).map(x -> x * x);
squared.subscribe(x -> System.out.print(x + ", "));
// -> will display 1, 4, 9, 16, ..., 9801, 10000,


This works the same with Mono.
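As a minimal sketch of the same transformation on a Mono (the class and method names here are illustrative, not part of the original article):

```java
import reactor.core.publisher.Mono;

public class MonoMapSketch {

    // Wraps a single value in a Mono and squares it with map.
    static Mono<Integer> square(int value) {
        return Mono.just(value).map(x -> x * x);
    }

    public static void main(String[] args) {
        square(7).subscribe(x -> System.out.println(x)); // prints 49
    }
}
```

The only difference from the Flux version is that the function is applied to at most one element.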

Applying several transformations on Flux and Mono

Now, let’s take a look at an example that chains multiple map calls.

public Disposable findFirstUsers(int count, Response response) {
    return Flux.range(1, count)
        .map(id -> UserApi.findUser(id))
        .map(user -> UserApi.getUserDetails(user))
        .collectList() // converts the Flux<UserDetails> into a Mono<List<UserDetails>>
        .map(listUserDetails -> JsonWriter.toJson(listUserDetails))
        .subscribe(json -> response.status(HttpStatus.OK).send(json));
}
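The example above depends on UserApi, JsonWriter and Response, which are not shown. A self-contained sketch of the same chaining pattern might look like the following, where findUser and getUserDetails are hypothetical stand-ins that simply build strings. Each map call receives the elements produced by the previous step.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ChainedMapSketch {

    // Hypothetical stand-ins for UserApi.findUser and UserApi.getUserDetails.
    static String findUser(int id) {
        return "user-" + id;
    }

    static String getUserDetails(String user) {
        return user.toUpperCase();
    }

    static Mono<String> firstUsers(int count) {
        return Flux.range(1, count)
                .map(id -> findUser(id))                    // Flux<Integer> -> Flux<String>
                .map(user -> getUserDetails(user))          // Flux<String> -> Flux<String>
                .collectList()                              // Flux<String> -> Mono<List<String>>
                .map(details -> String.join(",", details)); // Mono<List<String>> -> Mono<String>
    }

    public static void main(String[] args) {
        firstUsers(3).subscribe(System.out::println); // prints USER-1,USER-2,USER-3
    }
}
```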


The flatMap method

The flatMap method is similar to the map method, with the key difference that the function you provide to it should return a Mono<T> or Flux<T>. With such a function, map would produce a Mono<Mono<T>>, whereas flatMap produces a Mono<T>.

For example, it is useful when you make a network call to retrieve some data through a Java API that returns a Mono, and then make another network call that needs the result of the first one.

// Signature of the HttpClient.get method
Mono<JsonObject> get(String url);

// The two URLs to call
String firstUserUrl = "my-api/first-user";
String userDetailsUrl = "my-api/users/details/"; // needs the id at the end

// Example with map
Mono<Mono<JsonObject>> result = HttpClient.get(firstUserUrl)
    .map(user -> HttpClient.get(userDetailsUrl + user.getId()));
// This results in a Mono<Mono<...>> because HttpClient.get(...)
// returns a Mono

// Same example with flatMap
Mono<JsonObject> bestResult = HttpClient.get(firstUserUrl)
    .flatMap(user -> HttpClient.get(userDetailsUrl + user.getId()));
// Now the result has the type we expected
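The HttpClient above is not a real, runnable API, so here is a small self-contained sketch of the same difference. The two lookup methods are hypothetical and return ready-made values instead of performing network calls.

```java
import reactor.core.publisher.Mono;

public class FlatMapSketch {

    // Hypothetical asynchronous lookups; real ones would call a remote service.
    static Mono<String> findFirstUserId() {
        return Mono.just("42");
    }

    static Mono<String> findUserDetails(String id) {
        return Mono.just("details of user " + id);
    }

    // map wraps the inner Mono instead of unwrapping it.
    static Mono<Mono<String>> withMap() {
        return findFirstUserId().map(id -> findUserDetails(id));
    }

    // flatMap subscribes to the inner Mono and emits its value.
    static Mono<String> withFlatMap() {
        return findFirstUserId().flatMap(id -> findUserDetails(id));
    }

    public static void main(String[] args) {
        withFlatMap().subscribe(System.out::println); // prints details of user 42
    }
}
```

With withMap(), a caller would have to subscribe twice to reach the value, which is rarely what you want.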


flatMap also lets you handle errors precisely, because the function can return Mono.error(...) for the cases you want to reject. Let me demonstrate with another example below.

public class UserApi {

    // Assumed here to be a blocking client whose get(...) returns the response directly
    private HttpClient httpClient;

    Mono<User> findUser(String username) {
        String queryUrl = "http://my-api-address/users/" + username;

        return Mono.fromCallable(() -> httpClient.get(queryUrl))
            .flatMap(response -> {
                if (response.statusCode == 404) return Mono.error(new NotFoundException("User " + username + " not found"));
                else if (response.statusCode == 500) return Mono.error(new InternalServerErrorException());
                else if (response.statusCode != 200) return Mono.error(new Exception("Unknown error calling my-api"));
                return Mono.just(response.data);
            });
    }

}
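To try this pattern without an HTTP client, the sketch below uses a hypothetical Response class and unchecked exceptions. It shows how a subscriber receives either the value or the error signalled from inside flatMap.

```java
import reactor.core.publisher.Mono;

public class FlatMapErrorSketch {

    // Hypothetical response type standing in for an HTTP client's result.
    static final class Response {
        final int statusCode;
        final String data;

        Response(int statusCode, String data) {
            this.statusCode = statusCode;
            this.data = data;
        }
    }

    // Turns a response into its body, or into an error signal for non-200 statuses.
    static Mono<String> toBody(Response response) {
        return Mono.just(response).flatMap(r -> {
            if (r.statusCode == 404) {
                return Mono.error(new IllegalStateException("not found"));
            }
            if (r.statusCode != 200) {
                return Mono.error(new IllegalStateException("unexpected status " + r.statusCode));
            }
            return Mono.just(r.data);
        });
    }

    public static void main(String[] args) {
        // The second argument of subscribe is called when the Mono ends with an error.
        toBody(new Response(200, "alice"))
                .subscribe(System.out::println, e -> System.out.println("error: " + e.getMessage()));
        toBody(new Response(404, ""))
                .subscribe(System.out::println, e -> System.out.println("error: " + e.getMessage()));
    }
}
```

Running main prints alice for the first call and error: not found for the second.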


The zip method

Let’s understand this method with an example. Imagine you want to write a method that retrieves the information of a user plus all the comments she wrote on your website.

Using a flatMap, you would write:

public Mono<UserWithComments> getUserWithComments(String userId) {
    Mono<UserInfo> userInfo = Mono.fromCallable(() -> BlockingUserApi.getUserInfo(userId));
    Mono<Comments> userComments = Mono.fromCallable(() -> BlockingCommentsApi.getCommentsForUser(userId));

    Mono<UserWithComments> userWithComments = userInfo
        .flatMap(info -> userComments
            .map(comments -> new UserWithComments(info, comments))
        );

    return userWithComments;
}


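The problem with the above code is that the call to get the user comments only starts after your program has received the userInfo. The two calls run one after the other, so the total latency is the sum of both, which hurts your users’ experience.

The zip method combines the results of several Mono instances. Because zip subscribes to all of its sources rather than waiting for one to finish before starting the next, the combined Mono can complete in roughly the time of the slowest source instead of the sum of all of them. Note that this only holds when the sources really run concurrently: a blocking call wrapped in Mono.fromCallable runs on the subscribing thread unless you move it to another one, for example with subscribeOn.

The same result as above can now be obtained with the code below: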

public Mono<UserWithComments> getUserWithComments(String userId) {
    Mono<UserInfo> userInfo = Mono.fromCallable(() -> BlockingUserApi.getUserInfo(userId));
    Mono<Comments> userComments = Mono.fromCallable(() -> BlockingCommentsApi.getCommentsForUser(userId));

    Mono<UserWithComments> userWithComments = userInfo.zipWith(userComments)
        .map(tuple -> {
            UserInfo info = tuple.getT1();
            Comments comments = tuple.getT2();
            return new UserWithComments(info, comments);
        });

    return userWithComments;
}
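To actually get the timing benefit with blocking calls, each source can be moved to its own thread. The sketch below is one way to do that, under a few assumptions: slowCall is a hypothetical helper that simulates a blocking API taking about 200 ms, and Schedulers.boundedElastic() requires Reactor 3.3 or later. It uses the static Mono.zip variant that takes a combinator function instead of producing a tuple.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ZipSketch {

    // Hypothetical stand-in for a blocking API call that takes about 200 ms.
    static String slowCall(String value) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    static Mono<String> userWithComments() {
        // subscribeOn moves each blocking call to a worker thread,
        // so the two calls can overlap instead of running back to back.
        Mono<String> info = Mono.fromCallable(() -> slowCall("info"))
                .subscribeOn(Schedulers.boundedElastic());
        Mono<String> comments = Mono.fromCallable(() -> slowCall("comments"))
                .subscribeOn(Schedulers.boundedElastic());

        return Mono.zip(info, comments, (i, c) -> i + "+" + c);
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        String result = userWithComments().block();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(result + " after about " + elapsedMs + " ms");
    }
}
```

On an idle machine the elapsed time should be closer to 200 ms than to 400 ms. Without the two subscribeOn calls, both callables would run one after the other on the calling thread.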


Filtering the elements of a Flux or Mono

You can also filter the elements that a Flux or a Mono holds with the filter method. Only the elements for which the predicate returns true are kept.

Flux<User> getAllAdmins() {
    List<User> allUsers = Users.getAllUsers();
    return Flux.fromIterable(allUsers)
        // assuming getRoles() returns a collection of role names
        .filter(user -> user.getRoles().contains("ADMIN"));
}
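Since the User and Users classes are not shown, here is a self-contained sketch of filter on plain integers (the class and method names are illustrative). It also shows that filtering a Mono whose value does not match leaves an empty Mono.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FilterSketch {

    // Keeps only the even values among 1..count.
    static Flux<Integer> evens(int count) {
        return Flux.range(1, count).filter(x -> x % 2 == 0);
    }

    // Emits the value if it is even, otherwise completes empty.
    static Mono<Integer> onlyIfEven(int value) {
        return Mono.just(value).filter(x -> x % 2 == 0);
    }

    public static void main(String[] args) {
        List<Integer> kept = evens(10).collectList().block();
        System.out.println(kept);                               // prints [2, 4, 6, 8, 10]
        System.out.println(onlyIfEven(3).hasElement().block()); // prints false
    }
}
```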


Selecting a subset of a Flux

While working with Flux, you can select a subset of its elements. The class provides several take methods to select the subset that best fits your needs, including:

public Flux<T> take(long n)
public Flux<T> take(Duration timespan)
public Flux<T> take(Duration timespan, Scheduler timer)
public Flux<T> takeLast(int n)
public Flux<T> takeUntil(Predicate<? super T> predicate)
public Flux<T> takeUntilOther(Publisher<?> other)
public Flux<T> takeWhile(Predicate<? super T> continuePredicate)

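The take, takeWhile and takeUntil methods keep elements from the start of the Flux and stop once their limit or condition is reached. takeLast, in contrast, keeps the last n elements, which it can only emit once the source has completed.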
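A short sketch comparing four of these methods on the same source (the collect helper is only there to make the results easy to print):

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class TakeSketch {

    // Gathers all elements of a Flux into a list, blocking until it completes.
    static List<Integer> collect(Flux<Integer> flux) {
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(collect(Flux.range(1, 10).take(3)));                // [1, 2, 3]
        System.out.println(collect(Flux.range(1, 10).takeLast(3)));            // [8, 9, 10]
        System.out.println(collect(Flux.range(1, 10).takeWhile(x -> x < 4)));  // [1, 2, 3]
        System.out.println(collect(Flux.range(1, 10).takeUntil(x -> x == 4))); // [1, 2, 3, 4]
    }
}
```

Note the difference between the last two: takeWhile stops before the first element that fails its predicate, while takeUntil includes the element that matches its predicate.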

Conclusion

This second article guided you through applying transformations on the elements of a Flux or a Mono.

In the next article, I’ll show you how Mono and Flux behave.

Article #3: https://www.v8en.com/article/226
