Introduce FutureUtils to reduce duplicated code around CompletableFuture (#868)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
This commit is contained in:
Adrian Sutton
2019-02-16 06:42:54 +10:00
committed by GitHub
parent 48986b70c2
commit 463fc3994e
8 changed files with 291 additions and 88 deletions

View File

@@ -0,0 +1,87 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.util;
import static java.util.concurrent.CompletableFuture.completedFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
public class FutureUtils {
/**
* Creates a {@link CompletableFuture} that is exceptionally completed by <code>error</code>.
*
* @param error the error to exceptionally complete the future with
* @param <T> the type of CompletableFuture
* @return a CompletableFuture exceptionally completed by <code>error</code>.
*/
public static <T> CompletableFuture<T> completedExceptionally(final Throwable error) {
final CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(error);
return future;
}
/**
* Returns a new CompletionStage that, when the provided stage completes exceptionally, is
* executed with the provided stage's exception as the argument to the supplied function.
* Otherwise the returned stage completes successfully with the same value as the provided stage.
*
* <p>This is the exceptional equivalent to {@link CompletionStage#thenCompose(Function)}
*
* @param future the future to handle results or exceptions from
* @param errorHandler the function returning a new CompletionStage
* @param <T> the type of the CompletionStage's result
* @return the CompletionStage
*/
public static <T> CompletableFuture<T> exceptionallyCompose(
final CompletableFuture<T> future,
final Function<Throwable, CompletionStage<T>> errorHandler) {
final CompletableFuture<T> result = new CompletableFuture<>();
future.whenComplete(
(value, error) -> {
try {
final CompletionStage<T> nextStep =
error != null ? errorHandler.apply(error) : completedFuture(value);
propagateResult(nextStep, result);
} catch (final Throwable t) {
result.completeExceptionally(t);
}
});
return result;
}
/**
* Propagates the result of one {@link CompletionStage} to a different {@link CompletableFuture}.
*
* <p>When <code>from</code> completes successfully, <code>to</code> will be completed
* successfully with the same value. When <code>from</code> completes exceptionally, <code>to
* </code> will be completed exceptionally with the same exception.
*
* @param from the CompletionStage to take results and exceptions from
* @param to the CompletableFuture to propagate results and exceptions to
* @param <T> the type of the success value
*/
public static <T> void propagateResult(
final CompletionStage<T> from, final CompletableFuture<T> to) {
from.whenComplete(
(value, error) -> {
if (error != null) {
to.completeExceptionally(error);
} else {
to.complete(value);
}
});
}
}

View File

@@ -0,0 +1,169 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.util;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.pantheon.util.FutureUtils.exceptionallyCompose;
import static tech.pegasys.pantheon.util.FutureUtils.propagateResult;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.junit.Test;
public class FutureUtilsTest {
private static final RuntimeException ERROR = new RuntimeException("Oh no!");
@Test
public void shouldCreateExceptionallyCompletedFuture() {
final CompletableFuture<Void> future = FutureUtils.completedExceptionally(ERROR);
assertCompletedExceptionally(future, ERROR);
}
@Test
public void shouldPropagateSuccessfulResult() {
final CompletableFuture<String> input = new CompletableFuture<>();
final CompletableFuture<String> output = new CompletableFuture<>();
propagateResult(input, output);
assertThat(output).isNotDone();
input.complete("Yay");
assertThat(output).isCompletedWithValue("Yay");
}
@Test
public void shouldPropagateSuccessfulNullResult() {
final CompletableFuture<String> input = new CompletableFuture<>();
final CompletableFuture<String> output = new CompletableFuture<>();
propagateResult(input, output);
assertThat(output).isNotDone();
input.complete(null);
assertThat(output).isCompletedWithValue(null);
}
@Test
public void shouldPropagateExceptionalResult() {
final CompletableFuture<String> input = new CompletableFuture<>();
final CompletableFuture<String> output = new CompletableFuture<>();
propagateResult(input, output);
assertThat(output).isNotDone();
input.completeExceptionally(ERROR);
assertCompletedExceptionally(output, ERROR);
}
@Test
public void shouldComposeExceptionallyWhenErrorOccurs() {
final Function<Throwable, CompletionStage<String>> errorHandler = mockFunction();
final CompletableFuture<String> input = new CompletableFuture<>();
final CompletableFuture<String> afterException = new CompletableFuture<>();
when(errorHandler.apply(ERROR)).thenReturn(afterException);
final CompletableFuture<String> result = exceptionallyCompose(input, errorHandler);
verifyZeroInteractions(errorHandler);
assertThat(result).isNotDone();
// Completing input should trigger our error handler but not complete the result yet.
input.completeExceptionally(ERROR);
verify(errorHandler).apply(ERROR);
assertThat(result).isNotDone();
afterException.complete("Done");
assertThat(result).isCompletedWithValue("Done");
}
@Test
public void shouldComposeExceptionallyWhenErrorOccursAndComposedFutureFails() {
final RuntimeException secondError = new RuntimeException("Again?");
final Function<Throwable, CompletionStage<String>> errorHandler = mockFunction();
final CompletableFuture<String> input = new CompletableFuture<>();
final CompletableFuture<String> afterException = new CompletableFuture<>();
when(errorHandler.apply(ERROR)).thenReturn(afterException);
final CompletableFuture<String> result = exceptionallyCompose(input, errorHandler);
verifyZeroInteractions(errorHandler);
assertThat(result).isNotDone();
// Completing input should trigger our error handler but not complete the result yet.
input.completeExceptionally(ERROR);
verify(errorHandler).apply(ERROR);
assertThat(result).isNotDone();
afterException.completeExceptionally(secondError);
assertCompletedExceptionally(result, secondError);
}
@Test
public void shouldComposeExceptionallyWhenErrorOccursAndErrorHandlerThrowsException() {
final Function<Throwable, CompletionStage<String>> errorHandler = mockFunction();
final CompletableFuture<String> input = new CompletableFuture<>();
final IllegalStateException thrownException = new IllegalStateException("Oops");
when(errorHandler.apply(ERROR)).thenThrow(thrownException);
final CompletableFuture<String> result = exceptionallyCompose(input, errorHandler);
verifyZeroInteractions(errorHandler);
assertThat(result).isNotDone();
// Completing input should trigger our error handler but not complete the result yet.
input.completeExceptionally(ERROR);
verify(errorHandler).apply(ERROR);
assertCompletedExceptionally(result, thrownException);
}
@Test
public void shouldNotCallErrorHandlerWhenFutureCompletesSuccessfully() {
final Function<Throwable, CompletionStage<String>> errorHandler = mockFunction();
final CompletableFuture<String> input = new CompletableFuture<>();
final CompletableFuture<String> afterException = new CompletableFuture<>();
when(errorHandler.apply(ERROR)).thenReturn(afterException);
final CompletableFuture<String> result = exceptionallyCompose(input, errorHandler);
verifyZeroInteractions(errorHandler);
assertThat(result).isNotDone();
input.complete("Done");
verifyZeroInteractions(errorHandler);
assertThat(result).isCompletedWithValue("Done");
}
private void assertCompletedExceptionally(
final CompletableFuture<?> future, final RuntimeException expectedError) {
assertThat(future).isCompletedExceptionally();
assertThatThrownBy(future::get)
.isInstanceOf(ExecutionException.class)
.extracting(Throwable::getCause)
.isSameAs(expectedError);
}
@SuppressWarnings("unchecked")
private <I, O> Function<I, O> mockFunction() {
return mock(Function.class);
}
}