深入理解Java多线程(四)锁、锁工具、并发容器

JUC提供了一套锁对象,分别是:Lock接口、AbstractQueuedSynchronizaer(队列同步器)、ReentrantLock、ReentrantReadWriteLock、Condition、LockSupport。

Lock接口

Lock接口定义了锁工具常用的方法,ReentrantLock、ReentrantReadWriteLock都是Lock的具体实现。

image.png

  • lock()用于加锁
  • unlock()释放锁
  • lockInterruptibly()可中断加锁,线程持有锁后可被终端,中断后抛出一个异常以通知线程
  • tryLock(), 尝试加锁,并返回获取锁的结果,后续需要根据返回结果自行实现阻塞。
  • newConditioin()获取Condition对象,Condition对象以在Lock锁的同步块内进行阻塞(await)和通知其他线程执行操作(singal),功能同Ojbect的wait/notify方法。

AbstractQueuedSynchronizaer 同步器

AbstractQueueSynchronizaer(抽象队列同步器)是一个抽象类,内置FIFO队列,是JUC的并发包实现的核心,它的设计者希望它能能成为大部分并发需求实现的基础。很多同步器都是继承AbstractQueueSynchronizaer实现的,重入锁、读写锁中锁都是继承AQS。

ReentrantLock中锁
image.png

ReentrantLock的非公平锁

image.png

ReentrantLock的公平锁

image.png

使用AQS

使用同步器时使用以下三个方法改变状态:

  • getState() 获取当前同步状态
  • setState() 设置当前同步状态
  • compareAndSetState() 使用CAS设置当前状态,该方法能保证以原子操作设置状态

使用同步器应该重写的方法:

方法说明
boolean tryAcquire(int arg)独占方式获取同步状态,CAS更新状态,arg为锁状态,例如加锁状态是1,那么调用参数应该传参1,下同
boolean tryRealease(int arg)独占释放同步,释放后等待获取同步的线程将有机会获取锁🔒
int tryAcquireShared(int arg)共享获取同步状态,返回值大于0表示成功,反之失败
boolean tryReleaseShared(int arg)共享释放同步状态
boolean isHeldExclusively()当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程占用

同步器提供的模板方法:

这些方法可以直接使用,并且这些方法和Lock接口中的方法很像。

方法说明
void acquire(int arg)独占获取同步状态,如果当前线程获取同步状态成功,则有该方法返回,否则进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法
void acquireInterruptibly(int arg)与acquire()相同但该方法会相应中断,当前线程为获取到同步状态就进入同步队列,如果当前线程中断则抛出InterruptedException并返回
voidtryAcquireNanos(int arg, long nanos)在acquireInterruptibly(int arg)方法上增加了超时等待功能,并且有返回值,未超时且获取到同步状态返回true,超时返回false
void acquireShared(int arg)共享式获取同步状态如果当前线程未获取到同步状态,将会进入同步队列等待,与独占获取的主要区别是同一时刻可以由多个线程获取到状态
void acquireSharedInterruptibly(int arg)与acquireShare(int arg)方法相同,不用点是该方法相应中断
boolean acquireSharedNanos(int arg, long nanos)在acquireSharedInterruptibly(int arg)基础上增加了超时等待
boolean release(int arg)独占方式释放同步状态,释放同步状态后,阻塞队列的第一个线程将被唤醒
boolean releaseShared(int arg)共享式释放锁

词汇解释:

  • 超时等待:在执行操作是等待固定的时间,如果超出了这个等待时间就不等待了,然后继续往下执行,通常超时等待方法都有返回值表面请求的结果(如boolean acquireSharedNanos(int arg, long nanos),也有没有返回值的:Object.wait(long timeout)、LockSupport.park(long nanos)
  • “如果获取到锁,从方法中返回”:意思是不在请求方法中阻塞了,该方法执行完毕或return返回值了
  • 独占、共享:独占同一时刻被以可线程访问或占用,共享式多个

使用AQS实现非重入互斥锁

// 非重入互斥锁
// 锁定义:采用同步非阻塞方式(CAS+volatile),状态0代表可获取锁,状态1代表不可获取锁
public class Mutex implements Lock {

    // 继承并覆盖AQS
    private static class Sync extends AbstractQueuedSynchronizer{

        @Override
        protected boolean tryAcquire(int arg) {
            // CAS,期望值为0,更新值为1
            if(compareAndSetState(0,1)){
                // CAS更新成功,设置当前线程为独占
                setExclusiveOwnerThread(Thread.currentThread());
                //返回true表锁当前线程获取到了锁
                return true;
            }
            // false未获取到锁
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // 非0装才允许释放锁,否则抛异常
            if(getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            // 更新状态,0:锁为可获取状态
            setState(0);
            // 锁释放成功
            return true;
        }

        // 状态为1说明有个线程正在持有锁
        @Override
        protected boolean isHeldExclusively() {
            return getState()==1;
        }

        // 返回条件对象,用与在同步块内线程通信(wait/signal)
        Condition newCondition(){
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    // 所有加锁传入的状态都是1,然后调用Sync的方法
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

测试

public class TestMutex {
    Mutex mutex = new Mutex();

    void m1(){
        try {
            mutex.lock();
            System.out.println("m1......");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            mutex.unlock();
        }
    }

    void m2(){
        try {
            mutex.lock();
            System.out.println("m2......");
            Thread.sleep(1000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            mutex.unlock();
        }

    }
    
    public static void main(String[] args) {
        TestMutex o = new TestMutex();

        // 执行m1
        new Thread(o::m1,"t1").start();
        new Thread(o::m1,"t2").start();
        
        // 执行m2
        new Thread(o::m2,"t3").start();

    }
}

ReentrantLock、ReentrantReadWriteLock

ReentrantLock是可重入锁,实现了公平锁和非公平锁(默认),除了能提供synchronized相同的功能,还提供了尝试加锁,超时加锁,加锁中断等特性。

ReentrantReadWriteLock是对ReentrantLock的再次改进,因为并不所由操作都需要互斥访问的,例如在无写的情况下多线程进行读,这不会使数据变脏,而在写到来时才需要同步,ReentrantReadWriteLock应用而生,读写锁的特点是读是共享的,写是独占的,且读操作都在一个写操作完成后进行,因此在多读的场景下能提高同步性能。

ReentranLock的使用

独占加锁,多个线程公用一个ReentrantLock对象,对同步代码加锁释放锁。

注意:所有的加锁都要手动释放!

使用模板:

Reentrant lock = new ReentrantLock();
...

lock.lock()
try{
    ...
    同步代码
    ...
}finally{
    lock.unlock();
}

使用“尝试获取锁”,使用尝试获取锁得到返回值后仍要需要我们自行控制同步。

...
try{
    while(!tryLock()){
	//未获取到锁空转阻塞
    }
    同步代码...
}finally{
    lock.unlock();
}

使用Condition,Condition可在同步代码中进一步进行同步操作,如释放锁(await)通知其他线程竞争锁(signal、signalAll)。

Lock lock = new ReentrantLock();
Conditiion con = lock.newCondition();
   ...

lock.lock()

try{
    // 阻塞当前线程,让出锁
    con.await();
    // 唤醒一个线程执行
    con.signal(); 
}finally{
    lock.unlock();
}

ReentranReadWriteLock的使用

// 互斥锁和读写锁测试
public class TestReadWriteLock {
    static Lock lock = new ReentrantLock();
    private static int value;

    static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    static Lock readLock = readWriteLock.readLock();
    static Lock writeLock = readWriteLock.writeLock();

    public static void read(Lock lock) {
        try {
            lock.lock();
            Thread.sleep(1000);
            System.out.println("read over! value = "+value);
            //模拟读取操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void write(Lock lock, int v) {
        try {
            lock.lock();
            Thread.sleep(1000);
            value = v;
            System.out.println("write over!     value = " + value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {

        /*// 使用互斥锁
        Runnable readR = ()-> read(lock);
        Runnable writeR = ()->write(lock, new Random().nextInt());*/

        // 使用读写锁
        Runnable readR = ()-> read(readLock);
        Runnable writeR = ()->write(writeLock, new Random().nextInt());

        for(int i=0; i<2; i++) new Thread(writeR).start();
        for(int i=0; i<20; i++) new Thread(readR).start();
        for(int i=0; i<2; i++) new Thread(writeR).start();
    }
}

// output
read over! value = 0
read over! value = 0
read over! value = 0
read over! value = 0
read over! value = 0
write over!     value = 295078983
write over!     value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
read over! value = 2106533192
write over!     value = -1287679733
read over! value = -1287679733
read over! value = -1287679733
read over! value = -1287679733
read over! value = -1287679733
write over!     value = -791313481

LockSupport使用

LockSupport可以用来阻塞当前线程(park)或者唤醒线程(unpark). 使用示例如下。

public class TestLockSupport {
    public static void main(String[] args) {
        // 两个线程打印 aaabbbaaa

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 6; i++) {

                System.out.printf("a");
                if (i == 2) {
                    // 打印三个字母后阻塞
                    LockSupport.park();
                }
            }
        });
        t1.start();

        new Thread(()->{
            for (int i = 0; i < 3; i++) {
                System.out.printf("b");
            }
            // 释放锁
            LockSupport.unpark(t1);
        }).start();
    }
}

并发容器

ConcurrentHashMap

为什么要使用ConcurrentHashMap?

  1. 线程安全。在并发环境下如果使用HashMap可能导致程序死循环,HashMap在进行put操作时会时会使Entry形成环,Entry的next引用永不为空就会产生死循环获取Entry。
  2. 效率更高。HashTable对put、get、remove方法等方法都加了synchronizaed锁,意味着同一时间只能由一个线程访问HashTable实例的方法,效率很低,而ConcurrentHashMap使用分段锁的方式,不同段可由不同的线程访问,提高了并发度提高了访问效率。

队列容器

并发容器队列分为有界对列和无界队列,有无届指限制添加到队列的元素数量,无界就是插入没有限制。

非阻塞队列和阻塞队列,非阻塞队列对插入和获取进行同步,而阻塞队列除了在插入和获取进行同步外,在没有元素时或插入满时会阻塞当前线程。

阻塞队列不可用时的处理方式,对于不同的插入和移除方法有不同的处理。

处理方式抛出异常返回特殊值一直阻塞超时退出
插入add(e)offer(e)put(e)offer(e,time,unit)
移除remove()poll()take()poll(time,unit)
检查element()peek()不可用不可用

Java中的阻塞队列

  • ArrayBlockingQueue:基于数组的有界阻塞队列,默认非公平,支持公平方式
  • LinkedBlockingQueue:链式无界阻塞队列
  • PriorityBlockingQueue:具有优先级的无界阻塞队列, 使用元素的compareTo或专用的Comparator进行比较,不保证同级元素的公平性
  • DelayQueue:使用优先级队列实现的无界阻塞队列,可以使用该队列设计缓存系统、或者进行任务调度,存储的元素需要实现Delayed接口
  • SynchronousQueue:不存储元素的阻塞队列
  • LinkedTransferQueue:链表结构组成的无界阻塞队列,其transfer方法可以使正在等待接受元素的take/poll方法理解使用其传入的元素
  • LinkedBlockingDequeue:链表结构双向阻塞队列

非并发容器同步实现

早期的Vector和HashTable都是使用sychronized进行同步,Concurrent包下的同步多是CAS+volatile实现,那么非并发的容器怎么实现同步?一种方式是使用Collections里的包装其,另外一种是自己实现。

        // 方式1
        List<String> list = new ArrayList<>();

        List<String> syncList = Collections.synchronizedList(list);

其他类型容器的同步方式相同。
image.png

public class Test {
    static List<Integer> list = new LinkedList<>();
    
    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            list.add(i);
        }

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 方式2: 使用list自身为锁,此时list自身互斥访问性能差
        // 方式3: 使用ReentrantLock或者声明一个Object为lock都可以实现同步访问
        for (int i = 0; i < 100; i++) {
            new Thread(()->{
                synchronized (list){
                    for (int j = 0; j < 10; j++) {
                        Integer remove = list.remove(0);
                        System.out.println(remove);
                    }
                }
            }).start();
        }
    }
}

并发容器总结

非并发容器在多线程环境下存在线程安全问题,会导致数据脏读、抛异常或者死锁,多线程下切记使用并发容器。

对于Map/Set

线程安全的环境下(如单线程),常用HashMap、TreeMap(带排序功能)、LinkedHashMap;在多线程环境下常用ConcurrentHashMap、ConcurrentSkipListMap,少用或不用HashTable、synchronizedMap,它们都是使用synchronized实现同步,两者本质区别不大。

对于队列/列表

单线程下,多读少删少插入用ArrayList,多删多插入用LinkedList。

并发环境下,尽快少用Vector, synchronizedList,synchronized实现同步效率低。多读少写可以使用CopyOnWriteList,另外根据使用队列是否需要设置大小选择有界或无界队列,然后选择阻塞类型。

(完)

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×