Futures and Asynchronous Programming
Asynchronous programming lets operations run without blocking the calling thread, which improves responsiveness and throughput. Java provides several mechanisms for it; CompletableFuture (introduced in Java 8) is the most flexible of those built into the JDK.
Introduction to Asynchronous Programming
Asynchronous programming allows operations to run independently of the main application flow, enabling:
- Non-blocking operations: The calling thread can continue execution without waiting
- Improved responsiveness: UI threads remain responsive during long-running operations
- Better resource utilization: Threads aren’t blocked waiting for I/O or other operations
- Scalability: More concurrent operations can be performed with fewer threads
Future and Callable
Callable Interface
Similar to Runnable, but can return a result and throw checked exceptions.

    // A task that sleeps for one second, then returns a value
    Callable<String> task = () -> {
        Thread.sleep(1000);
        return "Result";
    };
Future Interface
Represents the result of an asynchronous computation.

    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> future = executor.submit(task);

    // Check if completed
    boolean isDone = future.isDone();

    // Cancel task (true = interrupt the worker thread if the task is running).
    // After a successful cancel, get() throws the unchecked CancellationException.
    boolean canceled = future.cancel(true);

    // Get result (blocks until available)
    try {
        String result = future.get();
        // or with timeout
        String resultWithTimeout = future.get(2, TimeUnit.SECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        // Handle exceptions
    }

    executor.shutdown();

Limitations of Future:
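- Cannot be completed manually by application code
- Cannot be chained or composed with other futures
- No callback for failures: exceptions surface only as an ExecutionException thrown by get()
- get() blocks the calling thread; the only non-blocking alternative is polling isDone()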
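The limitations above show up as soon as a caller wants a result with a deadline: with a plain Future, the caller has to block, catch three exception types, and cancel by hand. The following is a minimal sketch of that boilerplate; the class `FutureLimitationsDemo` and the helper `callWithTimeout` are hypothetical names introduced for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureLimitationsDemo {

    // Hypothetical helper: runs the task on one worker thread and waits
    // at most timeoutMillis for its result, returning fallback otherwise.
    static String callWithTimeout(Callable<String> task, long timeoutMillis, String fallback) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(task);
        try {
            // The calling thread is blocked here until a result or the timeout
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker; Future offers no callback for this
            return fallback;
        } catch (ExecutionException e) {
            return fallback;     // the task itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(callWithTimeout(() -> "fast", 1000, "fallback"));
        System.out.println(callWithTimeout(() -> {
            Thread.sleep(5000);
            return "slow";
        }, 100, "fallback"));
    }
}
```

None of this logic can be attached to the Future itself; every caller has to repeat it, which is the gap CompletableFuture fills.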
CompletableFuture
Introduced in Java 8, CompletableFuture implements both Future and CompletionStage, providing a rich set of methods for asynchronous programming.
Creation

    // Empty future
    CompletableFuture<String> future1 = new CompletableFuture<>();
    future1.complete("Result"); // Manual completion

    // Completed future
    CompletableFuture<String> future2 = CompletableFuture.completedFuture("Result");

    // Run async (no result)
    CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
        // Some operation
    });

    // Supply async (with result)
    CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {
        return "Result";
    });

    // With custom executor
    ExecutorService executor = Executors.newFixedThreadPool(4);
    CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> {
        return "Result";
    }, executor);
Transformation

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");

    // Transform result
    CompletableFuture<String> transformed1 = future.thenApply(s -> s + " World");

    // Transform to different type
    CompletableFuture<Integer> transformed2 = future.thenApply(String::length);

    // Transform with async execution
    CompletableFuture<Integer> transformed3 = future.thenApplyAsync(String::length);

    // Consume the result without producing a new value
    CompletableFuture<Void> transformed4 = future.thenAccept(System.out::println);

    // Just run after completion
    CompletableFuture<Void> transformed5 = future.thenRun(() -> System.out.println("Done"));
Composition

    // Combine two futures (both must complete)
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

    CompletableFuture<String> combined1 = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);

    // Chain futures (sequential)
    CompletableFuture<String> future3 = future1.thenCompose(s ->
        CompletableFuture.supplyAsync(() -> s + " World"));

    // First to complete
    CompletableFuture<Object> future4 = CompletableFuture.anyOf(future1, future2);

    // All complete
    CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);

    // Get all results after allOf
    CompletableFuture<List<String>> allResults = allFuture.thenApply(v -> {
        return Stream.of(future1, future2)
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    });
Exception Handling

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        if (Math.random() < 0.5) {
            throw new RuntimeException("Error");
        }
        return "Success";
    });

    // Handle exceptions
    CompletableFuture<String> handled1 = future.exceptionally(ex -> {
        System.err.println("Error: " + ex.getMessage());
        return "Default Value";
    });

    // Handle both success and failure
    CompletableFuture<String> handled2 = future.handle((result, ex) -> {
        if (ex != null) {
            return "Error occurred: " + ex.getMessage();
        } else {
            return "Success: " + result;
        }
    });

    // When complete (callback)
    future.whenComplete((result, ex) -> {
        if (ex != null) {
            System.err.println("Error: " + ex.getMessage());
        } else {
            System.out.println("Result: " + result);
        }
    });
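One detail worth knowing when reading `ex` in these callbacks: an exception thrown in an earlier stage usually reaches later stages wrapped in a CompletionException, so `ex.getMessage()` may show the wrapper's text rather than the original message. The sketch below unwraps the cause before reporting it; `ExceptionUnwrapDemo`, `unwrap`, and `recover` are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionUnwrapDemo {

    // Hypothetical helper: strips the CompletionException wrapper if present
    static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null)
            ? ex.getCause()
            : ex;
    }

    static String recover() {
        return CompletableFuture
            .<String>supplyAsync(() -> {
                throw new IllegalStateException("Error");
            })
            .thenApply(String::trim) // skipped, because the previous stage failed
            .exceptionally(ex -> "Recovered from: " + unwrap(ex).getMessage())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(recover()); // prints "Recovered from: Error"
    }
}
```

Checking `instanceof` on the unwrapped cause is also the safer way to branch on exception types inside `handle` or `exceptionally`.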
Completion Stages
The CompletionStage interface (implemented by CompletableFuture) models one step of an asynchronous computation. Each method returns a new stage that runs when the previous one completes, so stages can be chained into a pipeline.

    CompletableFuture.supplyAsync(() -> "Step 1")
        .thenApply(s -> s + " -> Step 2")
        .thenApply(s -> s + " -> Step 3")
        .thenAccept(System.out::println);
Reactive Programming Basics
Java 9 introduced the Flow API (java.util.concurrent.Flow) for reactive programming, providing interfaces that correspond to Reactive Streams.

    // Publisher (produces items)
    Flow.Publisher<String> publisher = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                // Implementation: deliver up to n items via subscriber.onNext(...)
            }

            @Override
            public void cancel() {
                // Implementation: stop delivering items
            }
        });
    };

    // Subscriber (consumes items)
    Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // effectively unbounded demand
        }

        @Override
        public void onNext(String item) {
            System.out.println(item);
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Complete");
        }
    };

    // Connect them
    publisher.subscribe(subscriber);

Note: For serious reactive programming, consider using libraries like RxJava, Project Reactor, or Akka.
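For small experiments without an external library, the JDK also ships SubmissionPublisher, a ready-made Flow.Publisher. The sketch below pairs it with a subscriber that requests one item at a time, which is the simplest form of backpressure; `FlowDemo`, `CollectingSubscriber`, and `publishAll` are hypothetical names, and the five-second wait is an arbitrary assumption.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowDemo {

    // Subscriber that asks for one item at a time and records what it receives
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item only when ready
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes all items, waits for completion, and returns what was received
    static List<String> publishAll(List<String> items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete once pending items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAll(List.of("a", "b", "c")));
    }
}
```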
Common Patterns and Best Practices
Async/Await Pattern (Java Style)

    // "Async" method
    CompletableFuture<String> fetchDataAsync() {
        return CompletableFuture.supplyAsync(() -> {
            // Simulate network call
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Data";
        });
    }

    // "Await" usage
    void processData() {
        fetchDataAsync()
            .thenAccept(data -> System.out.println("Processed: " + data));

        // Or if you need to wait
        String result = fetchDataAsync().join(); // Similar to await, but blocks the calling thread
    }
Timeout Handling

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
            return "Result";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    });

    // Add timeout: complete with a default value if not done in time (Java 9+)
    CompletableFuture<String> futureWithTimeout = future.completeOnTimeout("Timeout", 1, TimeUnit.SECONDS);

    // Or handle timeout separately: fail with a TimeoutException (Java 9+)
    future.orTimeout(1, TimeUnit.SECONDS)
        .exceptionally(ex -> {
            if (ex instanceof TimeoutException) {
                return "Timeout occurred";
            }
            return "Other error: " + ex.getMessage();
        });
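Both completeOnTimeout and orTimeout were added in Java 9. On Java 8, a similar effect can be approximated with a ScheduledExecutorService that fails the future if it is still pending when the timer fires. This is only a sketch under that assumption; `TimeoutSupport` and `withTimeout` are hypothetical names, not JDK API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSupport {

    // Single daemon thread used only to fire timeouts
    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "timeout-scheduler");
            thread.setDaemon(true);
            return thread;
        });

    // Hypothetical helper: fails the given future with a TimeoutException
    // if it has not completed when the delay elapses.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future,
                                                long delay, TimeUnit unit) {
        ScheduledFuture<?> timer = SCHEDULER.schedule(() -> {
            // No effect if the future already completed
            future.completeExceptionally(
                new TimeoutException("Timed out after " + delay + " " + unit));
        }, delay, unit);

        // Cancel the timer once the future finishes for any reason
        future.whenComplete((result, ex) -> timer.cancel(false));
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        String result = withTimeout(neverCompletes, 100, TimeUnit.MILLISECONDS)
            .exceptionally(ex -> "Timeout occurred")
            .join();
        System.out.println(result); // prints "Timeout occurred"
    }
}
```

As with orTimeout, the underlying task is not stopped; only the future's result is settled.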
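Cancellation

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        // Long-running task
        return "Result";
    });

    // Cancel the future. For CompletableFuture the boolean argument has no effect:
    // the running task is not interrupted, the future just completes with a CancellationException.
    boolean canceled = future.cancel(true);
    boolean isCancelled = future.isCancelled();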
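Because cancel() on a CompletableFuture does not interrupt the thread running the task, a long-running task that should actually stop needs a cooperative signal. The sketch below uses a shared flag that the task polls; `CancellationDemo` and its method names are hypothetical, and the flag-based approach is one option among several.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancellationDemo {

    // Returns true if the worker noticed the stop request and exited
    static boolean cancelAndStopWorker() {
        AtomicBoolean stopRequested = new AtomicBoolean(false);
        CountDownLatch workerStarted = new CountDownLatch(1);
        CountDownLatch workerExited = new CountDownLatch(1);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            workerStarted.countDown();
            while (!stopRequested.get()) {
                Thread.yield(); // stands in for one unit of real work
            }
            workerExited.countDown();
            return "Stopped early"; // ignored: the future is already cancelled
        });

        // Translate cancellation of the future into a stop request for the task
        future.whenComplete((result, ex) -> {
            if (future.isCancelled()) {
                stopRequested.set(true);
            }
        });

        try {
            workerStarted.await();  // make sure the task is really running
            future.cancel(true);    // completes the future, does not interrupt the worker
            return workerExited.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Cancelling a source future also fails stages that depend on it
    static boolean dependentFailsAfterCancel() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> dependent = source.thenApply(String::length);
        source.cancel(true);
        return source.isCancelled() && dependent.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(cancelAndStopWorker());
        System.out.println(dependentFailsAfterCancel());
    }
}
```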
Code Examples
Parallel API Calls

    List<String> userIds = Arrays.asList("user1", "user2", "user3");

    // Sequential
    List<UserData> users = new ArrayList<>();
    for (String userId : userIds) {
        users.add(fetchUserData(userId)); // Blocking call
    }

    // Parallel with CompletableFuture
    List<CompletableFuture<UserData>> futures = userIds.stream()
        .map(id -> CompletableFuture.supplyAsync(() -> fetchUserData(id)))
        .collect(Collectors.toList());

    // Wait for all to complete
    CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[0]));

    // Get results
    CompletableFuture<List<UserData>> allUsersFuture = allFutures.thenApply(v ->
        futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));

    List<UserData> allUsers = allUsersFuture.join();

    // Helper method
    private UserData fetchUserData(String userId) {
        // Simulate API call
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new UserData(userId, "User " + userId);
    }

    // Data class
    class UserData {
        private final String id;
        private final String name;

        UserData(String id, String name) {
            this.id = id;
            this.name = name;
        }

        // Getters
    }
Dependent Tasks

    CompletableFuture<Double> priceTask = CompletableFuture
        .supplyAsync(() -> fetchProductPrice("product123"))
        .thenCompose(price ->
            CompletableFuture.supplyAsync(() -> fetchTaxRate())
                .thenApply(taxRate -> price * (1 + taxRate))
        );

    double finalPrice = priceTask.join();

    // Helper methods
    private double fetchProductPrice(String productId) {
        // Simulate API call
        return 29.99;
    }

    private double fetchTaxRate() {
        // Simulate API call
        return 0.08; // 8% tax
    }
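In the example above, the tax-rate lookup starts only after the price arrives, even though it does not use the price. If the two lookups really are independent (an assumption about the hypothetical services here), thenCombine lets them run concurrently and merges the results. A sketch, with `PriceWithTax` and `finalPriceAsync` as illustrative names and the same stubbed values:

```java
import java.util.concurrent.CompletableFuture;

class PriceWithTax {

    // Stubs standing in for the simulated API calls
    static double fetchProductPrice(String productId) {
        return 29.99;
    }

    static double fetchTaxRate() {
        return 0.08; // 8% tax
    }

    static CompletableFuture<Double> finalPriceAsync(String productId) {
        // Both lookups are started immediately and run independently
        CompletableFuture<Double> price =
            CompletableFuture.supplyAsync(() -> fetchProductPrice(productId));
        CompletableFuture<Double> taxRate =
            CompletableFuture.supplyAsync(PriceWithTax::fetchTaxRate);

        // The combiner runs once both results are available
        return price.thenCombine(taxRate, (p, t) -> p * (1 + t));
    }

    public static void main(String[] args) {
        System.out.println(finalPriceAsync("product123").join());
    }
}
```

thenCompose remains the right choice when the second call needs the first call's result as input.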
Error Recovery Chain

    CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> {
            // Primary data source
            if (Math.random() < 0.7) {
                throw new RuntimeException("Primary source failed");
            }
            return "Data from primary";
        })
        .exceptionally(ex -> {
            System.out.println("Primary failed, trying backup...");
            // Try backup on failure
            return CompletableFuture.supplyAsync(() -> {
                if (Math.random() < 0.3) {
                    throw new RuntimeException("Backup source failed too");
                }
                return "Data from backup";
            }).exceptionally(ex2 -> {
                System.out.println("Backup failed too, using cached data");
                return "Cached data";
            }).join();
        });

    String result = future.join();
    System.out.println("Final result: " + result);
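The chain above calls join() inside exceptionally, which blocks a pool thread while the backup runs. One way to avoid that, sketched here, is to wrap the successful value in a completed future, substitute the fallback future on failure, and then flatten with thenCompose. `FallbackChain` and `withFallback` are hypothetical names; on Java 12 and later, exceptionallyCompose covers the same case directly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class FallbackChain {

    // Hypothetical helper: if primary fails, switch to the fallback future
    // without blocking any thread.
    static <T> CompletableFuture<T> withFallback(CompletableFuture<T> primary,
                                                 Supplier<CompletableFuture<T>> fallback) {
        // Success: wrap the value in an already-completed future
        CompletableFuture<CompletableFuture<T>> wrapped =
            primary.thenApply(CompletableFuture::completedFuture);
        return wrapped
            .exceptionally(ex -> fallback.get()) // failure: start the fallback
            .thenCompose(stage -> stage);        // flatten the nested future
    }

    static String demo() {
        // The primary always fails here so that the fallback path is exercised
        CompletableFuture<String> primary = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Primary source failed");
        });
        Supplier<CompletableFuture<String>> backup =
            () -> CompletableFuture.supplyAsync(() -> "Data from backup");
        Supplier<CompletableFuture<String>> cache =
            () -> CompletableFuture.completedFuture("Cached data");

        // primary -> backup -> cache, each tried only if the previous one failed
        return withFallback(withFallback(primary, backup), cache).join();
    }

    public static void main(String[] args) {
        System.out.println("Final result: " + demo()); // prints "Final result: Data from backup"
    }
}
```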