深入理解Java多线程(四):锁、锁工具、并发容器

#Java #多线程 [字体 ··]

JUC 提供了一套锁对象,分别是:Lock 接口、AbstractQueuedSynchronizaer(队列同步器)、ReentrantLock、ReentrantReadWriteLock、Condition、LockSupport。

Lock 接口

Lock 接口定义了锁工具常用的方法,ReentrantLock、ReentrantReadWriteLock 都是 Lock 的具体实现。

image.png

  • lock()用于加锁
  • unlock()释放锁
  • lockInterruptibly()可中断加锁,线程持有锁后可被终端,中断后抛出一个异常以通知线程
  • tryLock(), 尝试加锁,并返回获取锁的结果,后续需要根据返回结果自行实现阻塞。
  • newConditioin()获取 Condition 对象,Condition 对象以在 Lock 锁的同步块内进行阻塞(await)和通知其他线程执行操作(singal),功能同 Ojbect 的 wait/notify 方法。

AbstractQueuedSynchronizaer 同步器

AbstractQueueSynchronizaer(抽象队列同步器)是一个抽象类,内置 FIFO 队列,是 JUC 的并发包实现的核心,它的设计者希望它能能成为大部分并发需求实现的基础。很多同步器都是继承 AbstractQueueSynchronizaer 实现的,重入锁、读写锁中锁都是继承 AQS。

ReentrantLock 中锁

image.png

ReentrantLock 的非公平锁

image.png

ReentrantLock 的公平锁

image.png

使用 AQS

使用同步器时使用以下三个方法改变状态:

  • getState() 获取当前同步状态
  • setState() 设置当前同步状态
  • compareAndSetState() 使用 CAS 设置当前状态,该方法能保证以原子操作设置状态

使用同步器应该重写的方法:

方法说明
boolean tryAcquire(int arg)独占方式获取同步状态,CAS 更新状态,arg 为锁状态,例如加锁状态是 1,那么调用参数应该传参 1,下同
boolean tryRealease(int arg)独占释放同步,释放后等待获取同步的线程将有机会获取锁 🔒
int tryAcquireShared(int arg)共享获取同步状态,返回值大于 0 表示成功,反之失败
boolean tryReleaseShared(int arg)共享释放同步状态
boolean isHeldExclusively()当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程占用

同步器提供的模板方法:

这些方法可以直接使用,并且这些方法和 Lock 接口中的方法很像。

方法说明
void acquire(int arg)独占获取同步状态,如果当前线程获取同步状态成功,则有该方法返回,否则进入同步队列等待,该方法将会调用重写的 tryAcquire(int arg)方法
void acquireInterruptibly(int arg)与 acquire()相同但该方法会相应中断,当前线程为获取到同步状态就进入同步队列,如果当前线程中断则抛出 InterruptedException并返回
voidtryAcquireNanos(int arg, long nanos)在 acquireInterruptibly(int arg)方法上增加了超时等待功能,并且有返回值,未超时且获取到同步状态返回 true,超时返回 false
void acquireShared(int arg)共享式获取同步状态如果当前线程未获取到同步状态,将会进入同步队列等待,与独占获取的主要区别是同一时刻可以由多个线程获取到状态
void acquireSharedInterruptibly(int arg)与 acquireShare(int arg)方法相同,不用点是该方法相应中断
boolean acquireSharedNanos(int arg, long nanos)在 acquireSharedInterruptibly(int arg)基础上增加了超时等待
boolean release(int arg)独占方式释放同步状态,释放同步状态后,阻塞队列的第一个线程将被唤醒
boolean releaseShared(int arg)共享式释放锁

词汇解释:

  • 超时等待:在执行操作是等待固定的时间,如果超出了这个等待时间就不等待了,然后继续往下执行,通常超时等待方法都有返回值表面请求的结果(如boolean acquireSharedNanos(int arg, long nanos),也有没有返回值的:Object.wait(long timeout)、LockSupport.park(long nanos)
  • “如果获取到锁,从方法中返回”:意思是不在请求方法中阻塞了,该方法执行完毕或 return 返回值了
  • 独占、共享:独占同一时刻被以可线程访问或占用,共享式多个

使用 AQS 实现非重入互斥锁

 1// 非重入互斥锁
 2// 锁定义:采用同步非阻塞方式(CAS+volatile),状态0代表可获取锁,状态1代表不可获取锁
 3public class Mutex implements Lock {
 4
 5    // 继承并覆盖AQS
 6    private static class Sync extends AbstractQueuedSynchronizer{
 7
 8        @Override
 9        protected boolean tryAcquire(int arg) {
10            // CAS,期望值为0,更新值为1
11            if(compareAndSetState(0,1)){
12                // CAS更新成功,设置当前线程为独占
13                setExclusiveOwnerThread(Thread.currentThread());
14                //返回true表锁当前线程获取到了锁
15                return true;
16            }
17            // false未获取到锁
18            return false;
19        }
20
21        @Override
22        protected boolean tryRelease(int arg) {
23            // 非0装才允许释放锁,否则抛异常
24            if(getState() == 0) throw new IllegalMonitorStateException();
25            setExclusiveOwnerThread(null);
26            // 更新状态,0:锁为可获取状态
27            setState(0);
28            // 锁释放成功
29            return true;
30        }
31
32        // 状态为1说明有个线程正在持有锁
33        @Override
34        protected boolean isHeldExclusively() {
35            return getState()==1;
36        }
37
38        // 返回条件对象,用与在同步块内线程通信(wait/signal)
39        Condition newCondition(){
40            return new ConditionObject();
41        }
42    }
43
44    private final Sync sync = new Sync();
45
46    // 所有加锁传入的状态都是1,然后调用Sync的方法
47    @Override
48    public void lock() {
49        sync.acquire(1);
50    }
51
52    @Override
53    public void lockInterruptibly() throws InterruptedException {
54        sync.acquireInterruptibly(1);
55    }
56
57    @Override
58    public boolean tryLock() {
59        return sync.tryAcquire(1);
60    }
61
62    @Override
63    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
64        return sync.tryAcquireNanos(1,unit.toNanos(time));
65    }
66
67    @Override
68    public void unlock() {
69        sync.release(1);
70    }
71
72    @Override
73    public Condition newCondition() {
74        return sync.newCondition();
75    }
76}

测试

 1public class TestMutex {
 2    Mutex mutex = new Mutex();
 3
 4    void m1(){
 5        try {
 6            mutex.lock();
 7            System.out.println("m1......");
 8            Thread.sleep(2000);
 9        } catch (InterruptedException e) {
10            e.printStackTrace();
11        }finally {
12            mutex.unlock();
13        }
14    }
15
16    void m2(){
17        try {
18            mutex.lock();
19            System.out.println("m2......");
20            Thread.sleep(1000);
21        }catch (InterruptedException e){
22            e.printStackTrace();
23        }finally {
24            mutex.unlock();
25        }
26
27    }
28
29    public static void main(String[] args) {
30        TestMutex o = new TestMutex();
31
32        // 执行m1
33        new Thread(o::m1,"t1").start();
34        new Thread(o::m1,"t2").start();
35
36        // 执行m2
37        new Thread(o::m2,"t3").start();
38
39    }
40}

ReentrantLock、ReentrantReadWriteLock

ReentrantLock 是可重入锁,实现了公平锁和非公平锁(默认),除了能提供 synchronized 相同的功能,还提供了尝试加锁,超时加锁,加锁中断等特性。

ReentrantReadWriteLock 是对 ReentrantLock 的再次改进,因为并不所由操作都需要互斥访问的,例如在无写的情况下多线程进行读,这不会使数据变脏,而在写到来时才需要同步,ReentrantReadWriteLock 应用而生,读写锁的特点是读是共享的,写是独占的,且读操作都在一个写操作完成后进行,因此在多读的场景下能提高同步性能。

ReentranLock 的使用

独占加锁,多个线程公用一个 ReentrantLock 对象,对同步代码加锁释放锁。

注意:所有的加锁都要手动释放!

使用模板:

 1Reentrant lock = new ReentrantLock();
 2...
 3
 4lock.lock()
 5try{
 6    ...
 7    同步代码
 8    ...
 9}finally{
10    lock.unlock();
11}

使用“尝试获取锁”,使用尝试获取锁得到返回值后仍要需要我们自行控制同步。

1...
2try{
3    while(!tryLock()){
4	//未获取到锁空转阻塞
5    }
6    同步代码...
7}finally{
8    lock.unlock();
9}

使用 Condition,Condition 可在同步代码中进一步进行同步操作,如释放锁(await)通知其他线程竞争锁(signal、signalAll)。

 1Lock lock = new ReentrantLock();
 2Conditiion con = lock.newCondition();
 3   ...
 4
 5lock.lock()
 6
 7try{
 8    // 阻塞当前线程,让出锁
 9    con.await();
10    // 唤醒一个线程执行
11    con.signal();
12}finally{
13    lock.unlock();
14}

ReentranReadWriteLock 的使用

 1// 互斥锁和读写锁测试
 2public class TestReadWriteLock {
 3    static Lock lock = new ReentrantLock();
 4    private static int value;
 5
 6    static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 7    static Lock readLock = readWriteLock.readLock();
 8    static Lock writeLock = readWriteLock.writeLock();
 9
10    public static void read(Lock lock) {
11        try {
12            lock.lock();
13            Thread.sleep(1000);
14            System.out.println("read over! value = "+value);
15            //模拟读取操作
16        } catch (InterruptedException e) {
17            e.printStackTrace();
18        } finally {
19            lock.unlock();
20        }
21    }
22
23    public static void write(Lock lock, int v) {
24        try {
25            lock.lock();
26            Thread.sleep(1000);
27            value = v;
28            System.out.println("write over!     value = " + value);
29        } catch (InterruptedException e) {
30            e.printStackTrace();
31        } finally {
32            lock.unlock();
33        }
34    }
35
36    public static void main(String[] args) {
37
38        /*// 使用互斥锁
39        Runnable readR = ()-> read(lock);
40        Runnable writeR = ()->write(lock, new Random().nextInt());*/
41
42        // 使用读写锁
43        Runnable readR = ()-> read(readLock);
44        Runnable writeR = ()->write(writeLock, new Random().nextInt());
45
46        for(int i=0; i<2; i++) new Thread(writeR).start();
47        for(int i=0; i<20; i++) new Thread(readR).start();
48        for(int i=0; i<2; i++) new Thread(writeR).start();
49    }
50}
51
52// output
53read over! value = 0
54read over! value = 0
55read over! value = 0
56read over! value = 0
57read over! value = 0
58write over!     value = 295078983
59write over!     value = 2106533192
60read over! value = 2106533192
61read over! value = 2106533192
62read over! value = 2106533192
63read over! value = 2106533192
64read over! value = 2106533192
65read over! value = 2106533192
66read over! value = 2106533192
67read over! value = 2106533192
68read over! value = 2106533192
69write over!     value = -1287679733
70read over! value = -1287679733
71read over! value = -1287679733
72read over! value = -1287679733
73read over! value = -1287679733
74write over!     value = -791313481

LockSupport 使用

LockSupport 可以用来阻塞当前线程(park)或者唤醒线程(unpark). 使用示例如下。

 1public class TestLockSupport {
 2    public static void main(String[] args) {
 3        // 两个线程打印 aaabbbaaa
 4
 5        Thread t1 = new Thread(() -> {
 6            for (int i = 0; i < 6; i++) {
 7
 8                System.out.printf("a");
 9                if (i == 2) {
10                    // 打印三个字母后阻塞
11                    LockSupport.park();
12                }
13            }
14        });
15        t1.start();
16
17        new Thread(()->{
18            for (int i = 0; i < 3; i++) {
19                System.out.printf("b");
20            }
21            // 释放锁
22            LockSupport.unpark(t1);
23        }).start();
24    }
25}

并发容器

ConcurrentHashMap

为什么要使用 ConcurrentHashMap?

  1. 线程安全。在并发环境下如果使用 HashMap 可能导致程序死循环,HashMap 在进行 put 操作时会时会使 Entry 形成环,Entry 的 next 引用永不为空就会产生死循环获取 Entry。
  2. 效率更高。HashTable 对 put、get、remove 方法等方法都加了 synchronizaed 锁,意味着同一时间只能由一个线程访问 HashTable 实例的方法,效率很低,而 ConcurrentHashMap 使用分段锁的方式,不同段可由不同的线程访问,提高了并发度提高了访问效率。

队列容器

并发容器队列分为有界对列和无界队列,有无届指限制添加到队列的元素数量,无界就是插入没有限制。

非阻塞队列和阻塞队列,非阻塞队列对插入和获取进行同步,而阻塞队列除了在插入和获取进行同步外,在没有元素时或插入满时会阻塞当前线程。

阻塞队列不可用时的处理方式,对于不同的插入和移除方法有不同的处理。

处理方式抛出异常返回特殊值一直阻塞超时退出
插入add(e)offer(e)put(e)offer(e,time,unit)
移除remove()poll()take()poll(time,unit)
检查element()peek()不可用不可用

Java 中的阻塞队列

  • ArrayBlockingQueue:基于数组的有界阻塞队列,默认非公平,支持公平方式
  • LinkedBlockingQueue:链式无界阻塞队列
  • PriorityBlockingQueue:具有优先级的无界阻塞队列, 使用元素的 compareTo 或专用的 Comparator 进行比较,不保证同级元素的公平性
  • DelayQueue:使用优先级队列实现的无界阻塞队列,可以使用该队列设计缓存系统、或者进行任务调度,存储的元素需要实现 Delayed 接口
  • SynchronousQueue:不存储元素的阻塞队列
  • LinkedTransferQueue:链表结构组成的无界阻塞队列,其 transfer 方法可以使正在等待接受元素的 take/poll 方法理解使用其传入的元素
  • LinkedBlockingDequeue:链表结构双向阻塞队列

非并发容器同步实现

早期的 Vector 和 HashTable 都是使用 sychronized 进行同步,Concurrent 包下的同步多是 CAS+volatile 实现,那么非并发的容器怎么实现同步?一种方式是使用 Collections 里的包装其,另外一种是自己实现。

1        // 方式1
2        List<String> list = new ArrayList<>();
3
4        List<String> syncList = Collections.synchronizedList(list);

其他类型容器的同步方式相同。

image.png

 1public class Test {
 2    static List<Integer> list = new LinkedList<>();
 3
 4    public static void main(String[] args) {
 5        for (int i = 0; i < 1000; i++) {
 6            list.add(i);
 7        }
 8
 9        try {
10            TimeUnit.SECONDS.sleep(1);
11        } catch (InterruptedException e) {
12            e.printStackTrace();
13        }
14
15        // 方式2: 使用list自身为锁,此时list自身互斥访问性能差
16        // 方式3: 使用ReentrantLock或者声明一个Object为lock都可以实现同步访问
17        for (int i = 0; i < 100; i++) {
18            new Thread(()->{
19                synchronized (list){
20                    for (int j = 0; j < 10; j++) {
21                        Integer remove = list.remove(0);
22                        System.out.println(remove);
23                    }
24                }
25            }).start();
26        }
27    }
28}

并发容器总结

非并发容器在多线程环境下存在线程安全问题,会导致数据脏读、抛异常或者死锁,多线程下切记使用并发容器。

对于 Map/Set

线程安全的环境下(如单线程),常用 HashMap、TreeMap(带排序功能)、LinkedHashMap;在多线程环境下常用 ConcurrentHashMap、ConcurrentSkipListMap,少用或不用 HashTable、synchronizedMap,它们都是使用 synchronized 实现同步,两者本质区别不大。

对于队列/列表

单线程下,多读少删少插入用 ArrayList,多删多插入用 LinkedList。

并发环境下,尽快少用 Vector, synchronizedList,synchronized 实现同步效率低。多读少写可以使用 CopyOnWriteList,另外根据使用队列是否需要设置大小选择有界或无界队列,然后选择阻塞类型。

(完)


博客没有评论系统,可以通过 邮件 评论和交流。 Top↑