CompletableFuture
When obtaining asynchronous execution results using Future, you either call the blocking method get() or poll isDone() until it returns true. Neither approach is ideal, because the main thread is forced to wait.
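For comparison, the two waiting styles with a plain Future might look roughly like the sketch below. The class name FutureWaitDemo and the slowPrice() helper are hypothetical stand-ins for a slow remote call, not part of the tutorial's code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureWaitDemo {
    // Hypothetical stand-in for a slow remote call.
    static Double slowPrice() {
        sleep(100);
        return 12.5;
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Option 1: get() blocks the calling thread until the result is ready.
    static Double blockingGet() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Double> task = FutureWaitDemo::slowPrice;
            Future<Double> future = executor.submit(task);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // Option 2: poll isDone(); the calling thread is still tied up waiting.
    static Double pollUntilDone() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Double> task = FutureWaitDemo::slowPrice;
            Future<Double> future = executor.submit(task);
            while (!future.isDone()) {
                sleep(10);
            }
            return future.get(); // does not block: the task has already finished
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking: " + blockingGet());
        System.out.println("polling: " + pollUntilDone());
    }
}
```

In both variants the caller cannot do anything useful with the result until it has waited for it, which is the limitation that callbacks are meant to remove.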
Starting with Java 8, CompletableFuture was introduced to improve upon Future. It allows you to pass in callback objects that are automatically invoked when the asynchronous task completes or an exception occurs.
Let's use the example of fetching stock prices to see how to use CompletableFuture:
java
// CompletableFuture
import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) throws Exception {
        // Create an asynchronous task:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);
        // If execution is successful:
        cf.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // If an exception occurs:
        cf.exceptionally((e) -> {
            e.printStackTrace();
            return null;
        });
        // Keep the main thread alive for a moment; otherwise the JVM exits and the daemon threads
        // of the default thread pool used by CompletableFuture are terminated before the task completes:
        Thread.sleep(200);
    }

    static Double fetchPrice() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        if (Math.random() < 0.3) {
            throw new RuntimeException("fetch price failed!");
        }
        return 5 + Math.random() * 20;
    }
}
A CompletableFuture is created via CompletableFuture.supplyAsync(), which requires an object that implements the Supplier interface:
java
public interface Supplier<T> {
    T get();
}
Here, we simplify the code by passing the method reference Main::fetchPrice directly, since the signature of the static method Main.fetchPrice() matches the definition of the Supplier interface (except for the method name).
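To make the equivalence concrete, here is a small sketch of three ways to supply the same Supplier. The class name SupplierFormsDemo and the fixed return value of fetchPrice() are assumptions chosen for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class SupplierFormsDemo {
    // Hypothetical stand-in for the tutorial's fetchPrice(); returns a fixed value.
    static Double fetchPrice() {
        return 10.0;
    }

    // 1. Anonymous class that implements Supplier explicitly.
    static Supplier<Double> asAnonymousClass() {
        return new Supplier<Double>() {
            @Override
            public Double get() {
                return fetchPrice();
            }
        };
    }

    // 2. Lambda expression.
    static Supplier<Double> asLambda() {
        return () -> fetchPrice();
    }

    // 3. Method reference: works because fetchPrice() takes no arguments and returns a Double.
    static Supplier<Double> asMethodReference() {
        return SupplierFormsDemo::fetchPrice;
    }

    public static void main(String[] args) {
        // join() waits for each task and returns its result.
        System.out.println(CompletableFuture.supplyAsync(asAnonymousClass()).join());
        System.out.println(CompletableFuture.supplyAsync(asLambda()).join());
        System.out.println(CompletableFuture.supplyAsync(asMethodReference()).join());
    }
}
```

All three forms produce an object whose get() delegates to fetchPrice(); the method reference is simply the shortest to write.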
Next, the CompletableFuture is submitted to the default thread pool for execution. We need to define the instances to be called back when the CompletableFuture completes or when an exception occurs. Upon completion, CompletableFuture will call the Consumer object:
java
public interface Consumer<T> {
    void accept(T t);
}
In the case of an exception, CompletableFuture will call the Function object:
java
public interface Function<T, R> {
    R apply(T t);
}
Here, we simplify the code using lambda syntax for both.
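The sketch below spells out the two callback types as named variables instead of inline lambdas. It assumes a hypothetical fetchPrice(boolean) that fails on demand and a fallback value of -1.0, neither of which comes from the tutorial; the point is only that the value returned by the Function passed to exceptionally() becomes the result when the task fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

class CallbackDemo {
    // Hypothetical task that fails on demand, so both paths can be exercised.
    static Double fetchPrice(boolean fail) {
        if (fail) {
            throw new RuntimeException("fetch price failed!");
        }
        return 10.0;
    }

    static Double priceOrFallback(boolean fail) {
        // Consumer<Double>: receives the result when the task succeeds.
        Consumer<Double> onSuccess = price -> System.out.println("price: " + price);
        // Function<Throwable, Double>: receives the exception and returns a replacement result.
        Function<Throwable, Double> onError = e -> {
            System.out.println("failed: " + e);
            return -1.0; // hypothetical fallback value
        };

        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> fetchPrice(fail));
        cf.thenAccept(onSuccess); // invoked only if the task succeeds
        // join() is used here only so that the demo can return the final value.
        return cf.exceptionally(onError).join();
    }

    public static void main(String[] args) {
        System.out.println("result: " + priceOrFallback(false));
        System.out.println("result: " + priceOrFallback(true));
    }
}
```

Returning null from the Function, as the example above does, is fine when nothing consumes the recovered value; a fallback value is useful when later stages depend on it.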
The advantages of CompletableFuture are evident:
- When an asynchronous task ends, it automatically calls a method of a callback object.
- When an asynchronous task encounters an error, it automatically calls a method of a callback object.
- After setting up the callbacks, the main thread no longer needs to manage the execution of the asynchronous task.
The asynchronous callback mechanism alone does not yet show the full advantage of CompletableFuture over Future. Its more powerful feature is that multiple CompletableFuture instances can be executed in sequence. For example, you can define two CompletableFuture instances, where the first queries a stock code based on the stock name and the second fetches the stock price based on that stock code, and execute them sequentially as follows:
java
// CompletableFuture
import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) throws Exception {
        // First task:
        CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
            return queryCode("PetroChina");
        });
        // After cfQuery succeeds, continue with the next task:
        CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice(code);
        });
        // After cfFetch succeeds, print the result:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // Keep the main thread alive for a moment; otherwise the JVM exits and the daemon threads
        // of the default thread pool used by CompletableFuture are terminated before the tasks complete:
        Thread.sleep(2000);
    }

    static String queryCode(String name) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        return "601857";
    }

    static Double fetchPrice(String code) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        return 5 + Math.random() * 20;
    }
}
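The Thread.sleep() at the end of the example above only keeps the JVM alive long enough for the callbacks to run. One alternative, sketched here under the assumption that blocking at the very end of the program is acceptable, is to chain the stages fluently and call join() on the last one. The class name ChainJoinDemo and the fixed return values are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class ChainJoinDemo {
    // Hypothetical fixed-value stand-ins for the tutorial's queryCode() and fetchPrice().
    static String queryCode(String name) {
        return "601857";
    }

    static Double fetchPrice(String code) {
        return 10.0;
    }

    static Double queryThenFetch(String name) {
        return CompletableFuture
                .supplyAsync(() -> queryCode(name))        // stage 1: name -> code
                .thenApplyAsync(ChainJoinDemo::fetchPrice) // stage 2: code -> price
                .join();                                   // wait for the final stage
    }

    public static void main(String[] args) {
        System.out.println("price: " + queryThenFetch("PetroChina"));
    }
}
```

Unlike a fixed sleep, join() returns as soon as the last stage finishes, and it rethrows any failure from the chain as an unchecked CompletionException.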
In addition to sequential execution, multiple CompletableFuture instances can also be executed in parallel. For example, consider the following scenario:
Query the stock code from both Sina and NetEase simultaneously. As long as either one returns a result, proceed to the next step of querying the price. The price query is also done simultaneously from Sina and NetEase, and as long as either one returns a result, the operation is completed:
java
// CompletableFuture
import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) throws Exception {
        // Two CompletableFutures execute asynchronous queries:
        CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
            return queryCode("PetroChina", "https://finance.sina.com.cn/code/");
        });
        CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
            return queryCode("PetroChina", "https://money.163.com/code/");
        });
        // Combine them into a new CompletableFuture using anyOf:
        CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
        // Two CompletableFutures execute asynchronous price queries:
        CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
        });
        // thenApplyAsync is an instance method, so it must be called on cfQuery:
        CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://money.163.com/price/");
        });
        // Combine them into a new CompletableFuture using anyOf:
        CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
        // Final result:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // Keep the main thread alive for a moment; otherwise the JVM exits and the daemon threads
        // of the default thread pool used by CompletableFuture are terminated before the tasks complete:
        Thread.sleep(200);
    }

    static String queryCode(String name, String url) {
        System.out.println("query code from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
        }
        return "601857";
    }

    static Double fetchPrice(String code, String url) {
        System.out.println("query price from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
        }
        return 5 + Math.random() * 20;
    }
}
The asynchronous query logic implemented above follows these rules:
┌─────────────┐ ┌─────────────┐
│ Query Code  │ │ Query Code  │
│  from sina  │ │  from 163   │
└─────────────┘ └─────────────┘
       │               │
       └───────┬───────┘
               ▼
        ┌─────────────┐
        │    anyOf    │
        └─────────────┘
               │
       ┌───────┴────────┐
       ▼                ▼
┌─────────────┐  ┌─────────────┐
│ Query Price │  │ Query Price │
│  from sina  │  │  from 163   │
└─────────────┘  └─────────────┘
       │                │
       └────────┬───────┘
                ▼
         ┌─────────────┐
         │    anyOf    │
         └─────────────┘
                │
                ▼
         ┌─────────────┐
         │Display Price│
         └─────────────┘
In addition to anyOf(), which completes as soon as any one of the CompletableFuture instances completes, allOf() can be used to wait until all CompletableFuture instances have completed. These combination operations make it possible to implement very complex asynchronous workflows.
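As a rough illustration of allOf(), the sketch below waits for two price queries and averages their results. The class name AllOfDemo, the delays, and the fixed prices are assumptions made for the example. Note that allOf() returns a CompletableFuture<Void>, so the individual results have to be read from the original instances.

```java
import java.util.concurrent.CompletableFuture;

class AllOfDemo {
    // Hypothetical price source: waits for the given delay, then returns a fixed price.
    static Double fetchPrice(long delayMillis, double price) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return price;
    }

    static double averagePrice() {
        CompletableFuture<Double> fromA = CompletableFuture.supplyAsync(() -> fetchPrice(50, 10.0));
        CompletableFuture<Double> fromB = CompletableFuture.supplyAsync(() -> fetchPrice(80, 12.0));
        // allOf() carries no result of its own; it only signals that both tasks are done.
        CompletableFuture<Void> both = CompletableFuture.allOf(fromA, fromB);
        // Inside the callback both futures are complete, so join() returns immediately.
        return both.thenApply(ignored -> (fromA.join() + fromB.join()) / 2).join();
    }

    public static void main(String[] args) {
        System.out.println("average price: " + averagePrice());
    }
}
```

If either task fails, the future returned by allOf() also completes exceptionally, so the averaging callback is skipped.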
Finally, note the naming conventions of CompletableFuture:
- xxx(): Indicates that the method will continue to execute in the existing thread.
- xxxAsync(): Indicates that the method will execute asynchronously in the thread pool.
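The naming convention above can be observed by recording which thread runs a callback. This is a minimal sketch: the class name AsyncNamingDemo and the thread name "demo-pool-thread" are made up, and it uses an already completed future plus an explicit executor (the two-argument overload of thenApplyAsync) so that the outcome does not depend on timing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncNamingDemo {
    // thenApply(): the future is already complete, so the callback runs in the calling thread.
    static String syncCallbackThread() {
        return CompletableFuture.completedFuture("601857")
                .thenApply(code -> Thread.currentThread().getName())
                .join();
    }

    // thenApplyAsync(fn, executor): the callback is handed to the given thread pool.
    static String asyncCallbackThread() {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-pool-thread"));
        try {
            return CompletableFuture.completedFuture("601857")
                    .thenApplyAsync(code -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("thenApply ran in:      " + syncCallbackThread());
        System.out.println("thenApplyAsync ran in: " + asyncCallbackThread());
    }
}
```

When the previous stage is still running, a non-async callback is typically executed by whichever thread completes that stage, so "the existing thread" is not always the caller.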
Exercise
Use CompletableFuture.
Summary
CompletableFuture allows you to define asynchronous processing workflows:
- thenAccept() handles normal results.
- exceptionally() handles exceptional results.
- thenApplyAsync() is used to serialize another CompletableFuture.
- anyOf() and allOf() are used to parallelize multiple CompletableFuture instances.