引言
在Java编程语言的发展历程中,并发编程一直是一个核心且不断演进的领域。从早期JDK 1.0中简单的synchronized
关键字,到现代Java中强大的CompletableFuture
API,Java的并发工具不断丰富和完善,为开发者提供了越来越多的选择来构建高效、可靠的并发应用。本文将带您了解Java并发编程的发展历程,探讨各个阶段的关键特性、优缺点以及适用场景。
目录
- 早期并发:synchronized与volatile
- JDK 1.5:java.util.concurrent包的诞生
- JDK 1.7:Fork/Join框架
- JDK 1.8:CompletableFuture与并行流
- JDK 9及以后:响应式编程的影响
- 并发编程的最佳实践
- 总结与展望
早期并发:synchronized与volatile
在Java诞生之初,它就内置了对多线程的支持,但并发工具相对简单。
synchronized关键字
synchronized
是Java最早提供的并发控制机制,它提供了一种简单直接的方式来实现线程间的互斥访问。
public class Counter {
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
优点:
- 简单易用,语法直观
- 由JVM层面保证原子性、可见性和有序性
- 自动获取和释放锁
缺点:
- 性能开销较大(尤其在早期JDK版本)
- 无法中断一个正在等待获取锁的线程
- 无法设置超时
- 同步粒度较粗
volatile关键字
volatile
关键字用于确保变量的可见性,但不保证原子性。
public class StatusFlag {
private volatile boolean running = true;
public void stop() {
running = false;
}
public void doWork() {
while (running) {
// 执行工作...
}
}
}
优点:
- 轻量级同步机制
- 保证可见性和有序性
缺点:
- 不保证原子性
- 使用场景有限
早期并发编程的挑战
早期Java并发编程面临着几个主要挑战:
- 死锁风险:不当使用
synchronized
容易导致死锁 - 性能问题:
synchronized
在竞争激烈时性能下降明显 - 功能受限:缺乏高级并发工具,如线程池、原子变量等
- 编程复杂性:手动管理线程同步逻辑复杂且容易出错
JDK 1.5:java.util.concurrent包的诞生
JDK 1.5(也称为Java 5)是Java并发编程的一个重要里程碑,引入了java.util.concurrent
包,大幅扩展了并发工具集。
锁机制的改进:Lock接口
Lock
接口提供了比synchronized
更灵活的锁操作:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Counter {
private int count = 0;
private final Lock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
主要特性:
- 可中断的锁获取
- 可设置超时的锁获取
- 公平锁选项
- 条件变量支持
原子变量
java.util.concurrent.atomic
包提供了一系列原子变量类,支持无锁的线程安全操作:
import java.util.concurrent.atomic.AtomicInteger;
public class Counter {
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}
并发集合
引入了线程安全的集合类,如ConcurrentHashMap
、CopyOnWriteArrayList
等:
import java.util.concurrent.ConcurrentHashMap;
public class Cache {
private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
public void put(String key, Object value) {
cache.put(key, value);
}
public Object get(String key) {
return cache.get(key);
}
}
线程池与Executor框架
Executor
框架提供了一种标准化的方式来管理线程:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TaskExecutor {
private final ExecutorService executor = Executors.newFixedThreadPool(10);
public void executeTask(Runnable task) {
executor.execute(task);
}
public void shutdown() {
executor.shutdown();
}
}
Future接口
Future
接口提供了一种方式来获取异步计算的结果:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class AsyncCalculator {
private final ExecutorService executor = Executors.newCachedThreadPool();
public Future<Integer> calculate(final int input) {
return executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
// 复杂计算
Thread.sleep(1000); // 模拟耗时操作
return input * input;
}
});
}
}
BlockingQueue
BlockingQueue
接口支持生产者-消费者模式:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class MessageQueue {
private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
public void send(String message) throws InterruptedException {
queue.put(message);
}
public String receive() throws InterruptedException {
return queue.take();
}
}
JDK 1.7:Fork/Join框架
JDK 1.7引入了Fork/Join框架,这是一个专为并行计算设计的框架,特别适合"分而治之"的问题。
基本概念
Fork/Join框架基于"工作窃取"算法,允许空闲的工作线程从忙碌的工作线程队列中窃取任务。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class SumCalculator extends RecursiveTask<Long> {
private final int[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 10000;
public SumCalculator(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 小任务直接计算
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// 大任务分解
int middle = (start + end) / 2;
SumCalculator leftTask = new SumCalculator(array, start, middle);
SumCalculator rightTask = new SumCalculator(array, middle, end);
leftTask.fork(); // 异步执行左半部分
long rightResult = rightTask.compute(); // 同步执行右半部分
long leftResult = leftTask.join(); // 等待左半部分结果
return leftResult + rightResult;
}
}
public static long sum(int[] array) {
ForkJoinPool pool = new ForkJoinPool();
return pool.invoke(new SumCalculator(array, 0, array.length));
}
}
Fork/Join框架的优势
- 适合处理可递归分解的任务
- 自动负载均衡
- 充分利用多核处理器
- 比传统线程池更高效地处理细粒度并行任务
JDK 1.8:CompletableFuture与并行流
JDK 1.8带来了两个重要的并发编程工具:CompletableFuture
和并行流。
CompletableFuture
CompletableFuture
是对Future
接口的增强,支持函数式编程风格的异步编程:
import java.util.concurrent.CompletableFuture;
public class AsyncService {
public CompletableFuture<String> fetchData(String id) {
return CompletableFuture.supplyAsync(() -> {
// 模拟远程调用
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Data for " + id;
});
}
public CompletableFuture<String> processData(String data) {
return CompletableFuture.supplyAsync(() -> {
// 模拟数据处理
return data.toUpperCase();
});
}
public CompletableFuture<String> fetchAndProcess(String id) {
return fetchData(id)
.thenCompose(this::processData)
.exceptionally(ex -> "Error: " + ex.getMessage());
}
public CompletableFuture<String> fetchMultipleAndCombine(String id1, String id2) {
CompletableFuture<String> future1 = fetchData(id1);
CompletableFuture<String> future2 = fetchData(id2);
return future1.thenCombine(future2, (data1, data2) -> data1 + " + " + data2);
}
}
CompletableFuture的主要特性:
- 组合操作:
thenCompose
、thenCombine
、thenAccept
等 - 异常处理:
exceptionally
、handle
等 - 多Future协调:
allOf
、anyOf
等 - 自定义执行器:可指定任务的执行线程池
并行流
并行流利用Fork/Join框架,提供了一种简单的方式来并行处理集合:
import java.util.Arrays;
import java.util.List;
public class ParallelStreamExample {
public long sumOfSquares(List<Integer> numbers) {
return numbers.parallelStream()
.mapToLong(i -> i * i)
.sum();
}
public List<String> processStrings(List<String> strings) {
return strings.parallelStream()
.filter(s -> s.length() > 3)
.map(String::toUpperCase)
.sorted()
.toList();
}
public static void main(String[] args) {
ParallelStreamExample example = new ParallelStreamExample();
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
System.out.println("Sum of squares: " + example.sumOfSquares(numbers));
List<String> strings = Arrays.asList("apple", "banana", "cherry", "date", "elderberry");
System.out.println("Processed strings: " + example.processStrings(strings));
}
}
并行流的注意事项:
- 并不是所有操作都适合并行化
- 小数据集上并行处理可能比顺序处理更慢
- 有状态操作可能影响并行性能
- 共享可变状态可能导致并发问题
JDK 9及以后:响应式编程的影响
从JDK 9开始,Java并发编程开始受到响应式编程范式的影响。
Flow API (JDK 9)
JDK 9引入了java.util.concurrent.Flow
API,提供了响应式编程的基础接口:
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
public class FlowExample {
// 自定义处理器,实现Processor接口
static class TransformProcessor<T, R> extends SubmissionPublisher<R>
implements Flow.Processor<T, R> {
private Flow.Subscription subscription;
private final Function<T, R> function;
TransformProcessor(Function<T, R> function) {
this.function = function;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit(function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
closeExceptionally(throwable);
}
@Override
public void onComplete() {
close();
}
}
public static void main(String[] args) throws InterruptedException {
// 创建发布者
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
// 创建处理器
TransformProcessor<Integer, String> processor =
new TransformProcessor<>(i -> "Transformed: " + i);
// 创建订阅者
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("Received: " + item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
};
// 连接发布者、处理器和订阅者
publisher.subscribe(processor);
processor.subscribe(subscriber);
// 发布项目
System.out.println("Publishing items...");
for (int i = 0; i < 5; i++) {
publisher.submit(i);
}
Thread.sleep(1000);
// 关闭发布者
publisher.close();
Thread.sleep(1000);
}
}
CompletableFuture的增强 (JDK 9+)
JDK 9及以后版本对CompletableFuture
进行了增强:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class EnhancedCompletableFutureExample {
// JDK 9: 新增超时方法
public void timeoutExample() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result";
});
// 设置1秒超时
CompletableFuture<String> futureWithTimeout = future.orTimeout(1, TimeUnit.SECONDS);
try {
String result = futureWithTimeout.join();
System.out.println(result);
} catch (Exception e) {
System.out.println("Operation timed out: " + e.getMessage());
}
}
// JDK 9: 超时后提供默认值
public void completeOnTimeoutExample() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result";
});
// 1秒后超时并提供默认值
CompletableFuture<String> futureWithDefault =
future.completeOnTimeout("Default", 1, TimeUnit.SECONDS);
String result = futureWithDefault.join();
System.out.println(result); // 输出 "Default"
}
// JDK 12: 新增exceptionallyCompose方法
public void exceptionallyComposeExample() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Simulated error");
}
return "Success";
});
CompletableFuture<String> recovered = future.exceptionallyCompose(ex -> {
System.out.println("Handling error: " + ex.getMessage());
return CompletableFuture.supplyAsync(() -> "Recovered");
});
String result = recovered.join();
System.out.println(result);
}
}
并发编程的最佳实践
随着Java并发工具的不断演进,一些最佳实践也逐渐形成。
选择合适的并发工具
// 简单互斥场景:使用synchronized
public synchronized void simpleMethod() {
// 简单的互斥操作
}
// 需要更多控制的互斥场景:使用Lock
private final ReentrantLock lock = new ReentrantLock();
public void complexLockMethod() {
lock.lock();
try {
// 需要更多控制的互斥操作
} finally {
lock.unlock();
}
}
// 简单计数器:使用AtomicInteger
private final AtomicInteger counter = new AtomicInteger();
public void incrementCounter() {
counter.incrementAndGet();
}
// 异步任务处理:使用CompletableFuture
public CompletableFuture<String> asyncProcess() {
return CompletableFuture.supplyAsync(this::computeResult)
.thenApply(this::postProcess);
}
// 并行集合处理:使用并行流
public long parallelSum(List<Integer> numbers) {
return numbers.parallelStream().mapToLong(Integer::longValue).sum();
}
避免常见陷阱
- 死锁预防:
// 不好的做法:可能导致死锁
public void transferMoney(Account from, Account to, double amount) {
synchronized(from) {
synchronized(to) {
from.debit(amount);
to.credit(amount);
}
}
}
// 好的做法:按固定顺序获取锁
public void transferMoney(Account from, Account to, double amount) {
// 确保按账户ID顺序获取锁
Account first = from.getId() < to.getId() ? from : to;
Account second = from.getId() < to.getId() ? to : from;
synchronized(first) {
synchronized(second) {
if (from == first) {
from.debit(amount);
to.credit(amount);
} else {
to.credit(amount);
from.debit(amount);
}
}
}
}
- 避免过度同步:
// 不好的做法:过度同步
public class OverSynchronizedCache {
private final Map<String, Object> cache = new HashMap<>();
public synchronized Object get(String key) {
return cache.get(key); // 只读操作不需要同步整个方法
}
public synchronized void put(String key, Object value) {
cache.put(key, value);
}
}
// 好的做法:使用并发集合
public class ConcurrentCache {
private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
public Object get(String key) {
return cache.get(key); // 无需额外同步
}
public void put(String key, Object value) {
cache.put(key, value); // 无需额外同步
}
}
- 正确使用volatile:
// 不好的做法:volatile不保证复合操作的原子性
public class VolatileMisuse {
private volatile int count = 0;
public void increment() {
count++; // 非原子操作,即使count是volatile
}
}
// 好的做法:使用AtomicInteger
public class AtomicCounter {
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // 原子操作
}
}
性能考量
- 避免锁竞争:
// 不好的做法:单一锁导致高竞争
public class SingleLockCache {
private final Map<String, Object> cache = new HashMap<>();
private final Object lock = new Object();
public Object get(String key) {
synchronized(lock) {
return cache.get(key);
}
}
public void put(String key, Object value) {
synchronized(lock) {
cache.put(key, value);
}
}
}
// 好的做法:分段锁减少竞争
public class StripedLockCache {
private static final int SEGMENTS = 16;
private final Map<String, Object>[] segments = new HashMap[SEGMENTS];
private final Object[] locks = new Object[SEGMENTS];
public StripedLockCache() {
for (int i = 0; i < SEGMENTS; i++) {
segments[i] = new HashMap<>();
locks[i] = new Object();
}
}
private int getSegment(String key) {
return Math.abs(key.hashCode() % SEGMENTS);
}
public Object get(String key) {
int segment = getSegment(key);
synchronized(locks[segment]) {
return segments[segment].get(key);
}
}
public void put(String key, Object value) {
int segment = getSegment(key);
synchronized(locks[segment]) {
segments[segment].put(key, value);
}
}
}
- 合理使用线程池:
// 不好的做法:为每个任务创建新线程
public class NewThreadPerTask {
public void executeTask(Runnable task) {
new Thread(task).start(); // 为每个任务创建新线程
}
}
// 好的做法:使用线程池
public class ThreadPoolExecutor {
private final ExecutorService executor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors());
public void executeTask(Runnable task) {
executor.execute(task);
}
public void shutdown() {
executor.shutdown();
}
}
总结与展望
Java并发编程从最初的synchronized
和volatile
,发展到现在的CompletableFuture
和响应式编程,经历了巨大的变革。这一演进过程不仅反映了Java语言自身的成熟,也体现了并发编程范式的发展。
演进总结
- 早期Java:基础的
synchronized
和volatile
- JDK 1.5:引入
java.util.concurrent
包,提供更丰富的并发工具 - JDK 1.7:引入Fork/Join框架,支持并行计算
- JDK 1.8:引入
CompletableFuture
和并行流,支持函数式并发编程 - JDK 9及以后:引入Flow API,支持响应式编程
未来展望
Java并发编程的未来可能会朝着以下方向发展:
- 更强大的响应式编程支持:随着响应式编程的普及,Java可能会提供更多原生支持
- 更好的协程支持:类似Kotlin的协程,提供更轻量级的并发模型
- 更智能的自动并行化:编译器和运行时可能会更智能地自动并行化代码
- 更好的硬件适配:更好地利用现代CPU的多核和SIMD特性
选择合适的并发工具
在实际开发中,应根据具体需求选择合适的并发工具:
- 简单互斥:
synchronized
关键字 - 高级锁功能:
Lock
接口及其实现 - 无锁数据结构:
Atomic*
类 - 线程协调:
CountDownLatch
、CyclicBarrier
、Phaser
- 生产者-消费者:
BlockingQueue
接口及其实现 - 线程池:
ExecutorService
接口及其实现 - 异步编程:
CompletableFuture
- 并行数据处理:并行流
- 响应式编程:Flow API
Java并发编程的演进历程告诉我们,没有万能的并发工具,只有最适合特定场景的工具。理解各种并发工具的特性、优缺点和适用场景,才能在实际开发中做出明智的选择。
参考资料:
- Brian Goetz et al., Java Concurrency in Practice
- Doug Lea, Concurrent Programming in Java: Design Principles and Patterns
- Oracle Java Documentation, java.util.concurrent
- Oracle Java Documentation, java.util.concurrent.Flow