深入理解Java多线程(三):JUC基础篇

这篇文章主要侧重讲 JUC 的多数类的使用,文章里贴了很多练习的代码,可以通过代码更加深刻的了解这些类的功能。

这篇文章主要总结了 volatile、原子类、ReentrantLock、CountDownLatch、CyclicBarrier、Phaser、Semphore、Exchanger 的使用,然后进行了一些对比。

volatile

volatile 使用

/*
 * 演示volatile的作用
 * 若running不使用volatile修饰则在主线程(main)修改running=false线程不会停下,而是继续运行
 *
 * 结论:通过volatile修饰running可以让线程使用running时获得最新值
 *
 * volatile的作用:
 * 1)使共享变量在线程间可见,通过JMM内存协议,CPU缓存一致性协议使缓存失效,直接读取内存中的数据
 * 2)阻止指令重排序
 */
public class T_VolatileTest {

    static class Task implements Runnable{
        /*volatile*/ boolean running  = true;
        @Override
        public void run() {
            System.out.println("thread start");
            while (running){

            }
            System.out.println("thread end");
        }
    }

    public static void main(String[] args) {
        Task t = new Task();
        new Thread(t).start();

        try {
            Thread.sleep(1000); //1s
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t.running = false;
        System.out.println("set running=false");
    }
}

volatile 并不是锁

/*
 * volatile并不能代替volatile,多并发环境下的m方法如果不同步执行,结果是错的。
 * 在对m方法同步后,结果输出正确。
 *
 * 结论:
 * 1)volatile不能够做锁
 * 2)它仅具有使多线程间共享变量可见,但不具有同步的功能
 */
public class T_VolatileNotSync {
	volatile int count = 0;
	/*synchronized*/ void m() {
		for(int i=0; i<1000; i++) count++;
	}

	public static void main(String[] args) {
		T_VolatileNotSync t = new T_VolatileNotSync();

		List<Thread> threads = new ArrayList<Thread>();

		for(int i=0; i<10; i++) {
			threads.add(new Thread(t::m, "thread-"+i));
		}

		// 启动线程
        for (Thread th : threads) {
            th.start();

        }

        // 阻塞到所有线程执行完毕
		threads.forEach((o)->{
			try {
				o.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		});

        // 输出结果
		System.out.println(t.count);
	}
}

volatile 只能保证引用本身可见,内部数据不能保证

/*
 * 在引用上t加volatile,只能保证引用(类、数组实例)本身可见,不能保证内部可见
 */
public class T_VolatileReference {
    static volatile Task t = new Task();

    static class Task implements Runnable{
        boolean running = true;

        @Override
        public void run() {
            System.out.println("run start");
            while(running) {
            }
            System.out.println("run end!");
        }
    }

    public static void main(String[] args) {
        Thread thread = new Thread(t);

        thread.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        t.running = false;
    }
}

原子操作类

原子操作类使java.util.atomic包下的类,这些类为操作提供了原子性,使用时不用进行额外的同步,这些操作仅限于对原子类对象进行操作,如果多个原子类多对象进行操作还是需要额外的同步。

AutomicInteger 使用

在 Counter 类中 AutomicInteger 并没有使用 volatile 修饰,并且 run 中的 integer.incrementAndGet()没有进行同步,最终这些操作并没有在多线程环境下出错。

// Atomic 操作是原子操作
public class TestAtomic {

    static class Counter implements Runnable{
        AtomicInteger integer = new AtomicInteger(0);

        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                integer.incrementAndGet();
            }
        }
    }

    public static void main(String[] args) {
        List<Thread> threadList = new ArrayList<>(10);
        Counter c = new Counter();

        // 10个线程对Counter进行类型,每个线程执行run会累加10000此,最终结果为10 0000
        for (int i = 0; i < 10; i++) {
            threadList.add(new Thread(c));
        }


        for (Thread thread : threadList) {
            thread.start();
        }

        // 阻塞到所有线程执行完毕
        for (Thread thread : threadList) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // result
        System.out.println(c.integer.get());
    }
}

使用原子类实现 CAS 操作。

// 使用原子类的CAS
// 使两个线程协作输出两个字符串 str1=123456 str2=ABCDEF, 要求结果输出为:1A2B3C4D5E6F
public class TestAtomic2 {

    public static void main(String[] args) {
        String str1 = "123456";
        String str2 = "ABCDEF";

        // cas 锁
        AtomicInteger mutex = new AtomicInteger(0);

        // CAS操作,threa1在0输出,thread2在1时输出

        // thread 1
        new Thread(() -> {
            for (int i = 0; i < str1.length(); i++) {
                // 与期望的值不符进行空循环
		while (mutex.get()!=0) {

                }
                System.out.print(str1.charAt(i));
                mutex.set(1);

            }

        }).start();

        // thread 2
        new Thread(() -> {

            for (int i = 0; i < str2.length(); i++) {
                // 与期望的值不符进行空循环
                while (mutex.get()!=1) {

                }
                System.out.print(str2.charAt(i));
                mutex.set(0);
            }
        }).start();
    }
}
// 1A2B3C4D5E6F

使用 volatile 变量实现 CAS,与上边的等价。

// 使用原子类的CAS
// 使两个线程协作输出两个字符串 str1=123456 str2=ABCDEF, 要求结果输出为:1A2B3C4D5E6F
public class TestAtomic2 {

    static volatile int mutex = 0;

    public static void main(String[] args) {
        String str1 = "123456";
        String str2 = "ABCDEF";

        // CAS操作,threa1在0输出,thread2在1时输出

        // thread 1
        new Thread(() -> {
            for (int i = 0; i < str1.length(); i++) {
                while (mutex != 0) {

                }
                System.out.print(str1.charAt(i));
                mutex = 1;

            }

        }).start();

        // thread 2
        new Thread(() -> {
            for (int i = 0; i < str2.length(); i++) {
                while (mutex != 1) {

                }
                System.out.print(str2.charAt(i));
                mutex = 0;
            }
        }).start();
    }
}

LongAdder

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

// 使用LongAdder100个线程累加,这些操作都是原子操作,最终结果正确
public class TestLongAdder {
    public static void main(String[] args) {
        LongAdder adder = new LongAdder();

        List<Thread> threadList = new ArrayList<>(100);

        for (int i = 0; i < 100; i++) {
            threadList.add(new Thread(()->{
                for (int j = 0; j < 100; j++) {
                    adder.increment();
                }
            }));
        }

        for (Thread thread : threadList) {
            thread.start();
        }

        for (Thread thread : threadList) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println(adder.longValue());
    }
}

AtomicInteger 和 LongAdder 性能对比

/*
 * 测试三中实现自增的效率,分别时 sync+volatile、AtomicInteger、LongAdder
 *          sync      AtomicInteger       LongAdder
 * 线程数
 * 10       50          26                  30
 * 100      355         208                 55
 * 500      896         813                 127
 * 900      1286        1783                440
 * 1200     1245        1977                1976
 *
 * 结论:测试结果可以看出,synchronized和AtomicInteger对比当线程数量少时AtomicInteger自旋锁比较快
 *      当线程较多时,自旋锁性能下降而synchronized性能上升。
 *      LongAdder对CAS进行了优化,线程数量即使较多时也能保持良好的性能,但测试参数波动较大
 */
public class LongAdderVsAtomicInteger {
    static int counterA = 0;

    public static void main(String[] args) throws InterruptedException {
        int num = 1200;
        List<Thread> threadList = new ArrayList<>(num);

        Object lockA = new Object();
        for (int i = 0; i < num; i++) {
            threadList.add(new Thread(() -> {
                for (int i1 = 0; i1 < 10_0000; i1++) {
                    synchronized (lockA) {
                        counterA++;
                    }
                }
            }));
        }

        long start = System.currentTimeMillis();
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("synchronized    " + counterA + "    " + (System.currentTimeMillis() - start));


        Thread.sleep(1000);


        // 使用AtomicInteger -------------------------------------------------------------
        AtomicInteger counterB = new AtomicInteger(0);
        threadList.clear();
        for (int i = 0; i < num; i++) {
            threadList.add(new Thread(() -> {
                for (int i1 = 0; i1 < 10_0000; i1++) {
                    counterB.incrementAndGet();
                }
            }));
        }
        start = System.currentTimeMillis();
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("AtomicInteger    " + counterB.longValue() + "    " + (System.currentTimeMillis() - start));


        Thread.sleep(1000);


        //测试使用LongAdder -------------------------------------------------------------
        LongAdder counterC = new LongAdder();
        threadList.clear();
        for (int i = 0; i < num; i++) {
            threadList.add(new Thread(() -> {
                for (int i1 = 0; i1 < 10_0000; i1++) {
                    counterC.increment();
                }
            }));
        }
        start = System.currentTimeMillis();
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("LongAdder    " + counterC.longValue() + "    " + (System.currentTimeMillis() - start));
    }
}

Reentrant

Reentrant 和 synchronized 功能相似,且都是可重入锁。

常用 api 有:

方法名 作用
lock() 加锁, 未获取到锁阻塞
tryLock() 尝试非阻塞获取锁, 并返回获取锁的情况, 无论是否获得锁都将继续执行, 当返回 false 最后又 unlock()将抛出IllegalMonitorStateException异常
tryLock(times) 超时获取锁, 超过预设的等待时间不再等待继续执行, 在未获取到锁下执行 unlock()将抛出IllegalMonitorStateException异常
lockInterruptibly() 可中断获取锁, 和 lock 方法的不同在于该方法响应中断, 当前线程获取到锁后可以被中断
unlock() 释放锁
newCondition() 创建锁条件对象, 用于执行监视器的方法阻塞当前线程或发出信号使其他线程获取锁

Reentrant 特性

  • 是可重入锁
  • 必须手动释放锁, 不能依靠异常释放锁
  • 公平锁和非公平锁, 默认非公平锁
  • 可以尝试锁定"try…"

Reentrant 使用

使用 Reentrant 是都要手动释放锁! 可以按照以下模板写.

try{
     	lock.lock();
	// 操作
}finally{
	lock.unlock();
}

使用示例:

public class ReentrantLock2 {
	Lock lock = new ReentrantLock();

	void m1() {
		try {
			lock.lock(); //synchronized(this)
			for (int i = 0; i < 10; i++) {
				TimeUnit.SECONDS.sleep(1);

				System.out.println(i);
				//if(i==2) m2();// 这里可以证明ReetrantLock是可重入锁
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}

	void m2() {
		try {
			lock.lock();
			System.out.println("m2 ...");
		} finally {
			lock.unlock();
		}
	}

	public static void main(String[] args) {
		ReentrantLock2 rl = new ReentrantLock2();
		new Thread(rl::m1).start();
		try {
			TimeUnit.SECONDS.sleep(1);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		new Thread(rl::m2).start();
	}
}

Reentrant 中使用 Condition

Condition 的作用是在线程持有 ReentrantLock 后对锁进一步控制,它的功能就像 Thread 的 wait/notify 功能,在线程持有锁后可以进行阻塞或通知其他线程竞争锁。

// 演示Condition使用
public class ReentrantLockWithCoundition {
    ReentrantLock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    void m1() {
        try {
            lock.lock();
            System.out.println("m1 start");
            condition.await();
            System.out.println("m1 end");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

    void m2() {
        try {
            lock.lock();
            System.out.println("m2 start");
            condition.signal();
            System.out.println("m2 end");
        } finally {
            lock.unlock();
        }

    }

    public static void main(String[] args) {
        ReentrantLockWithCoundition o = new ReentrantLockWithCoundition();
        new Thread(o::m1,"t1").start();
        new Thread(o::m2,"t2").start();
        // t1线程先启动,在执行m1方法时阻塞,线程t2拿到锁执行后通知t1继续执行
        // 因此Condition的作用是更加细粒度的控制锁,它的作用就像Thead的wait/notify功能,在一个线程获取锁后可以进一步操作
    }
}

Reentrant 中使用计数门闩 CountDownLatch

CountDownLatch 能实现阻塞多个线程的 join 方法的功能,在使用为 CountDownLatch 初始一个值(要阻塞的线程数),当线程执行完后值就减 1,当值为 0 时,CountDownLatch#await() 方法就不再阻塞,这跟把所有线程都执行 join 方法的功能时一样的。

// 两个方法完成的功能时一样的,都是等到所有的线程执行完,最后执行输出语句
public class TestCountDownLatch {
    public static void main(String[] args) {
        usingJoin();
        usingCountDownLatch();
    }

    private static void usingCountDownLatch() {
        Thread[] threads = new Thread[100];
        CountDownLatch latch = new CountDownLatch(threads.length);

        for(int i=0; i<threads.length; i++) {
            threads[i] = new Thread(()->{
                int result = 0;
                for(int j=0; j<10000; j++) result += j;
                latch.countDown();
            });
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("latch.getCount() = " + latch.getCount());
        System.out.println("end latch");
    }

    private static void usingJoin() {
        Thread[] threads = new Thread[100];

        for(int i=0; i<threads.length; i++) {
            threads[i] = new Thread(()->{
                int result = 0;
                for(int j=0; j<10000; j++) {
                    result += j;
                    //if(result%100==0) System.out.println(result);
                }
            });
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }

        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("end join");
    }
}

CyclicBarrier

CyclicBarrier 设置一个数量,待阻塞线程到达这个数量时可以触发回调函数。

CyclicBarrier 只有阻塞线程计数功能没有提共同步功能,因此不能做锁。

public class TestCyclicBarrier {
    public static void main(String[] args) {
        // 99个线程,每20个输出一次,共输出4次, 最后一次不足20不输出
        CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("阻塞线程以达20!!请根据情况进行处理"));

        for(int i=0; i<99; i++) {
                new Thread(()->{
                    try {
                        // 记录线程阻塞
                        barrier.await();
                        //System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }, "t-" + i).start();
        }
    }
}

Phaser

Phaser 用来控制完成阶段性的任务。

使用示例如下:

v1 版本,从示例中可以看出 Phaser 通过arriveAndAwaitAdvance来控制走下一个阶段,继承 Phaser 的类重写onAdvance来监控到达了那个阶段。

/*
 * 完成一个流程控制: 所有人都到达后发布一个通知,所有人都吃完宴席后发个通知,所有人都离开后发个通知
 */
public class TestPhaser {

    static MarryPhaser phase = new MarryPhaser();

    public static void main(String[] args) {
        phase.bulkRegister(5);
        for (int i = 0; i < 5; i++) {
            final int temp = i;
            new Thread(()->{
                Person person = new Person("P" + temp);
                person.arrive();
                phase.arriveAndAwaitAdvance();

                person.eat();
                phase.arriveAndAwaitAdvance();

                person.leave();
                phase.arriveAndAwaitAdvance();
            }).start();
        }
    }

    static class MarryPhaser extends Phaser {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch(phase){
                case 0:
                    System.out.println("大家都到达了!!!");
                    return false;
                case 1:
                    System.out.println("大家都吃完饭了");
                    return false;
                case 2:
                    System.out.println("大家都离开了");
                    System.out.println("婚礼结束了");
                    return true;
                default:
                    return true;
            }
        }
    }

    static class Person {
        String name;

        public Person(String name) {
            this.name = name;
        }

        void arrive(){
            System.out.printf("%s  到达了\n", name);
            sleepUtil(1);
        }

        void eat(){
            System.out.printf("%s 开始吃宴\n", name);
            sleepUtil(1);
        }

        void leave(){
            System.out.printf("%s 离开了\n", name);
            sleepUtil(1);
        }

    }

    static void sleepUtil(int second){
        try {
            TimeUnit.SECONDS.sleep(second);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
// output
P1  到达了
P4  到达了
P3  到达了
P0  到达了
P2  到达了
大家都到达了!!!
P2 开始吃宴
P3 开始吃宴
P0 开始吃宴
P4 开始吃宴
P1 开始吃宴
大家都吃完饭了
P0 离开了
P3 离开了
P4 离开了
P2 离开了
P1 离开了
大家都离开了
婚礼结束了

v2 版本。

/*
 * 完成一个流程控制: 所有人都到达后发布一个通知,所有人都吃完宴席后发个通知,所有人都离开后发个通知
 */
public class TestPhaser {

    static MarryPhaser phase = new MarryPhaser();

    public static void main(String[] args) {
        phase.bulkRegister(5);
        for (int i = 0; i < 5; i++) {
            final int temp = i;
            new Thread(new Person("P" + temp)).start();
        }
    }

    static class MarryPhaser extends Phaser {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch(phase){
                case 0:
                    System.out.println("大家都到达了!!!");
                    return false;
                case 1:
                    System.out.println("大家都吃完饭了");
                    return false;
                case 2:
                    System.out.println("大家都离开了");
                    System.out.println("婚礼结束了");
                    return true;
                default:
                    return true;
            }
        }
    }

    static class Person implements Runnable{
        String name;

        public Person(String name) {
            this.name = name;

        }

        void arrive(){
            System.out.printf("%s  到达了\n", name);
            phase.arriveAndAwaitAdvance();
            sleepUtil(1);
        }

        void eat(){
            System.out.printf("%s 开始吃宴\n", name);
            phase.arriveAndAwaitAdvance();
            sleepUtil(1);
        }

        void leave(){
            System.out.printf("%s 离开了\n", name);
            phase.arriveAndAwaitAdvance();
            sleepUtil(1);
        }

        @Override
        public void run() {
            arrive();

            eat();

            leave();
        }
    }

    static void sleepUtil(int second){
        try {
            TimeUnit.SECONDS.sleep(second);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Semaphore

记录型信号量,用做线程并发流量控制,运行同时运行 xx 个线程,剩下的阻塞。

使用示例:

public class TestSemaphore {
    public static void main(String[] args) {
        // 允许两个线程同时执行,默认非公平锁
        //Semaphore s = new Semaphore(2);

        // 允许两个线程同时执行,非公平锁
        Semaphore s = new Semaphore(1, true);

        //只允许一个线程同时执行
        //Semaphore s = new Semaphore(1);

        new Thread(()->{
            try {
                s.acquire();

                System.out.println("T1 running...");
                Thread.sleep(200);
                System.out.println("T1 running...");

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                s.release();
            }
        }).start();

        new Thread(()->{
            try {
                s.acquire();

                System.out.println("T2 running...");
                Thread.sleep(200);
                System.out.println("T2 running...");

                s.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Exchanger

用于两个线程间交换数据。它提供一个同步点,使用 exchange()方法交换数据,当第一个线程执行了 exchange()后,Exchanger 对象会等待第二个线程执行 exchange()方法,第二个方法执行后两者交换数据。

使用示例:

// 交换两个线程中的数据
public class T12_TestExchanger {

    static Exchanger<String> exchanger = new Exchanger<>();

    public static void main(String[] args) {
        new Thread(()->{
            String s = "这是t1中的数据~";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);

        }, "t1").start();

        new Thread(()->{
            String s = "这是t2中的数据!";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);

        }, "t2").start();
    }
}
// output
t2 这是t1中的数据~
t1 这是t2中的数据!

(本文完)