深入理解Java多线程(四):锁、锁工具、并发容器
JUC 提供了一套锁对象,分别是:Lock 接口、AbstractQueuedSynchronizaer(队列同步器)、ReentrantLock、ReentrantReadWriteLock、Condition、LockSupport。
Lock 接口
Lock 接口定义了锁工具常用的方法,ReentrantLock、ReentrantReadWriteLock 都是 Lock 的具体实现。
- lock()用于加锁
- unlock()释放锁
- lockInterruptibly()可中断加锁,线程持有锁后可被终端,中断后抛出一个异常以通知线程
- tryLock(), 尝试加锁,并返回获取锁的结果,后续需要根据返回结果自行实现阻塞。
- newConditioin()获取 Condition 对象,Condition 对象以在 Lock 锁的同步块内进行阻塞(await)和通知其他线程执行操作(singal),功能同 Ojbect 的 wait/notify 方法。
AbstractQueuedSynchronizaer 同步器
AbstractQueueSynchronizaer(抽象队列同步器)是一个抽象类,内置 FIFO 队列,是 JUC 的并发包实现的核心,它的设计者希望它能能成为大部分并发需求实现的基础。很多同步器都是继承 AbstractQueueSynchronizaer 实现的,重入锁、读写锁中锁都是继承 AQS。
ReentrantLock 中锁
ReentrantLock 的非公平锁
ReentrantLock 的公平锁
使用 AQS
使用同步器时使用以下三个方法改变状态:
- getState() 获取当前同步状态
- setState() 设置当前同步状态
- compareAndSetState() 使用 CAS 设置当前状态,该方法能保证以原子操作设置状态
使用同步器应该重写的方法:
方法 | 说明 |
---|---|
boolean tryAcquire(int arg) | 独占方式获取同步状态,CAS 更新状态,arg 为锁状态,例如加锁状态是 1,那么调用参数应该传参 1,下同 |
boolean tryRealease(int arg) | 独占释放同步,释放后等待获取同步的线程将有机会获取锁 🔒 |
int tryAcquireShared(int arg) | 共享获取同步状态,返回值大于 0 表示成功,反之失败 |
boolean tryReleaseShared(int arg) | 共享释放同步状态 |
boolean isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程占用 |
同步器提供的模板方法:
这些方法可以直接使用,并且这些方法和 Lock 接口中的方法很像。
方法 | 说明 |
---|---|
void acquire(int arg) | 独占获取同步状态,如果当前线程获取同步状态成功,则有该方法返回,否则进入同步队列等待,该方法将会调用重写的 tryAcquire(int arg)方法 |
void acquireInterruptibly(int arg) | 与 acquire()相同但该方法会相应中断,当前线程为获取到同步状态就进入同步队列,如果当前线程中断则抛出 InterruptedException并返回 |
voidtryAcquireNanos(int arg, long nanos) | 在 acquireInterruptibly(int arg)方法上增加了超时等待功能,并且有返回值,未超时且获取到同步状态返回 true,超时返回 false |
void acquireShared(int arg) | 共享式获取同步状态如果当前线程未获取到同步状态,将会进入同步队列等待,与独占获取的主要区别是同一时刻可以由多个线程获取到状态 |
void acquireSharedInterruptibly(int arg) | 与 acquireShare(int arg)方法相同,不用点是该方法相应中断 |
boolean acquireSharedNanos(int arg, long nanos) | 在 acquireSharedInterruptibly(int arg)基础上增加了超时等待 |
boolean release(int arg) | 独占方式释放同步状态,释放同步状态后,阻塞队列的第一个线程将被唤醒 |
boolean releaseShared(int arg) | 共享式释放锁 |
词汇解释:
- 超时等待:在执行操作是等待固定的时间,如果超出了这个等待时间就不等待了,然后继续往下执行,通常超时等待方法都有返回值表面请求的结果(如
boolean acquireSharedNanos(int arg, long nanos)
,也有没有返回值的:Object.wait(long timeout)、LockSupport.park(long nanos) - “如果获取到锁,从方法中返回”:意思是不在请求方法中阻塞了,该方法执行完毕或 return 返回值了
- 独占、共享:独占同一时刻被以可线程访问或占用,共享式多个
使用 AQS 实现非重入互斥锁
// 非重入互斥锁
// 锁定义:采用同步非阻塞方式(CAS+volatile),状态0代表可获取锁,状态1代表不可获取锁
public class Mutex implements Lock {
// 继承并覆盖AQS
private static class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
// CAS,期望值为0,更新值为1
if(compareAndSetState(0,1)){
// CAS更新成功,设置当前线程为独占
setExclusiveOwnerThread(Thread.currentThread());
//返回true表锁当前线程获取到了锁
return true;
}
// false未获取到锁
return false;
}
@Override
protected boolean tryRelease(int arg) {
// 非0装才允许释放锁,否则抛异常
if(getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
// 更新状态,0:锁为可获取状态
setState(0);
// 锁释放成功
return true;
}
// 状态为1说明有个线程正在持有锁
@Override
protected boolean isHeldExclusively() {
return getState()==1;
}
// 返回条件对象,用与在同步块内线程通信(wait/signal)
Condition newCondition(){
return new ConditionObject();
}
}
private final Sync sync = new Sync();
// 所有加锁传入的状态都是1,然后调用Sync的方法
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
测试
public class TestMutex {
Mutex mutex = new Mutex();
void m1(){
try {
mutex.lock();
System.out.println("m1......");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
mutex.unlock();
}
}
void m2(){
try {
mutex.lock();
System.out.println("m2......");
Thread.sleep(1000);
}catch (InterruptedException e){
e.printStackTrace();
}finally {
mutex.unlock();
}
}
public static void main(String[] args) {
TestMutex o = new TestMutex();
// 执行m1
new Thread(o::m1,"t1").start();
new Thread(o::m1,"t2").start();
// 执行m2
new Thread(o::m2,"t3").start();
}
}
ReentrantLock、ReentrantReadWriteLock
ReentrantLock 是可重入锁,实现了公平锁和非公平锁(默认),除了能提供 synchronized 相同的功能,还提供了尝试加锁,超时加锁,加锁中断等特性。
ReentrantReadWriteLock 是对 ReentrantLock 的再次改进,因为并不所由操作都需要互斥访问的,例如在无写的情况下多线程进行读,这不会使数据变脏,而在写到来时才需要同步,ReentrantReadWriteLock 应用而生,读写锁的特点是读是共享的,写是独占的,且读操作都在一个写操作完成后进行,因此在多读的场景下能提高同步性能。
ReentranLock 的使用
独占加锁,多个线程公用一个 ReentrantLock 对象,对同步代码加锁释放锁。
注意:所有的加锁都要手动释放!
使用模板:
Reentrant lock = new ReentrantLock();
...
lock.lock()
try{
...
同步代码
...
}finally{
lock.unlock();
}
使用“尝试获取锁”,使用尝试获取锁得到返回值后仍要需要我们自行控制同步。
...
try{
while(!tryLock()){
//未获取到锁空转阻塞
}
同步代码...
}finally{
lock.unlock();
}
使用 Condition,Condition 可在同步代码中进一步进行同步操作,如释放锁(await)通知其他线程竞争锁(signal、signalAll)。
Lock lock = new ReentrantLock();
Conditiion con = lock.newCondition();
...
lock.lock()
try{
// 阻塞当前线程,让出锁
con.await();
// 唤醒一个线程执行
con.signal();
}finally{
lock.unlock();
}
ReentranReadWriteLock 的使用
// 互斥锁和读写锁测试
public class TestReadWriteLock {
static Lock lock = new ReentrantLock();
private static int value;
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public static void read(Lock lock) {
try {
lock.lock();
Thread.sleep(1000);
System.out.println("read over! value = "+value);
//模拟读取操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void write(Lock lock, int v) {
try {
lock.lock();
Thread.sleep(1000);
value = v;
System.out.println("write over! value = " + value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
/*// 使用互斥锁
Runnable readR = ()-> read(lock);
Runnable writeR = ()->write(lock, new Random().nextInt());*/
// 使用读写锁
Runnable readR = ()-> read(readLock);
Runnable writeR = ()->write(writeLock, new Random().nextInt());
for(int i=0; i<2; i++) new Thread(writeR).start();
for(int i=0; i<20; i++) new Thread(readR).start();
for(int i=0; i<2; i++) new Thread(writeR).start();
}
}
// output
read over! value = 0
read over! value = 0
read over! value = 0
read over! value = 0
read over! value = 0
write over! value = 295078983
write over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
write over! value = -1287679733
read over! value = -1287679733
read over! value = -1287679733
read over! value = -1287679733
read over! value = -1287679733
write over! value = -791313481
LockSupport 使用
LockSupport 可以用来阻塞当前线程(park)或者唤醒线程(unpark). 使用示例如下。
public class TestLockSupport {
public static void main(String[] args) {
// 两个线程打印 aaabbbaaa
Thread t1 = new Thread(() -> {
for (int i = 0; i < 6; i++) {
System.out.printf("a");
if (i == 2) {
// 打印三个字母后阻塞
LockSupport.park();
}
}
});
t1.start();
new Thread(()->{
for (int i = 0; i < 3; i++) {
System.out.printf("b");
}
// 释放锁
LockSupport.unpark(t1);
}).start();
}
}
并发容器
ConcurrentHashMap
为什么要使用 ConcurrentHashMap?
- 线程安全。在并发环境下如果使用 HashMap 可能导致程序死循环,HashMap 在进行 put 操作时会时会使 Entry 形成环,Entry 的 next 引用永不为空就会产生死循环获取 Entry。
- 效率更高。HashTable 对 put、get、remove 方法等方法都加了 synchronizaed 锁,意味着同一时间只能由一个线程访问 HashTable 实例的方法,效率很低,而 ConcurrentHashMap 使用分段锁的方式,不同段可由不同的线程访问,提高了并发度提高了访问效率。
队列容器
并发容器队列分为有界对列和无界队列,有无届指限制添加到队列的元素数量,无界就是插入没有限制。
非阻塞队列和阻塞队列,非阻塞队列对插入和获取进行同步,而阻塞队列除了在插入和获取进行同步外,在没有元素时或插入满时会阻塞当前线程。
阻塞队列不可用时的处理方式,对于不同的插入和移除方法有不同的处理。
处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | 不可用 | 不可用 |
Java 中的阻塞队列
- ArrayBlockingQueue:基于数组的有界阻塞队列,默认非公平,支持公平方式
- LinkedBlockingQueue:链式无界阻塞队列
- PriorityBlockingQueue:具有优先级的无界阻塞队列, 使用元素的 compareTo 或专用的 Comparator 进行比较,不保证同级元素的公平性
- DelayQueue:使用优先级队列实现的无界阻塞队列,可以使用该队列设计缓存系统、或者进行任务调度,存储的元素需要实现 Delayed 接口
- SynchronousQueue:不存储元素的阻塞队列
- LinkedTransferQueue:链表结构组成的无界阻塞队列,其 transfer 方法可以使正在等待接受元素的 take/poll 方法理解使用其传入的元素
- LinkedBlockingDequeue:链表结构双向阻塞队列
非并发容器同步实现
早期的 Vector 和 HashTable 都是使用 sychronized 进行同步,Concurrent 包下的同步多是 CAS+volatile 实现,那么非并发的容器怎么实现同步?一种方式是使用 Collections 里的包装其,另外一种是自己实现。
// 方式1
List<String> list = new ArrayList<>();
List<String> syncList = Collections.synchronizedList(list);
其他类型容器的同步方式相同。
public class Test {
static List<Integer> list = new LinkedList<>();
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
list.add(i);
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 方式2: 使用list自身为锁,此时list自身互斥访问性能差
// 方式3: 使用ReentrantLock或者声明一个Object为lock都可以实现同步访问
for (int i = 0; i < 100; i++) {
new Thread(()->{
synchronized (list){
for (int j = 0; j < 10; j++) {
Integer remove = list.remove(0);
System.out.println(remove);
}
}
}).start();
}
}
}
并发容器总结
非并发容器在多线程环境下存在线程安全问题,会导致数据脏读、抛异常或者死锁,多线程下切记使用并发容器。
对于 Map/Set
线程安全的环境下(如单线程),常用 HashMap、TreeMap(带排序功能)、LinkedHashMap;在多线程环境下常用 ConcurrentHashMap、ConcurrentSkipListMap,少用或不用 HashTable、synchronizedMap,它们都是使用 synchronized 实现同步,两者本质区别不大。
对于队列/列表
单线程下,多读少删少插入用 ArrayList,多删多插入用 LinkedList。
并发环境下,尽快少用 Vector, synchronizedList,synchronized 实现同步效率低。多读少写可以使用 CopyOnWriteList,另外根据使用队列是否需要设置大小选择有界或无界队列,然后选择阻塞类型。
(完)