Skip to content

Futures and Asynchronous Programming

Asynchronous programming allows operations to execute without blocking the main thread, improving responsiveness and throughput. Java provides several mechanisms for asynchronous programming, with CompletableFuture (introduced in Java 8) being the most powerful.

Asynchronous programming allows operations to run independently of the main application flow, enabling:

  • Non-blocking operations: The calling thread can continue execution without waiting
  • Improved responsiveness: UI threads remain responsive during long-running operations
  • Better resource utilization: Threads aren’t blocked waiting for I/O or other operations
  • Scalability: More concurrent operations can be performed with fewer threads

Similar to Runnable, but can return a result and throw checked exceptions.

Callable<String> task = () -> {
Thread.sleep(1000);
return "Result";
};

Represents the result of an asynchronous computation.

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(task);
// Check if completed
boolean isDone = future.isDone();
// Cancel task
boolean canceled = future.cancel(true);
// Get result (blocks until available)
try {
String result = future.get();
// or with timeout
String resultWithTimeout = future.get(2, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
// Handle exceptions
}
executor.shutdown();

Limitations of Future:

  • Cannot be manually completed
  • Cannot be chained or composed
  • No built-in exception handling
  • Blocking get() method

Introduced in Java 8, CompletableFuture implements both Future and CompletionStage, providing a rich set of methods for asynchronous programming.

// Empty future
CompletableFuture<String> future1 = new CompletableFuture<>();
future1.complete("Result"); // Manual completion
// Completed future
CompletableFuture<String> future2 = CompletableFuture.completedFuture("Result");
// Run async (no result)
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
// Some operation
});
// Supply async (with result)
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {
return "Result";
});
// With custom executor
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> {
return "Result";
}, executor);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
// Transform result
CompletableFuture<String> transformed1 = future.thenApply(s -> s + " World");
// Transform to different type
CompletableFuture<Integer> transformed2 = future.thenApply(String::length);
// Transform with async execution
CompletableFuture<Integer> transformed3 = future.thenApplyAsync(String::length);
// Transform without result
CompletableFuture<Void> transformed4 = future.thenAccept(System.out::println);
// Just run after completion
CompletableFuture<Void> transformed5 = future.thenRun(() -> System.out.println("Done"));
// Combine two futures (both must complete)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined1 = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
// Chain futures (sequential)
CompletableFuture<String> future3 = future1.thenCompose(s ->
CompletableFuture.supplyAsync(() -> s + " World")
);
// First to complete
CompletableFuture<Object> future4 = CompletableFuture.anyOf(future1, future2);
// All complete
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);
// Get all results after allOf
CompletableFuture<List<String>> allResults = allFuture.thenApply(v -> {
return Stream.of(future1, future2)
.map(CompletableFuture::join)
.collect(Collectors.toList());
});
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Error");
}
return "Success";
});
// Handle exceptions
CompletableFuture<String> handled1 = future.exceptionally(ex -> {
System.err.println("Error: " + ex.getMessage());
return "Default Value";
});
// Handle both success and failure
CompletableFuture<String> handled2 = future.handle((result, ex) -> {
if (ex != null) {
return "Error occurred: " + ex.getMessage();
} else {
return "Success: " + result;
}
});
// When complete (callback)
future.whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("Error: " + ex.getMessage());
} else {
System.out.println("Result: " + result);
}
});

The CompletionStage interface (implemented by CompletableFuture) provides a framework for multi-stage asynchronous computation.

CompletableFuture.supplyAsync(() -> "Step 1")
.thenApply(s -> s + " -> Step 2")
.thenApply(s -> s + " -> Step 3")
.thenAccept(System.out::println);

Java 9 introduced the Flow API for reactive programming, providing interfaces for Reactive Streams.

// Publisher (produces items)
Flow.Publisher<String> publisher = subscriber -> {
subscriber.onSubscribe(new Flow.Subscription() {
// Implementation
});
};
// Subscriber (consumes items)
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(String item) {
System.out.println(item);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Complete");
}
};
// Connect them
publisher.subscribe(subscriber);

Note: For serious reactive programming, consider using libraries like RxJava, Project Reactor, or Akka.


// "Async" method
CompletableFuture<String> fetchDataAsync() {
return CompletableFuture.supplyAsync(() -> {
// Simulate network call
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Data";
});
}
// "Await" usage
void processData() {
fetchDataAsync()
.thenAccept(data -> System.out.println("Processed: " + data));
// Or if you need to wait
String result = fetchDataAsync().join(); // Similar to await
}
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
return "Result";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
});
// Add timeout
CompletableFuture<String> futureWithTimeout = future.completeOnTimeout("Timeout", 1, TimeUnit.SECONDS);
// Or handle timeout separately
future.orTimeout(1, TimeUnit.SECONDS)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
return "Timeout occurred";
}
return "Other error: " + ex.getMessage();
});
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// Long-running task
return "Result";
});
// Cancel the future
boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();

List<String> userIds = Arrays.asList("user1", "user2", "user3");
// Sequential
List<UserData> users = new ArrayList<>();
for (String userId : userIds) {
users.add(fetchUserData(userId)); // Blocking call
}
// Parallel with CompletableFuture
List<CompletableFuture<UserData>> futures = userIds.stream()
.map(id -> CompletableFuture.supplyAsync(() -> fetchUserData(id)))
.collect(Collectors.toList());
// Wait for all to complete
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// Get results
CompletableFuture<List<UserData>> allUsersFuture = allFutures.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
List<UserData> allUsers = allUsersFuture.join();
// Helper method
private UserData fetchUserData(String userId) {
// Simulate API call
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return new UserData(userId, "User " + userId);
}
// Data class
class UserData {
private final String id;
private final String name;
UserData(String id, String name) {
this.id = id;
this.name = name;
}
// Getters
}
CompletableFuture<Double> priceTask = CompletableFuture
.supplyAsync(() -> fetchProductPrice("product123"))
.thenCompose(price ->
CompletableFuture.supplyAsync(() -> fetchTaxRate())
.thenApply(taxRate -> price * (1 + taxRate))
);
double finalPrice = priceTask.join();
// Helper methods
private double fetchProductPrice(String productId) {
// Simulate API call
return 29.99;
}
private double fetchTaxRate() {
// Simulate API call
return 0.08; // 8% tax
}
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
// Primary data source
if (Math.random() < 0.7) {
throw new RuntimeException("Primary source failed");
}
return "Data from primary";
})
.exceptionally(ex -> {
System.out.println("Primary failed, trying backup...");
// Try backup on failure
return CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.3) {
throw new RuntimeException("Backup source failed too");
}
return "Data from backup";
}).exceptionally(ex2 -> {
System.out.println("Backup failed too, using cached data");
return "Cached data";
}).join();
});
String result = future.join();
System.out.println("Final result: " + result);