Executors and Thread Pools
Thread pools provide a solution to the overhead of thread creation and management by reusing threads for multiple tasks. Java’s Executor framework provides a standardized way to work with thread pools.
Introduction to Executors
The Executor framework separates task submission from task execution, providing a higher-level replacement for manually working with threads.
Key Interfaces:
- Executor: Simple interface for executing tasks
- ExecutorService: Extended interface with lifecycle methods and task submission
- ScheduledExecutorService: Adds scheduling capabilities
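To make the separation of submission from execution concrete, the sketch below runs the same task through two hand-written Executor implementations. The class name ExecutorDecouplingDemo, the helper threadUsedBy, and the thread name "per-task-thread" are illustrative assumptions, not part of the framework.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorDecouplingDemo {
    // Hypothetical helper: submits one task and reports which thread ran it.
    // The submitting code is identical no matter how the Executor runs tasks.
    static String threadUsedBy(Executor executor) throws InterruptedException {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        done.await(); // wait until the task has actually run
        return name.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Runs each task synchronously in the submitting thread
        Executor direct = Runnable::run;
        // Starts a new thread for every task
        Executor threadPerTask = task -> new Thread(task, "per-task-thread").start();

        System.out.println(threadUsedBy(direct));        // the caller's own thread name
        System.out.println(threadUsedBy(threadPerTask)); // per-task-thread
    }
}
```

Swapping the Executor changes where the task runs without touching the code that submits it, which is the property the thread-pool implementations build on.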
```java
// Basic example
ExecutorService executor = Executors.newFixedThreadPool(4);
executor.submit(() -> System.out.println("Task executed by " + Thread.currentThread().getName()));
executor.shutdown();
```
Types of Executor Services
ThreadPoolExecutor
The core implementation of ExecutorService that provides a configurable thread pool.
```java
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2,                                    // core pool size
    5,                                    // max pool size
    60, TimeUnit.SECONDS,                 // keep-alive time
    new LinkedBlockingQueue<>(),          // work queue (unbounded, so the pool never grows beyond the core size)
    Executors.defaultThreadFactory(),     // thread factory
    new ThreadPoolExecutor.AbortPolicy()  // rejection policy
);
```
ScheduledThreadPoolExecutor
Extends ThreadPoolExecutor to support delayed and periodic task execution.
```java
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
scheduler.schedule(() -> System.out.println("Delayed task"), 1, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(() -> System.out.println("Periodic task"), 0, 1, TimeUnit.SECONDS);
```
Executors Factory Methods
Convenience methods for creating pre-configured executor services:
- newFixedThreadPool(n): Fixed number of threads
- newCachedThreadPool(): Expandable pool, reuses threads
- newSingleThreadExecutor(): Single worker thread
- newScheduledThreadPool(n): Fixed-size pool for scheduled tasks
- newWorkStealingPool(): Work-stealing pool (Java 8+)
```java
ExecutorService fixed = Executors.newFixedThreadPool(4);
ExecutorService cached = Executors.newCachedThreadPool();
ExecutorService single = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
ExecutorService workStealing = Executors.newWorkStealingPool();
```
Thread Pool Configuration
Core Pool Size
- Minimum number of threads to keep alive
- Threads up to this size are kept even when idle
Maximum Pool Size
- Upper limit on thread count
- Additional threads are created only when the work queue is full
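The interaction between core size, queue capacity, and maximum size is easier to see by watching the pool size after each submission. The following is a minimal sketch; the class name PoolGrowthDemo and the helper poolSizesAfterEachSubmit are made up for illustration, and the tiny sizes (core 1, max 2, queue capacity 1) are chosen only to keep the output short.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    // Hypothetical helper: returns the pool size observed after each of three submissions.
    static int[] poolSizesAfterEachSubmit() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Each task blocks until released, so workers stay busy during the experiment
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] sizes = new int[3];
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(blocker);
                sizes[i] = pool.getPoolSize();
            }
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return sizes;
    }

    public static void main(String[] args) throws InterruptedException {
        // Task 1 starts the core thread, task 2 is queued,
        // task 3 finds the queue full and triggers a second thread.
        System.out.println(Arrays.toString(poolSizesAfterEachSubmit())); // prints [1, 1, 2]
    }
}
```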
Keep-Alive Time
- How long excess idle threads (beyond core size) are kept alive
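As a rough illustration of keep-alive in action, the sketch below saturates a small pool so that it grows to its maximum, lets the tasks finish, and then polls until the excess thread has been retired. The names KeepAliveDemo and peakAndIdlePoolSize, the 100 ms keep-alive, and the 5 second polling deadline are all assumptions picked for the demo.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Hypothetical helper: returns {pool size while saturated, pool size after idling}.
    static int[] peakAndIdlePoolSize() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            // One task per worker plus one queued task forces the pool to its maximum
            for (int i = 0; i < 3; i++) {
                pool.execute(blocker);
            }
            int peak = pool.getPoolSize();
            release.countDown();

            // Poll until the thread beyond the core size has timed out
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return new int[] {peak, pool.getPoolSize()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // The core thread survives; the extra thread is removed after the keep-alive time
        System.out.println(Arrays.toString(peakAndIdlePoolSize())); // prints [2, 1]
    }
}
```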
Work Queue
- Holds tasks before they’re executed
- Types: unbounded, bounded, synchronous
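The three queue categories map naturally onto LinkedBlockingQueue (unbounded when created without a capacity), ArrayBlockingQueue (bounded), and SynchronousQueue (direct hand-off with no storage). ThreadPoolExecutor hands tasks to its queue with a non-blocking offer, so one way to compare the types is to count how many offers each accepts when nothing is consuming. The class WorkQueueDemo and the helper acceptedOffers below are illustrative names only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class WorkQueueDemo {
    // Hypothetical helper: counts how many non-blocking offers the queue
    // accepts while no consumer is taking tasks from it.
    static int acceptedOffers(BlockingQueue<Runnable> queue, int attempts) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(() -> { })) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Unbounded: every task is queued, so a pool never needs extra threads
        System.out.println(acceptedOffers(new LinkedBlockingQueue<>(), 3));   // prints 3
        // Bounded: accepts tasks up to its capacity, then refuses
        System.out.println(acceptedOffers(new ArrayBlockingQueue<>(2), 3));   // prints 2
        // Synchronous: holds nothing; an offer succeeds only if a consumer is waiting
        System.out.println(acceptedOffers(new SynchronousQueue<>(), 3));      // prints 0
    }
}
```

A refused offer is the point at which a pool either adds a thread (if below the maximum) or invokes its rejection policy.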
Thread Factory
- Creates new threads for the pool
- Can customize thread names, priorities, daemon status
Rejection Policy
- Handles tasks when the executor is saturated or shut down
- Standard policies:
  - AbortPolicy: Throws RejectedExecutionException
  - CallerRunsPolicy: Executes the task in the caller’s thread
  - DiscardPolicy: Silently discards the task
  - DiscardOldestPolicy: Discards the oldest queued task and retries the submission
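The policies can be compared by saturating a deliberately tiny pool and observing what happens to the next task. This is a sketch under assumptions: RejectionPolicyDemo and thirdTaskOutcome are hypothetical names, and the pool is limited to one thread and one queue slot so that the third submission is always the one that gets rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RejectionPolicyDemo {
    // Hypothetical helper: saturates a 1-thread, 1-slot pool and reports
    // what the given policy does with a third task.
    static String thirdTaskOutcome(RejectedExecutionHandler policy) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), policy);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>("not run");
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the only queue slot
            pool.execute(() -> ranOn.set(Thread.currentThread().getName())); // saturated
            return ranOn.get();
        } catch (RejectedExecutionException e) {
            return "rejected";
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // AbortPolicy throws, so the helper reports "rejected"
        System.out.println(thirdTaskOutcome(new ThreadPoolExecutor.AbortPolicy()));
        // CallerRunsPolicy runs the task in the submitting thread, so its name is reported
        System.out.println(thirdTaskOutcome(new ThreadPoolExecutor.CallerRunsPolicy()));
        // DiscardPolicy drops the task silently, so it never runs
        System.out.println(thirdTaskOutcome(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```

CallerRunsPolicy also slows down the submitter while it runs the task, which acts as a simple form of back-pressure.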
Fork/Join Framework
A specialized implementation of ExecutorService designed for recursive divide-and-conquer tasks.
RecursiveTask and RecursiveAction
- RecursiveTask<V>: Returns a result
- RecursiveAction: Performs an action without returning a result
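For the no-result variant, a RecursiveAction typically mutates shared data in place. The sketch below doubles every element of an array; the class name DoubleAction and the threshold of 1,000 elements are assumptions chosen for illustration, not tuned values.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class DoubleAction extends RecursiveAction {
    private static final int THRESHOLD = 1_000; // illustrative cut-off, not a tuned value
    private final long[] array;
    private final int start;
    private final int end;

    DoubleAction(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            // Small enough: update this slice sequentially
            for (int i = start; i < end; i++) {
                array[i] *= 2;
            }
        } else {
            // Split into two halves that touch disjoint parts of the array
            int mid = start + (end - start) / 2;
            invokeAll(new DoubleAction(array, start, mid),
                      new DoubleAction(array, mid, end));
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        Arrays.fill(data, 1L);
        ForkJoinPool.commonPool().invoke(new DoubleAction(data, 0, data.length));
        System.out.println(data[0] + " " + data[data.length - 1]); // prints 2 2
    }
}
```

Here invokeAll forks both halves and waits for them, which is a compact alternative to calling fork, compute, and join by hand.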
```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private final long[] array;
    private final int start;
    private final int end;
    private static final int THRESHOLD = 10000;

    SumTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // Sequential computation for small enough tasks
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            // Split task into smaller subtasks
            int mid = start + (end - start) / 2;
            SumTask leftTask = new SumTask(array, start, mid);
            SumTask rightTask = new SumTask(array, mid, end);

            // Fork right task
            rightTask.fork();

            // Compute left task
            long leftResult = leftTask.compute();

            // Join right task
            long rightResult = rightTask.join();

            // Combine results
            return leftResult + rightResult;
        }
    }
}

// Usage
ForkJoinPool pool = new ForkJoinPool();
long[] array = new long[100000];
// Initialize array...
long sum = pool.invoke(new SumTask(array, 0, array.length));
```
Work Stealing
- Each worker thread has its own task queue
- Idle threads can “steal” tasks from busy threads’ queues
- Improves load balancing and reduces contention
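ForkJoinPool exposes an estimate of how often stealing happens through getStealCount(). The sketch below runs a deliberately fine-grained recursive task and prints that estimate. StealCountDemo, the nested Fib task, and the parallelism of 4 are illustrative assumptions, and the steal count varies from run to run.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class StealCountDemo {
    // Naive Fibonacci: inefficient on purpose, because it creates many small subtasks
    static class Fib extends RecursiveTask<Integer> {
        private final int n;

        Fib(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n <= 1) {
                return n;
            }
            Fib left = new Fib(n - 1);
            left.fork();                           // pushed onto this worker's own queue
            int right = new Fib(n - 2).compute();  // computed directly by this worker
            return left.join() + right;            // the forked task may have been stolen
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            int result = pool.invoke(new Fib(20));
            System.out.println("fib(20) = " + result); // fib(20) = 6765
            // Estimate of tasks taken from another worker's queue; not deterministic
            System.out.println("steals  = " + pool.getStealCount());
        } finally {
            pool.shutdown();
        }
    }
}
```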
Common Patterns and Best Practices
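Proper Shutdown
```java
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
    // Submit tasks...
} finally {
    executor.shutdown(); // Stop accepting new tasks
    try {
        // Give running tasks time to finish, then cancel whatever is left
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt(); // Preserve the interrupt status
    }
}
```
Handling Exceptions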
```java
Future<?> future = executor.submit(() -> {
    throw new RuntimeException("Task failed");
});

try {
    future.get(); // Will throw ExecutionException
} catch (ExecutionException e) {
    Throwable cause = e.getCause(); // The actual exception
    // Handle exception
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // Interrupted while waiting for the result
}
```
Configuring Thread Names
```java
ExecutorService executor = Executors.newFixedThreadPool(4, new ThreadFactory() {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("Worker-" + counter.incrementAndGet());
        return thread;
    }
});
```
Choosing the Right Pool Size
```java
int cores = Runtime.getRuntime().availableProcessors();

// A common formula for CPU-bound tasks: one thread per available core
int cpuBoundPoolSize = cores;

// For I/O-bound tasks, you might want more threads.
// waitTime and computeTime are per-task averages that you measure yourself;
// the cast avoids integer division truncating the ratio.
int ioBoundPoolSize = (int) (cores * (1 + (double) waitTime / computeTime));
```
Code Examples
Basic Task Submission
```java
ExecutorService executor = Executors.newFixedThreadPool(4);

// Submit Runnable (no result)
executor.execute(() -> System.out.println("Simple task"));

// Submit Runnable with Future (for completion tracking)
Future<?> future1 = executor.submit(() -> System.out.println("Task with Future"));

// Submit Callable (with result)
Future<String> future2 = executor.submit(() -> "Task result");

// Get result (blocks until available)
String result = future2.get();

executor.shutdown();
```
Scheduled Tasks
```java
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

// Run once after delay
scheduler.schedule(
    () -> System.out.println("Delayed task"),
    2, TimeUnit.SECONDS);

// Run repeatedly at fixed rate (from start time)
scheduler.scheduleAtFixedRate(
    () -> System.out.println("Fixed rate task"),
    1, 5, TimeUnit.SECONDS);

// Run repeatedly with fixed delay between tasks
scheduler.scheduleWithFixedDelay(
    () -> System.out.println("Fixed delay task"),
    1, 5, TimeUnit.SECONDS);

// Later...
scheduler.shutdown();
```
Parallel Processing with CompletableFuture (Java 8+)
```java
ExecutorService executor = Executors.newFixedThreadPool(4);

List<String> urls = Arrays.asList(
    "https://example.com/1",
    "https://example.com/2",
    "https://example.com/3");

List<CompletableFuture<String>> futures = urls.stream()
    .map(url -> CompletableFuture.supplyAsync(() -> fetchUrl(url), executor))
    .collect(Collectors.toList());

// Wait for all to complete
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
    futures.toArray(new CompletableFuture[0]));
allFutures.join(); // Blocks until every future has finished

// Get all results (each join returns immediately because the futures are done)
List<String> results = futures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());

executor.shutdown();

// Helper method
private static String fetchUrl(String url) {
    // Simulating HTTP request
    try {
        Thread.sleep(100 + new Random().nextInt(900));
        return "Result from " + url;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return "Error";
    }
}
```
Custom ThreadPoolExecutor
```java
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2, 4, 60, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),
    r -> {
        Thread t = new Thread(r);
        t.setName("CustomWorker-" + t.getId());
        t.setDaemon(true);
        return t;
    },
    new ThreadPoolExecutor.CallerRunsPolicy());

// Monitor the pool
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {
    System.out.printf("Pool stats: Active=%d, Pool=%d, Queue=%d, Completed=%d%n",
        executor.getActiveCount(),
        executor.getPoolSize(),
        executor.getQueue().size(),
        executor.getCompletedTaskCount());
}, 0, 1, TimeUnit.SECONDS);

// Submit some tasks
for (int i = 0; i < 200; i++) {
    final int taskId = i;
    executor.submit(() -> {
        System.out.println("Executing task " + taskId);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// Stop accepting tasks, wait for the submitted work to finish, then stop the monitor.
// shutdown() alone does not wait, and the daemon workers would not keep the JVM alive.
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES); // throws InterruptedException
monitor.shutdown();
```