深入理解Java多线程(一)线程基础

深入理解Java多线程(一)线程基础

Java 线程的实现

Java 线程在 JDK1.2之前,是基于称为“绿色线程”的用户线程实现的,而在 JDK 1.2 中,线程模型替换为基于操作系统原生线程模型来实现,因此,在目前的 JDK 版本中,操作系统支持怎样的线程模型,在很大程度上决定了 Java 虚拟机的线程是怎样映射的,这点在不同平台上没有办法达成一致,虚拟机规范中也并未限定 Java 线程需要使用哪种线程模型来实现。

举个例子,对于 Sun JDK 来说,它的 Windows 版与 Linux 版都是使用一对一的线程模型实现的,一条 Java 线程就是映射到一条轻量级进程之中,因为 Windows 和 Linux 系统提供的线程模型就是一对一的。

总结:java线程与操作系统线程是一对一的,在linux上通过调用pthread库创建线程。

参考:

  1. Java线程的底层实现
  2. java线程理解以及openjdk中的实现

线程的创建

  1. 实现Runnable接口
  2. 继承Thread类
  3. 实现Callable
  4. 线程池
    public static void main(String[] args) {
        // 实现Runnable接口
        // new 一个接口并实现,相当于new一个实现Runnable接口的类对象放进去
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    System.out.println("hello world");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        t.start();
    }
    static class MyThread extends Thread {
        @Override
        public void run() {
            while (true) {
                System.out.println("hello world");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        // 继承Thread类并覆盖run方法
        MyThread t = new MyThread();
        t.start();
    }
    static class CallableImpl implements Callable {
        @Override
        public Object call() throws Exception {
            for (int i = 0; i < 10; i++) {
                System.out.println("hello world - " + (i + 1));
            }
            return "call 执行完毕.";
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 通过Callable创建
        FutureTask ft = new FutureTask(new CallableImpl());
        Thread t = new Thread(ft, "callable 线程");
        t.start();
        String str = (String)ft.get();
        System.out.println(str);
    }
    // 通过线程池创建线程
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(1);
        // ()->{} 可以是一个实现Runnable或Callable对象,此处使用lambda
        es.submit(()->{
            while (true){
                System.out.println("hello world");
                Thread.sleep(1000);
            }
        });
    }

线程状态

线程状态切换图

线程状态切换图,#后为调用类的方法

状态标识:

状态说明
NEW刚创建, 还没调用start未启动
RUNNABLE运行(JVM中)或者处在等待操作系统分配资源
BLOCKED阻塞状态,在等待synchronized锁时处在该状态,Object#wait()也处在该状态
WAITING等待,调用sleep方法,join时处理器无时间执行,LockSupport#park方法会使线程
TIMED_WAITING计时等待,执行方法:Thread#sleep(times)、Object#wait(times)、Thread#join()没有执行时间、LockSupport#parkNanos(times)、ockSupport#parkUntil(times)
TERMINATED终止,线程被终止或自然结束

线程状态获取示例

    static class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println(this.getState());

            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(i);
            }
        }
    }

    public static void main(String[] args) {
        Thread t = new MyThread();

        System.out.println(t.getState());

        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(t.getState());
    }

线程的优先级

每一个线程都有优先级,优先级的范围是1~10,默认为5。
线程的优先级在调度过程可能会起到作用,在一些操作系统上甚至会忽略线程的优先级。

    /**
     * The minimum priority that a thread can have.
     */
    public final static int MIN_PRIORITY = 1;

   /**
     * The default priority that is assigned to a thread.
     */
    public final static int NORM_PRIORITY = 5;

    /**
     * The maximum priority that a thread can have.
     */
    public final static int MAX_PRIORITY = 10;

守护线程

守护线程是为其他线程服务的线程。

设置方法

setPriority(int);

原子性 & 同步

原子意为“不能够被进一步分割的最小粒子”,原子操作做不可被中断的一个或一组操作。

简单理解就是一个操作不可分割,如果执行就一次性执行完,中间不能有其他人操作,否则就会有问题。在java中++运算符不是原子性的,因为它可以被分为三个操作,读取变量,为遍历赋值,写入变量,多线程环境下同时访问这个变量就会出现错误数据,在不加锁的情况下,一个变量++操作就是非原子性的。

同步的关键是保证操作的原子性,以及线程之间操作数据的可见性,进而保证了线程安全。

synchronized

synchronized的英意是:已同步、同步的。

通常把synchronized称为一把“锁”,这是一把重锁,为什么称为重锁?

因为synchronized保证互斥访问时是在操作系统“内核态”使用操作系统进行加锁的,锁的释放和线程的来回切换开销比较大。

synchronized是可重入锁,可重入的意思是当一个线程持有了一个代码块的锁,如果在这个代码块中在调用了另一个同步方法,这个线程仍然可以这个方法(这两个代码块使用同一个对象进行加锁,如this)。可重入使用计数方式,如果再次持有某个对象的锁就让计数值加一,待退出方法后值减一,退出最外层的方法锁就释放了。

public class T_ReentrantSynchnorized {
    // lock by this
    synchronized static void innerMethod() {
        System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
        System.out.println("内部调用方法");
    }

    // lock by this
    synchronized static void method() {
        System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
        System.out.println("method_1");
        //
        innerMethod();
        System.out.println("method_2");
    }

    public static void main(String[] args) {
        // 证明synchronized是可重入的
        // 说明:启动10个线程执行加锁方法,方法加的是对象的锁,因此10个线程抢夺一个锁,强到后执行
        // 如果synchronized不能重入,那么innerMethod不会执行
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                T_ReentrantSynchnorized.method();
                System.out.println("--------------------");
            },(i+1)+"").start();
        }
    }
}

synchronized在JVM是基于进入退出Monitor对象来实现的,方法和代码块的实现有差别,在字节码上表现出了差异。

synchronized编译后的字节码后, 如果synchronized是加载方法上,那编译后的字节码Monitor监控的是整个方法,字节码方法上有synchronized标记。

如果是在方法内部使用synchronized对一部分代码加锁,编译后会在这个代码块前后添加monitorenter monitorexit,保证在访问这块代码是是原子操作,一个线程进入后其他线程需要等待。

编译后的字节码如下:

上个类T对方法直接加锁后的字节码。

// java code
    synchronized static void innerMethod() {
        System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
        System.out.println("内部调用方法");
    }

// bate code
  static synchronized void innerMethod();
    Code:
       0: getstatic     #2                  // Field java/lang/System.out:Ljava/io/PrintStream;
       3: new           #3                  // class java/lang/StringBuilder
       6: dup
       7: invokespecial #4                  // Method java/lang/StringBuilder."<init>":()V
      10: ldc           #5                  // String Thread.currentThread().getName() =

使用对象对方法内的一部分代码加锁。

// java code
public class T {
	private int count = 10;
	private Object o = new Object();
	
	public void m() {
		synchronized(o) { //使用对象o对代码块加锁
			count--;
			System.out.println(Thread.currentThread().getName() + " count = " + count);
		}
	}
}

// m方法的byte code
  public void m();
    Code:
       0: aload_0
	...
       5: astore_1
       6: monitorenter
       7: aload_0
	...
      54: aload_1
      55: monitorexit
	...
      60: aload_1
      61: monitorexit

synchronized的使用示例

static class T{
    // 一把锁,Java中任何一个对象都可以作为synchronized使用的锁
    Object lock = new Object();
    
    // 把的实例作为锁
    synchronized void m(){
        System.out.println("hello world");
    }    
    
    // 把lock对象做为锁 ↑
    void n(){
        System.out.println("top");
        synchronized (lock){
            System.out.println("inner");
        }
        System.out.println("bottom");
    }
}

static class Y{
    // static方法加锁,类必须也是静态的,应为静态方法synchronized加锁使用的是类对象 即 Y.class 
    static synchronized void X(){
        System.out.println("hello world");
    }
}

sychronized使用总结:

  • 在非静态类的方法加锁,则锁是该类的实例
  • 在方法内部加锁要显示式指定一个对象,可以是自己创建的对象(如上边的lock)或者 this,this即实例对象,在任意时刻只能由一个线程访问一个实例
  • 在静态类方法上加锁,则锁是类的对象(Y.class),在任意时刻只能由一个线程访问该类

基本数据类型包装类和字符串不要作为锁对象,他们在jvm中都有缓存,这样锁就会暴露,有可能这把锁在别处使用,结果可想而知。

JDK6对synchronized的优化

JDK6对synchronized加锁的方式优化后,synchronized锁并不是那么重了,它加入了锁升级的方式来减少加锁、释放锁的开销。

java中每个对象都可以做为锁,这是因为锁的信息就在对象的头信息上。

对象的头部信息称为mark word,它的Hotspot的结构如下。

image.png

锁升级过程

无锁 -> 偏向锁 -> 轻量级锁/自旋锁 -> 重量级锁

  1. 无锁,一个对象被创建出来,它处于无锁的状态
  2. 当有一个线程访问这个锁对象会升级为偏向锁,这个线程的指针会被记录在这个锁对象的mark word里
  3. 当有两个两个线程同时竞争这个锁对象时,这个锁对象的锁就会变成轻量级锁,进行的操作如下:
    1. 在线程栈中创建lock record对象
    2. 将锁对象的mark work拷贝到lock record
    3. 锁对象的mark work记录lock record指针
    4. 释放竞争线程CAS修改lock record中的线程指针指向只记线程的指针
    5. 修改成功的获得锁,其他线程进入自旋状态
  4. 当轻量级锁竞争过于激烈(到达阈值时),升级为重量级锁,锁对象mark work指向系统互斥量指针,获得锁的线程执行,其他线程阻塞

锁的对比

名称加锁方式优点缺点使用场景
偏向锁mark work标记线程指针快,仅修改锁对象的mark work,纳秒及开销多线线程竞争会存在撤销开销适用在只有一个线程同步快的场景
轻量级锁mark work记录lock record快,竞争线程不阻塞自旋占用CPU资源同步代码执行快,响应时间快的场景
重量级锁操作系统锁吞吐量高(阻塞线程不消耗CPU)线程阻塞,响应时间慢同步代码执行慢的场景

CPU常见术语

  1. 内存屏障 - memory barriers,是一组CPU指令,用于实现对内存操作的顺序限制
  2. 缓存行 - cache line,是CPU高速缓存存储的单位,CPU从内存中读取数据每次读取一行并缓存(在L1 L2 L3等cache中),缓存行通常大小为64byte
  3. 原子操作 - auto operations,不可中断的一个或一组操作
  4. 缓存行填充 - cache line fill,假设缓存行大小为64byte,当不足64byte是进行补充(补齐),正好填充为64byte
  5. 缓存命中 - cache hit, CPU读取数据,先在cache中查看,如果有就直接读取,而不是再到内存中取
  6. 写命中 - write hit,CPU将操作数写回到内存时,先检查缓存中是否有这块数据,如果存在这个缓存行,则将操作数写入缓存而不是内存,这个操作称为写命中

volatile

volatile(易变的、不稳定的),这也是这个关键字在Java、JVM、CPU一些操作的语义。

volatile并发编程中常用的一个关键字,但它并不是锁,因为volatile不能保证操作的原子性,它仅具有让变量在多个线程间具有可见性

volatile有两个作用:

  1. 阻止指令重排序
  2. 使共享变量在多个线程间可见 —— 增加可见性

注意点:volatile只能保证其修饰的变量可见,比如引用或基本数据类型,并不能保证引用的内部数据可见。

可见性与不可见

在多核处理器上,线程并发执行,在访问变量时会把变量复制一份到当前线程(线程私有空间,缓存),待处理完再写回,当此线程在操作共享变量的时候他对外部变量的修改感知不到,因为它读取的是自己拷贝的(CPU的缓存),这是线程对共享变量不可见。可见性,就是共享变量被修改后,其他线程能够及时读取到变量的新值。

对线程共享变量加volatile、synchronized都会使线程对共享变量可见。

volatile如何保证可见性的?

依靠CPU缓存一致性协议和对总线加锁操作。

  1. 将处理器的缓存行写回内存(缓存一致性协议)
  2. 写回操作在其他CPU里缓存了该地址的数据无效(缓存一致性协议)
  3. 锁总线,只允许一个处理器访问缓存和内存

将处理器缓存写入内存是JVM向CPU发出的一个Lock指令,将变量所在的缓存行写入内存;另外CPU包保障缓存一致性是依靠CPU缓存协议的,具体操作为处理器通过嗅探总线上传过来的数据来检查缓存值是否过期,当处理器发现对应的行过期或对应内存地址被修改,就会将缓存设为无效,当处理器对数据进行操作是会重新从把对应内存的数据读取到缓存里。

CPU如何保证缓存一致的?

  • 锁总线(处理器Lock信号),屏蔽其他处理器,只允许一个处理器访问缓存和内存
  • 锁缓存(缓存一致性协议),阻止其他处理器修改缓存区数据在内存的区域,其他处理器回写内存数据使之无效

通过volatile提升性能。缓存行填充可以提高CPU读写内存的效率,可以参考这篇文章: 剖析Disruptor:为什么会这么快?(二)神奇的缓存行填充

在老版本JDK7里的LinkedTransferQueue就使用了缓存行填充来提升效率,但在JDK8版本中不再使用缓存行填充了。

ThreadLocal

先看ThreadLocal的使用示例。

public class T {
    public static void main(String[] args) {
        final ThreadLocal<String> threadLocal = new ThreadLocal<>();
        threadLocal.set("我是主线程main的保存的数据");

        Thread t = new Thread(() -> {
            threadLocal.set("这是线程t保存的数据");
            System.out.println("threadLocal.get() = " + threadLocal.get());
        }, "t");

        t.start();
        System.out.println("threadLocal.get() = " + threadLocal.get());
    }
}

ThreadLocal的作用。本地线程ThreadLocal可以将当前线程(在哪个线程执行的方法里就是哪个线程)与一个变量相关联,可以简单的理解为Map<K,V>,当前线程就是K,设置和获取的数据就是V。

基于ThreadLocal可以与变量绑定的特性,可以将线程的上下文存放进去,这样就可以随时拿到这个数据。典型的应用是Spring的@Transactional的实现,想一下为什么我们没有指定Connection为什么事务就能回滚了?因为这里面用到了ThreadLocal,并把Connection绑定到了当前线程,发生异常从中取出来然后撤销事务。

ThreadLocal只是一个操作变量的工具,真正的数据不在ThreadLocal而是Thread对象上。

一个线程可以有多个ThreadLocal,多个ThreadLocal维护(增/删)Thread对象上的一个Map结构;一个ThreadLocal代表一个本地变量,因为存放变量的键是ThreadLocal对象。看完源码后这点会更清晰,先看张图。

ThreadLocal与线程的关系
图片引自知乎的这篇文章


ThreadLocal源码分析。

public class ThreadLocal<T> {

    public ThreadLocal() { 
    }

    /**
     * 获取与当前线程相关联的对象
     */
    public T get() {
        // 获取当前线程
        Thread t = Thread.currentThread();
        // 获取线程的threadLocals,threadLocals是一个ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            // 从map中拿到数据
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        // 创建空一个ThreadLocalMap,并放入一个null
        return setInitialValue();
    }
    
    // 初始化方法,初始创建一个ThreadLocalMap,然后添加个null
    private T setInitialValue() {
        T value = initialValue(); // null
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
    }

    // 设置
    public void set(T value) {
        // 获取当前线程
        Thread t = Thread.currentThread();
        // 获取当前线程的threadLocals,threadLocals是一个ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }

     public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             m.remove(this);
     }

    // threadLocals是一个ThreadLocalMap
    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

    /**
     * Create the map associated with a ThreadLocal. Overridden in
     * InheritableThreadLocal.
     */
    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

    /**
     * Thread的对象中的ThreadLocals对象,是一个Map,键是ThreadLocal对象
     */
    static class ThreadLocalMap {

        static class Entry extends WeakReference<ThreadLocal<?>> {
            Object value;
            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

        /**
         * 默认容量
         */
        private static final int INITIAL_CAPACITY = 16;

        /**
         * 存储数据数组
         */
        private Entry[] table;

        private int size = 0;

        /**
         * resize、rehash时的容量,达到这个值时会进行resize
         */
        private int threshold; // Default to 0


        ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }

        /**
         * Set the value associated with key.
         */
        private void set(ThreadLocal<?> key, Object value) {
            
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);

            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();

                if (k == key) {
                    e.value = value;
                    return;
                }

                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }

            tab[i] = new Entry(key, value);
            int sz = ++size;
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }

        /**
         * Remove the entry for key.
         */
        private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
                    e.clear();
                    expungeStaleEntry(i);
                    return;
                }
            }
        }
    }
    
}

Thread中有一个ThreadLocal的ThreadLocalMap,用来存储ThreadLocal放置(set)的数据。

// 线程类
public class Thread implements Runnable {

    ...
    private volatile String name;
    private int            priority;
    private Thread         threadQ;
    private long           eetop;
    
    ...

    /* ThreadLocal中的map */
    ThreadLocal.ThreadLocalMap threadLocals = null;
    ...
}

(完)

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×