Java并发学习笔记

#JVM中并发部分

Java内存模型与线程

1. 概述

  1. 现代计算机系统为了协调CPU和主存之间的速度矛盾,在二者之间设计 高速缓存作为存储交互,但是也带来一个新的问题:缓存一致性

  2. 缓存一致性是指:多个处理器的缓存数据不一致,在缓存数据写回内存时,无法确定以哪一个缓存数据为准。为了解决一致性问题,每个处理器需要遵守缓存一致性协议

  3. 内存模型: 可以理解为在特定的操作协议下,对于特定的内存或者高速缓存读写过程的抽象

  4. Java 内存模型(JMM)试图屏蔽不同硬件和操作系统之间的内存访问差异,以实现让 Java 程序在各种平台下都能达到一致的内存访问效果。

  5. JMM的主要目标是定义程序中各个变量的访问规则。(其中变量指:实例字段,静态字段和构成数组对象的元素,不包括局部变量与方法参数,原因是 后者是线程私有的,不存在竞争问题

  6. JMM模型必须足够严谨以免Java并发内存访问产生歧义;但也必须足够宽松,有足够空间适应不同平台的硬件差异

2. Java内存模型

1. 主内存与工作内存

  1. JMM规定 所有变量存储在主内存(main memory)中,类比于硬件上的内存,但JMM main memory 只是JVM内存中一部分

  2. JMM规定 线程对变量的操作都在线程自身的工作内存中,类比于硬件上的cache,工作内存 保存 本线程使用到的变量的主内存拷贝

  3. 不同线程之间无法通过直接访问对方工作内存中

2. 内存间交互操作

Java 内存模型定义了 8 个操作来完成主内存和工作内存的交互操作。

  1. lock:作用于主内存的变量,将变量 标识为 线程独占状态
  2. read:把一个变量的值从主内存传输到工作内存中
  3. load:在 read 之后执行,把 read 得到的值放入工作内存的变量副本中
  4. use:把工作内存中一个变量的值传递给执行引擎
  5. assign:把一个从执行引擎接收到的值赋给工作内存的变量
  6. store:把工作内存的一个变量的值传送到主内存
  7. write:在 store 之后执行,把 store 得到的值放入主内存的变量中
  8. unlock:作用于主内存的变量,将变量从 独占状态 释放

3. 对于volatile 变量的特殊规则

  1. volatile 关键字 可以说是Java虚拟机提供的最轻量级的同步机制
  2. volatile的两个特性:
    1. 可见性
    2. 禁止指令重排序: 通过添加内存屏障的方式
  3. volatile 变量 和 8个原子性动作: 假设 volatile 变量进行操作:
    1. read ,load , use 三个动作必须相关联
    2. assign, store, write 三个动作必须相关联 (1,2 两点实现可见性)
    3. 不会产生指令重排序
  4. 可见性:是指 volatile变量对所有线程是立即可见的,对volatile变量的所有(语言层面的)写操作能够立即反应到其他线程中,volatile变量在各个线程中一致,但是volatile变量并不是并发安全的。因为Java中的运算不是原子性的。
  5. volatile可以保证并发安全的场景:
    1. 运算结果不依赖变量当前值
    2. 只有一个线程能够修改变量值
    3. 只有一个volatile变量参与不变约束
  6. 指令重排序:不CPU采用了允许将多条指令不按程序规定的顺序分发给计算单元,但不是任意的重排,需要能正确处理指令间依赖情况(前后次序),以保证正确结果
  7. 通过volatile保证线程安全 与 比较
    1. 绝大多数情况下,volatile的开销低于锁
    2. 由于JVM对锁进行各种优化,不能肯定的说锁在任何情况下开销都高于volatile
    3. 如果volatile符合上述三种情景,满足线程安全,那么选用volatile

4. 内存模型三大特性

使用synchronized能够同时满足 原子性,可见性,有序性,但是万能的同时带来的是相对较大的开销

1. 原子性
  1. JMM 要求 lock, read, load, use, assign, store, write, unlock,这八个操作具有原子性
  2. 非原子性协定:对于64bit类型的long 和 double,可以将这些操作划分为两次32bit操作,但是大多数商业JVM仍然实现了原子性
  3. 总而言之,对于常见JVM,主内存与工作内存间的操作 具有 原子性
  4. 但是32bit 的类型 int 并不是在多线程下线程安全:因为 单个操作具有原子性 不等价于 这八个操作整体具有原子性,可以使用原子类 获取 整体的原子性 如AtomicInteger
  5. synchronized 块中的 语句整体具有原子性。(它对应的内存间交互操作为:lock 和 unlock,在虚拟机实现上对应的字节码指令为 monitorenter 和 monitorexit)(monitor 是 管程的含义)
  6. 除非是顶级高手,不应依赖于原子性实现线程安全
  7. 实现原子性的三种方式:
    1. 操作的原子性
    2. 原子类
    3. synchronized
2. 可见性:
  1. JMM中,每个线程具有工作内存,JMM还具有主内存每个线程对于涉及到的变量拥有各自的副本,线程对于变量的修改一般只作用于工作内存中的副本
  2. 可见性指当一个线程修改了共享变量的值,其它线程能够立即得知这个修改
  3. 即使具有原子性,也不一定具有可见性
  4. JMM是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值来实现可见性的

  5. 实现可见性的三种方式:

    1. volatile类型变量
    2. synchronized
    3. final:被 final 关键字修饰的字段在构造器中一旦初始化完成,并且没有发生 this 逃逸(其它线程通过 this 引用访问到初始化了一半的对象),就具有可见性(那么其它线程就能看见 final 字段的值)
3. 有序性
  1. 含义:
    1. 在本线程内观察,所有操作都是有序的 (线程内表现为串行的语义
    2. 在一个线程观察另一个线程,所有操作都是无序的(指令重排序
  2. 重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性
  3. 保证有序性的两种方式:
    1. volatile:通过添加内存屏障的方式来禁止指令重排
    2. synchronized: 同一时刻只有一个线程能获得(lock)临界变量,或者运行临界代码

5. 先行发生原则

  1. 作用:判断数据是否存在竞争,线程是否安全的依据。
  2. 先行发生:是指两个操作之间的偏序关系 或者 依赖关系
1. 单一线程原则 Single Thread rule

在一个线程内,在程序前面的操作先行发生于后面的操作。

2. 管程锁定规则 Monitor Lock Rule

一个 unlock 操作先行发生于后面对同一个锁的 lock 操作。

(管程在功能上和信号量及PV操作类似,属于一种进程同步互斥工具,但是具有与信号量及PV操作不同的属性)

(管程monitor 只是保证了同一时刻只有一个进程在管程内活动,即管程内定义的操作在同一时刻只被一个进程调用 由编译器实现)

3. volatile 变量规则 Volatile Variable Rule

对一个 volatile 变量的写操作先行发生于后面对这个变量的读操作。(当满足volatile变量使用条件时)

  1. 线程启动规则 Thread Start Rule

Thread 对象的 start() 方法调用先行发生于此线程的每一个动作。

5. 线程终止规则 Thread Join Rule

Thread 对象的结束先行发生于 join() 方法返回。

6. 线程中断规则 Thread Interruption Rule

对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断的发生,可以通过 interrupted() 方法检测到是否有中断发生。(先有外部线程 中断 该进程,该进程 才会 检测到中断)

7. 对象终结规则 Finalizer Rule

一个对象的初始化完成(构造函数执行结束)先行发生于它的 finalize() 方法的开始。

8. 传递性 Transitivity

如果操作 A 先行发生于操作 B,操作 B 先行发生于操作 C,那么操作 A 先行发生于操作 C。

3. Java 与线程

1. 线程 与 进程

  1. 进程:资源分配的最小单位(I/O,Mem,等 不包括 计算资源)
  2. 线程:CPU调度的最小单位

2. 线程的三种实现方式

  1. 内核级线程:直接由操作系统内核支持,线程切换等由内核完成,支持内核级线程的操作系统内核 称为 多线程内核

    1. 程序一般使用内核线程的高级接口:轻量级进程 LWP

    2. 轻量级进程 与 内核线程 之间的关系为 1:1模型

    3. 优缺点:

      1. 优点:轻量级进程是独立的调度单元,某个LWP的阻塞不影响其他
      2. 缺点:LWP由内核级线程支持,创建,同步等操作需要切换到 内核态,开销大
  2. 用户级线程:广义上,指非内核线程,因此 LWP广义上属于用户线程, 狭义上:完全建立在用户空间,内核无法感知线程存在。

    1. 部分高性能数据库中的多线程使用用户线程实现

    2. 进程与用户线程之间1:N的关系 是 一对多模型

    3. 优缺点:

      1. 优点:没有进入内核态的开销
      2. 缺点:缺少操作系统内核支援,线程操作(创建,调度等)由用户处理,复杂度高,难以实现
  3. 混合实现的线程:

    1. 用户线程建立在用户空间:因此用户线程的创建,切换等开销不大

    2. LWP 作为 用户线程 和 内核线程的 桥梁

    3. 用户线程的调度,通过LWP 由内核完成,降低阻塞风险

    4. 用户线程 与 LWP 之间 N:M的 关系 成为 多对多模型

3. Java中的线程

3.1 实现方式
  1. JDK1.2之前 绿色线程:用户线程实现
  2. JDK1.2中 替换为 内核级线程实现
  3. SunJDK的Win和Linux版本都是由一对一的线程模型来实现的;一个Java线程映射一个LWP
3.2 Java线程调度
1. 两种调度方式:
     1. 协同式线程调度:线程工作完之后,通知系统,释放资源;
          1. 优点:实现简单;
          2. 缺点:线程时间不受系统控制
     2. 抢占式线程调度:系统分配执行时间,Java中线程可以出让资源(**Thread.yield()**,但也只是对系统的**建议**,不一定执行),但只能等待系统分配资源;
          1. 优点:线程时间系统可控;
          2. 缺点较为复杂
          3. Java中设置10个优先级,但不是一一对应至操作系统,因为不同操作系统优先级数量不同
          4. 操作系统能够主动根据线程运行情况 主动 改变线程 优先级
3.3 进程状态转换 与 Java线程 状态转换
1. 操作系统中进程的状态
  1. 创建态:进程已拥有了PCB,但还未进入主存,还不能进行调度的状态

    1. 创建的两个步骤
      1. 为进程创建PCB,分配PID,并填写必要的管理信息
      2. 转入就绪状态并插入就绪队列之中
  2. 就绪态:已获得了除CPU以外的所有资源,处于等待CPU的状态

  3. 执行态:进程获得CPU,进行计算

  4. 阻塞态:等待除CPU之外的某种临界资源的状态

  5. 终止态:

    1. PCB清零,并将PCB空间返还系统

    2. 进入终止态的进程以后不能再执行

2. Java线程状态转换
  1. 新建:新建后尚未启动的状态(不同于进程创建态)

  2. 运行态:包括操作系统进程 执行态和就绪态,Java线程由KLP托管,具体是否运行由操作系统决定

  3. 无限期等待:类似于OS进程挂起态;直到被唤醒,不会被分配CPU时间

    1. 没有Timeout参数的 Object.wait()
    2. 没有Timeout参数的 Thread.join()
    3. LockSupport.park()
  4. 有限期等待:类似于OS进程挂起态;一定时间之后,系统自动唤醒

    1. 有Timeout参数的 Object.wait()
    2. 有Timeout参数的 Thread.join()
    3. Thread.sleep()
    4. LockSupport.parkNanos()
    5. LockSupport.parkUntil()
  5. 阻塞态:等待某种临界资源的状态

  6. 结束态

线程安全与锁优化

线程安全

定义

  1. 通俗定义:如果一个对象可以安全的被多个线程使用,则线程安全(没有携带任何有用信息)

  2. Brian Goetz 的定义:当多个线程访问一个对象时,倘若不需要考虑线程运行时的调度,不需要额外的同步,也不需要调用者进行任何其他的协调操作,调用该对象的行为(一般弱化为一次调用)都可以获得正确的结果,那么这个对象是线程安全的

  3. 也就是说 类的代码本身封装了所有必要的正确性保证手段(同步互斥等),使用者无需关心多线程问题,那么就是 线程安全

Java中的5种不同程度的线程安全

线程安全不同程度的5种数据类型:不可变;绝对线程安全;相对线程安全;线程兼容;线程对立

不可变 Immutable
  1. 不可变对象带来的安全性是最简单和最纯粹的,在Java中 如 final关键字修饰的变量
  2. Java中的不可变类型:

    1. final修饰的基本类型

    2. 所有成员都被final修饰的类;如 String;Number的部分子类,Integer等包装类型,BigInteger等大数类型,但是原子类型AtomicInteger不是不可变类型

    3. 枚举类型

    4. 对于集合类型,可以使用 Collections.unmodifiableXXX() 方法来获取一个不可变的集合

      1
      2
      3
      4
      5
      6
      7
      public class ImmutableExample {
      public static void main(String[] args) {
      Map<String, Integer> map = new HashMap<>();
      Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map);
      unmodifiableMap.put("a", 1); // 该行代码 抛出异常,不可变类型 不能修改
      }
      }
绝对线程安全
  1. 绝对的线程安全十分严格,大多数Java API 声明 线程安全的类都不是绝对线程安全

  2. 绝对线程安全是指:不管运行时环境如何,调用者都不需要任何额外的同步措施

  3. Vector类是线程安全 但不是绝对线程安全

    上述代码 会产生并发错误,问题在于 这两个进程 没有 同步 互斥

相对线程安全
  1. 相对线程安全就是通常意义上的线程安全,保证对对象单独的操作是安全的,调用时无需额外措施,但是对于特定顺序的连续调用,需要额外同步手段
  2. Java API 中 大多数线程安全的类型属于这种,如 Vector、HashTable、Collections.synchronizedCollection()方法包装的集合
  3. 上面两段代码 展示的就是 相对线程安全 (对于向量的一次 添加和删除 是 类中的代码附带了 synchronized,但是对于连续的添加删除,仍需额外的互斥措施)
线程兼容

本身不是线程安全的,但是可以调用者可以通过同步互斥手段保证并发安全,通常所说,类不是并发安全指的是这种情况,如相对并发安全的Vector和 HashTable 对应的线程兼容的 ArrayList 和 HashMap

线程对立
  1. 线程对立是指无论调用端是否采取了同步措施,都不是并发安全的
  2. Java天生多线程特性,这种代码很少出现
  3. 线程对立的一个例子是 Thread类的 suspend()和resume()

线程安全的实现方法

1. 同步互斥
概念:
  1. 同步是指:多线程并发访问共享数据(临界资源)时,需要保证共享数据(临界资源在同一个时刻只有一个线程能够访问(使用信号量机制时是一些线程)
  2. 互斥:是实现同步的一种手段,有 临界区,互斥量,信号量
  3. 同步是目的,互斥是方法
  4. 互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,称为阻塞同步悲观保守的并发策略:无论共享数据是否真的会出现竞争,它都要进行加锁
synchronized
  1. 是Java中最基本的同步互斥手段;
  2. 编译后,在同步块前后形成 monitorenter 和 monitorexit 两个字节码指令
  3. synchronized 对象锁
    1. synchronized(this|object) {}
    2. 类可以有多个对象锁,不同对象之间的锁无关
    3. 用以修饰非静态方法
  4. synchronized 类锁
    1. synchronized(类.class) {}
    2. 一个类只有一个 class对象,因此一个类只有一个类锁
    3. 用以修饰静态方法
  5. synchronized块对于同一个线程来说是可以重入的,排斥的对象是其他线程
  6. 需要进行 管态目态切换(Java线程由LWP托管),理论上开销较大,但是现代JVM已经有非常多优化(不是任何时候都管态目态切换)实际上开销尚可
ReentrantLock
  1. java.util.concurrent(J.U.C)包中的 重入锁(ReentrantLock)
  2. 类似于前者,重入锁是Java API层面的互斥锁,synchronized是原生语法层面的互斥锁,理论上性能忧郁synchronized,实际上 性能上没差
  3. 重入锁 ReentrantLock 额外具有三项功能:
    1. 等待可中断:临界资源长期被锁时,等待该资源的进程可以放弃等待
    2. 公平锁:可以取消优先级,先来先服务
    3. 锁绑定多个条件:一个锁绑定多个条件(临界资源),synchronized锁 一个条件 对应 一个锁
  4. 如果需要 重入锁的额外功能,使用重入锁,否则使用synchronized锁
2. 非阻塞同步
  1. 依赖于硬件指令集·,基于冲突检测乐观并发策略先尝试操作,若不存在共享数据的竞争,那么操作成功,如果存在竞争,采取其他措施(一般是重试),无需挂起

  2. 依赖硬件指令集在于: 操作和冲突检测 两个步骤 需要 硬件支持实现原子性(若使用锁实现原子性,则无意义),如:

    1. 测试并设置 指令
    2. 获取并增加 指令
    3. 交换 指令
    4. 比较并交换 指令(CAS
    5. 加载链接/条件存储(load-linked/store-conditional)(LL/SC
  3. CAS指令 以及在 Java中的应用

    1. CAS 指令需要有 3 个操作数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操作时,只有当 V 的值等于 A,才将 V 的值更新为 B。(只用在当前真实值等价于获取到的值时,才将当前值更新为新值)

    2. J.U.C 包中的整数原子类通过Unsafe类(不允许用户直接调用)使用CAS指令

      1
      2
      3
      4
      5
      private AtomicInteger cnt = new AtomicInteger();

      public void add() {
      cnt.incrementAndGet(); // 调用了 Unsafe 的 getAndAddInt()
      }
      1
      2
      3
      public final int incrementAndGet() {
      return Unsafe.getAndAddInt(this, VALUE, 1) + 1;
      }
      1
      2
      3
      4
      5
      6
      7
      8
      @HotSpotIntrinsicCandidate
      public final int getAndAddInt(Object o, long offset, int delta) {
      int v;
      do {
      v = getIntVolatile(o, offset); // 得到旧的期望值
      } while (!weakCompareAndSetInt(o, offset, v, v + delta));// 调用 CAS指令,如果不成功,一直重试
      return v;
      }
  4. CAS的ABA漏洞

    如果一个变量初次读取的时候是 A 值,它的值被改成了 B,后来又被改回为 A,那 CAS 操作就会认为它从来没有被改变过。

    J.U.C 包提供了一个带有标记的原子引用类 AtomicStampedReference 来解决这个问题,它可以通过控制变量值的版本来保证 CAS 的正确性。大部分情况下 ABA 问题不会影响程序并发的正确性,如果需要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效

3. 无需同步方案

线程安全不是一定需要同步,如果一个方法 不涉及任何的共享数据(临界资源)的竞争,那么它无需任何同步措施

可重入代码/纯代码 pure code
  1. 可重入性是比线程安全更加严格的特性,即 可重入的 都是 线程安全的,但是 线程安全的 不全是 可重入的
  2. 可重入代码的一些典型特征;
    1. 不依赖堆上的数据 和 公用的资源
    2. 用到的变量 由参数传入
    3. 不会调用 不可重入的方法代码
  3. 判断可重入性的简单原则:如果一个方法,只要给定相同的输入,其得到的输出一定相同,也就是 结果可预测不依赖外界,那么就是可重入的
线程本地存储
  1. 将需要共享数据的代码方法一个线程中的技术,叫做线程本地存储(共享数据存储在线程本地工作空间,不存在线程间争用)

  2. 大部分使用消费队列的架构模式(如“生产者-消费者”模式)都会将产品的消费过程尽量在一个线程中消费完。最重要的一个应用实例就是经典 Web 交互模型中的“一个请求对应一个服务器线程”(Thread-per-Request)的处理方式,这种处理方式的广泛应用使得很多 Web 服务端应用都可以使用线程本地存储来解决线程安全问题。

  3. 可以使用 java.lang.ThreadLocal 类来实现线程本地存储功能。

    对于以下代码,thread1 中设置 threadLocal 为 1,而 thread2 设置 threadLocal 为 2。过了一段时间之后,thread1 读取 threadLocal 依然是 1,不受 thread2 的影响。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class ThreadLocalExample {
    public static void main(String[] args) {
    ThreadLocal threadLocal = new ThreadLocal();
    Thread thread1 = new Thread(() -> {
    threadLocal.set(1);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(threadLocal.get());
    threadLocal.remove();
    });
    Thread thread2 = new Thread(() -> {
    threadLocal.set(2);
    threadLocal.remove();
    });
    thread1.start();
    thread2.start();
    }
    }
    1
    result: 1

    为了理解 ThreadLocal,先看以下代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class ThreadLocalExample1 {
    public static void main(String[] args) {
    ThreadLocal threadLocal1 = new ThreadLocal();
    ThreadLocal threadLocal2 = new ThreadLocal();
    Thread thread1 = new Thread(() -> {
    threadLocal1.set(1);
    threadLocal2.set(1);
    });
    Thread thread2 = new Thread(() -> {
    threadLocal1.set(2);
    threadLocal2.set(2);
    });
    thread1.start();
    thread2.start();
    }
    }

    它所对应的底层结构图为:

    每个 Thread 都有一个 ThreadLocal.ThreadLocalMap 对象。

    当调用一个 ThreadLocal 的 set(T value) 方法时,先得到当前线程的 ThreadLocalMap 对象,然后将 ThreadLocal->value 键值对插入到该 Map 中。

    1
    2
    3
    4
    5
    6
    7
    8
    public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
    map.set(this, value);
    else
    createMap(t, value);
    }

    get() 方法类似。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
    ThreadLocalMap.Entry e = map.getEntry(this);
    if (e != null) {
    @SuppressWarnings("unchecked")
    T result = (T)e.value;
    return result;
    }
    }
    return setInitialValue();
    }

    ThreadLocal 从理论上讲并不是用来解决多线程并发问题的,因为根本不存在多线程竞争。

    在一些场景 (尤其是使用线程池) 下,由于 ThreadLocal.ThreadLocalMap 的底层数据结构导致 ThreadLocal 有内存泄漏的情况应该尽可能在每次使用 ThreadLocal 后手动调用 remove(),以避免出现 ThreadLocal 经典的内存泄漏甚至是造成自身业务混乱的风险。

栈封闭

多个线程访问同一个方法的局部变量时,不会出现线程安全问题,因为局部变量存储在虚拟机栈中,属于线程私有的

1
2
3
4
5
6
7
8
9
public class StackClosedExample {
public void add100() {
int cnt = 0;
for (int i = 0; i < 100; i++) {
cnt++;
}
System.out.println(cnt);
}
}
1
2
3
4
5
6
7
public static void main(String[] args) {
StackClosedExample example = new StackClosedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> example.add100());
executorService.execute(() -> example.add100());
executorService.shutdown();
}
1
2
3
result:
100
100

锁优化

主要是指 JVM 对 synchronized 的优化

自旋锁

  1. 目的:为了减小同步互斥(阻塞)时管态目态切换的较大开销
  2. 思想:让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,如果在这段时间内能获得锁,就可以避免进入阻塞状态
  3. 分析:自旋锁虽然能避免进入阻塞状态从而减少开销,但是它需要进行忙循环操作占用 CPU 时间,它只适用于共享数据的锁定状态很短的场景
  4. 自适应的自旋锁:自旋的次数不固定,由前一次在同一个锁上的自旋次数及锁的拥有者的状态来决定。如果上一次自旋成功,那么这一次可以获得更多的自旋次数,如果自旋很少成功,那么下一次自选次数会较少

锁消除

  1. 锁消除是指消除被检测出不可能存在竞争的共享数据的锁

  2. 主要是通过逃逸分析来支持,如果堆上的共享数据不可能被其它线程访问,那么就可以把它们当成私有数据对待,也就不存在并发问题,可以消除锁

    1. 线程逃逸:对象在方法中被定义后,可能被外部方法引用
    2. 方法逃逸:对象在方法中被定义后,可能被外部线程访问
  3. 主要是消除 Java API 中的 隐式锁

    对于一些看起来没有加锁的代码,其实隐式的加了很多锁。例如下面的字符串拼接代码就隐式加了锁:

1
2
3
public static String concatString(String s1, String s2, String s3) {
return s1 + s2 + s3;
}

​ String 是一个不可变的类,编译器会对 String 的拼接自动优化。在 JDK 1.5 之前,会转化为 StringBuffer 对象的连续 append() 操作:

1
2
3
4
5
6
7
public static String concatString(String s1, String s2, String s3) {
StringBuffer sb = new StringBuffer();
sb.append(s1);
sb.append(s2);
sb.append(s3);
return sb.toString();
}

​ 每个 append() 方法中都有一个同步块。虚拟机观察变量 sb,很快就会 发现它的动态作用域被限制在 concatString() 方法内部。也就是说,sb 的所有引用永远不会逃逸到 concatString() 方法之外,其他线程无法访 问到它,因此可以进行消除。

锁粗化

  1. 适用情况:如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗
  2. 优化方法:如果虚拟机探测到由这样的一串零碎的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部。

轻量级锁

  1. JDK 1.6 引入了偏向锁和轻量级锁,从而让锁拥有了四个状态:

    1. 无锁状态(unlocked)
    2. 偏向锁状态(biasble)
    3. 轻量级锁状态(lightweight locked)
    4. 重量级锁状态(inflated)
  2. HotSpot 虚拟机对象头的内存布局(Mark Word)

    其中 tag bits 对应了五个状态,这些状态在右侧的 state 表格中给出。除了 marked for gc 状态,其余对应锁的四种状态。

​ 下图左侧是一个线程的虚拟机栈,其中有一部分称为 Lock Record 的区 域,这是在轻量级锁运行过程创建的,用于存放锁对象的 Mark Word。 而右侧就是一个锁对象,包含了 Mark Word 和其它信息。

  1. 轻量级锁的思想:使用 CAS 操作来避免重量级锁使用互斥量的开销
  2. 可行性:对于绝大部分的锁,在整个同步周期内都是不存在竞争的,因此也就不需要都使用互斥量进行同步,可以先采用 CAS 操作进行同步,如果 CAS 失败了再改用互斥量进行同步
  3. 轻量级锁的加锁流程
    1. 当尝试获取一个锁对象时,如果未锁定(标记为 0 01)
      1. 在线程的虚拟机栈创建 Lock Record
      2. 使用 CAS 操作将对象的 Mark Word 更新为 Lock Record 指针
      3. 如果 CAS 操作成功
        1. 线程 成功 获取 该对象的轻量级锁
        2. 对象 Mark Word 的锁标记 改为 00,表示处于轻量级锁状态
      4. 如果 CAS 操作失败
        1. 如果对象的 Mark Word 指向当前线程的虚拟机栈
          1. 当前线程已经拥有了这个锁对象
          2. 直接进入同步块 执行代码
        2. 否则表明 锁对象已经被其他线程线程抢占
          1. 如果有两条以上的线程争用同一个锁
            1. 膨胀 为重量级锁
          2. 等待锁释放

偏向锁

  1. 思想偏向于让第一个获取锁对象的线程该线程之后获取该锁无须同步,甚至无须 CAS 操作
  2. 偏向锁的流程:
    1. 当锁对象第一次被线程获得时:进入偏向状态,标记为 1 01
    2. CAS 操作将线程 ID 记录到 Mark Word
    3. 如果 CAS 成功获得偏向锁以后每次进入这个锁相关的同步块无需其他操作
    4. 当有其他线程尝试获取该锁对象时,偏向状态结束,此时撤销偏向(Revoke Bias)恢复到未锁定状态或者轻量级锁状态

第21章 并发 《Thinking in Java》

并发的多面性

  1. 并发通常是提高运行在 单处理器 上程序的性能违反直觉原因在于,能够明显降低单处理器上因为等待I/O等其他非计算资源导致的阻塞。反之,如果任务是计算密集型,不存在非计算资源导致的阻塞或是很少,那么在单处理器上并发没有必要
  2. 在Java中编写多线程程序最关键的一点是:保证共享资源的同步互斥(不能同时被多个线程占有)
  3. 并发简化代码设计,降低耦合度,如 仿真中,为每一个发生变化的事物,提供一个线程/在 web服务器中,一个请求对应一个线程
  4. Java中线程机制是抢占式的,协作式机制也有一下两点优势:1. 上下文开销较小(一个线程完成后,才出让cpu,无须保存现场) 2. 可以同时执行的线程理论上没有数量限制

基本线程机制

使用线程的三种方式

实现 Runnable 接口

  1. 一个实现了 Runnable接口 的类,通过实现(必须实现)run方法,并在run方法中实现线程的逻辑代码,实现多线程。
  2. 实现了Runnable的类,不具有任何内在线程能力,必须附在另一个线程上,然后通过 Thread 调用 start() 方法来启动线程
  3. 在A函数中,调用该类对象的run方法,此时运行的是A函数所在的线程,不是多线程
1
2
3
4
5
6
public class MyRunnable implements Runnable {
public void run() {
// ...
Thread.yield();//建议出让
}
}
1
2
3
4
5
6
7
8
public static void main(String[] args) {
MyRunnable instance = new MyRunnable();
Thread thread = new Thread(instance);
//附在另一个线程上
thread.start(); // start()方法在启动thread之后立即返回,thread中的run()在另一个线程中运行
//附着在main函数所在的线程上,run()在本线程中运行
instance.run();
}

实现 Callable 接口

  1. 类似于Runnable接口,通过实现(必须实现)call方法,并在call方法中实现逻辑
  2. 类似于Runnable接口,不具有任何的内在线程能力,必须附在另一个线程上,通过Thread的start方法,启动该线程
  3. 与 Runnable 相比,Callable 可以有返回值,返回值通过 FutureTask 进行接收和封装。
1
2
3
public class MyCallable implements Callable<Integer> {
public Integer call() { return 123; }
}
1
2
3
4
5
6
7
8
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCallable mc = new MyCallable();
// 当组合了该对象的thread的运行完成后,FutureTask对象将会接受返回值
FutureTask<Integer> ft = new FutureTask<>(mc);
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get());
}

继承 Thread 类

  1. Thread类具有内在的线程能力需要实现 run() 方法,因为 Thread 类也实现了 Runable 接口
  2. 当调用 start() 方法启动线程时,VM将该线程放入就绪队列中进入就绪态,当一个线程被调度时会执行该线程的 run() 方法
  3. 当Thread对象创建后,即使没有引用指向该对象,也不会被GC回收,直至Thread任务结束,退出run方法,原因:每个Thread对象都进行了“注册”,实际上有一个引用指向该thread对象,直至任务结束
1
2
3
4
5
public class MyThread extends Thread {
public void run() {
// ...
}
}
1
2
3
4
public static void main(String[] args) {
MyThread mt = new MyThread();
mt.start();
}

对比

  1. Runnable & Callable 都不具有内在线程能力必须将实现了Runnable & Callable的对象交给一个Thread对象,实现多线程
  2. Thread类具有内在的多线程能力,Thread类本身也实现了Runnable接口,继承该类时需要实现run方法
  3. 实际应用中,实现接口会更好一些,因为:
    1. Java 单根继承体系,导致如果继承了 Thread 类就无法继承其它类,但是可以实现多个接口
    2. 相比于实现接口,继承整个Thread类的开销较大

Executor

Executor 管理多个异步任务(不需要进行同步操作,线程之间线程安全)的执行,而无需显式地管理线程的生命周期

  1. 主要有三种 Executor:
    1. CachedThreadPool:执行过程中,创建与所需数量相同的线程;
    2. FixedThreadPool:所有任务只能使用固定大小的线程池;提前进行线程分配预支线程分配的代价
    3. SingleThreadExecutor:相当于大小为 1 的 FixedThreadPool,如果向其添加多个任务,这些任务串行运行,利用这个性质,可以避免并发问题
  2. ExecutorService对象executorService.execute()方法,返回空,只用于runnable
  3. ExecutorService对象executorService.submit()方法,返回Future对象,只用于callable和runnable
  4. Future 对象 isDone 方法 / get()方法(若作业还未结束,get()阻塞)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
ArrayList<Future<Integer>> futures = new ArrayList<>();
for (int i = 0; i < 5; i++) {
executorService.execute(new MyRunnable());
// Callable对象 必须使用能够返回Future对象的submit方法提交作业
futures.add(executorService.submit(new Oranage()));
}
// 关闭线程池,无法向线程池添加新线程,已有线程正常运行
executorService.shutdown();
for (Future<Integer> future : futures) {
if(future.isDone()){ // 询问 是否完成
System.out.println(future.get()); // 获取结果
}
// 若不询问 isDone,且未完成,将阻塞
System.out.println(future.get());
}
}

Daemon

  1. 守护线程是程序运行时在后台提供服务的线程,不属于程序中不可或缺的部分

  2. 当所有非守护线程结束时,JVM将会终止,同时会杀死所有守护线程

  3. main() 属于守护线程。

  4. 使用 setDaemon() 方法将一个线程设置为守护线程/ 必须在线程启动之前

    1
    2
    3
    4
    5
    public static void main(String[] args) {
    Thread thread = new Thread(new MyRunnable());
    thread.setDaemon(true);
    thread.start();
    }
  5. Deamon进程的子进程默认是Deamon进程

  6. Deamon进程的finally{}语句块不会被执行

sleep()

  1. Thread.sleep(millisec) 方法会休眠当前正在执行的线程,millisec 单位为毫秒
  2. Java5引入 TimeUnit.MINUTES.sleep() ; TimeUnit.MILLISECONDS.sleep();等
  3. sleep() 可能会抛出 InterruptedException异常不能跨线程传播必须在本地处理
1
2
3
4
5
6
7
8
9
public void run() {
try {
Thread.sleep(3000);
TimeUnit.MINUTES.sleep(1);
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

优先级

  1. JVM倾向于让优先级高的进程先运行,但不是严格按照优先级次序执行(导致饿死

  2. 不推荐手动修改优先级应该由JVM调度

  3. Java具有10个优先级,不同操作系统优先级划分不同,无法完美映射,应使用 Thread.MAX_PRIORITY;Thread.MIN_PRIORITY;Thread.NORM_PRIORITY

  4. 使用setPriority() getPriority()方法 设置和查看优先级

    1
    2
    Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
    Integer priority = Thread.currentThread().getPriority();

yield()

  1. 对静态方法 Thread.yield() 的调用声明:当前线程已经完成了生命周期中最重要的部分,建议切换给其它具有相同优先级的线程来执行
  2. 仅是建议,不一定切换
  3. 切换对象是其它具有相同优先级的线程
1
2
3
public void run() {
Thread.yield();
}

过期的suspend(),resume(),stop()

  1. 线程对象 threadObject 具有 suspend(),resume(),stop() 三种方法,能够 暂停线程运行,恢复线程运行,停止线程运行
  2. 这是三种方式已经过期,不应该继续使用,原因有:
    1. suspend()暂停线程后不释放资源
    2. stop()停止线程后,不保证线程占有的资源正确释放
  3. 应该使用中断的方式来终止线程

中断

线程中断

  1. 线程具有中断状态
    1. 通过该线程对象的interrupt()方法中断该线程
    2. 通过该线程对象的isInterrupted()方法查看中断标志
    3. Thread类的静态方法 Thread.interrupted()返回当前中断标志,并且将其复位为false
  2. 处于终结状态的线程中断状态为false(非中断)thread.isInterrupted()为false
  3. 方法声明中表明抛出中断异常的方法(如 Thread.sleep())在收到中断之后,抛出InterruptedException,并将中断标志复位
  4. 处于阻塞、限期等待或者无限期等待状态的线程,在收到终端之后,抛出InterruptedException,并将中断标志复位
  5. I/O 阻塞和 synchronized 锁阻塞 不可中断 (使用NIO方式 可以中断I/O)

终止线程的安全方式

  1. 通过中断结束线程

  2. 通过private volatile boolean on字段 标志运行状态结束线程

  3. 通过标识位或者中断操作的方式能够使线程在终止时有机会去清理资源,而不是武断地将线程停止,因此这种终止线程的做法显得更加安全和优雅

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    import java.util.concurrent.TimeUnit;
    public class Shutdown {
    private static class Runner implements Runnable {
    private long i;
    private volatile boolean on = true;
    @Override
    public void run() {
    try{
    while (on && !Thread.currentThread().isInterrupted()) { i++; }
    System.out.println("Count i = " + i);
    }
    }finally{
    // 将清理工作放在finally中,无论如何都能正确执行
    }
    }
    public void cancel() { on = false; }
    }
    public static void main(String[] args) throws Exception {
    Runner one = new Runner();
    Thread countThread = new Thread(one, "CountThread");
    countThread.start();
    TimeUnit.SECONDS.sleep(1);
    countThread.interrupt(); // 通过中断 结束
    one.cancel();
    Runner two = new Runner();
    countThread = new Thread(two, "CountThread");
    countThread.start();
    TimeUnit.SECONDS.sleep(1);
    two.cancel(); // 通过On标志结束
    }
    }
    // result:
    // Count i = 713834415
    // Count i = 757959972
  4. 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    import java.util.concurrent.TimeUnit;
    public class Shutdown {
    private static class Runner implements Runnable {
    private long i;
    private volatile boolean on = true;
    @Override
    public void run() {
    while (on && !Thread.currentThread().isInterrupted()) {
    i++;
    }
    System.out.println(" Count i = " + i);
    // 可以进行清理工作
    }
    public void cancel() { on = false;}
    }
    public static void main(String[] args) throws Exception {
    Runner one = new Runner();
    Thread countThread = new Thread(one, "CountThread");
    countThread.start();
    TimeUnit.SECONDS.sleep(1);
    countThread.interrupt(); // 通过中断 结束
    one.cancel();
    Runner two = new Runner();
    countThread = new Thread(two, "CountThread");
    countThread.start();
    TimeUnit.SECONDS.sleep(1);
    two.cancel(); // 通过On标志结束
    }
    }
    //result:
    //Count i = 713834415
    //Count i = 757959972

Executor 的中断操作

  1. Executor 的 shutdown() 方法:等待所有线程执行完毕后再关闭线程池

  2. Executor 的 shutdownNow() 方法:相当于调用每个线程的 interrupt() 方法,立即中断所有线程

  3. 只中断 Executor 中的一个线程

    通过使用 submit() 方法来提交一个线程(runnable/callable),它会返回一个 Future对象,通过调用future对象的cancel(true) 方法就可以中断线程。

Java多线程异常处理

问题

子线程中的异常不会抛出到父线程异常不能跨线程传播,只能在本地进行处理,本地未处理的异常将传播到控制台(有一种方式可以在父线程中处理)

原因

  1. 对于checked exception,Runnable接口中的run()方法没有throws标签,因此任何实现Runnable接口的子类的run()方法也不会抛出受检查异常(Callable也一样)
  2. 对于unchecked exception 也就是 runtime exception,子线程交由JVM调度,发生异常不会通知父线程

错误处理方式

在父线程中使用try catch 捕获子线程异常,子线程异常不会传播到父线程

正确处理方式

将异常在线程本地处理

子线程本地try catch

UncaughtExceptionHandler

  1. Thread.setUncaughtExceptionHandler设置当前线程的异常处理器
  2. Thread.setDefaultUncaughtExceptionHandler为整个程序设置所有线程默认的异常处理器
  3. UncaughtExceptionHandler的优先级 高于 DefaultUncaughtExceptionHandler

  4. 使用方式

    1. UncaughtExceptionHandler
      1. 实现UncaughtExceptionHandler接口,和其中的uncaughtException方法
      2. 实现一个线程工厂,在其中为new出来的线程设置UncaughtExceptionHandler
      3. 将线程工厂交给ExecutorService对象
    2. DefaultUncaughtExceptionHandler
      1. 将Thread类的静态成员,通过其静态方法setDefaultUncaughtExceptionHandler 设置一个 UncaughtExceptionHandler
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    /*
    * 第一步:定义符合线程异常处理器规范的“异常处理器”
    *Thread.UncaughtExceptionHandler.uncaughtException()会在线程因未捕获的异常而临近死亡时被调用
    */
    class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{
    // 实现uncaughtException()方法
    @Override
    public void uncaughtException(Thread t, Throwable e) {
    System.out.println("caught "+e);
    }
    }
    /*
    * 第二步:定义线程工厂
    * 线程工厂用来将任务附着给线程,并给该线程绑定一个异常处理器
    */
    class HanlderThreadFactory implements ThreadFactory{
    @Override
    public Thread newThread(Runnable r) {
    System.out.println(this+"creating new Thread");
    Thread t = new Thread(r);
    System.out.println("created "+t);
    t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());//设定线程工厂的异常处理器
    System.out.println("eh="+t.getUncaughtExceptionHandler());
    return t;
    }
    }
    /*
    * 第三步:我们的任务可能会抛出异常
    * 显示的抛出一个exception
    */
    class ExceptionThread implements Runnable{
    @Override
    public void run() {
    Thread t = Thread.currentThread();
    System.out.println("run() by "+t);
    System.out.println("eh = "+t.getUncaughtExceptionHandler());
    throw new RuntimeException();
    }
    }
    /*
    * 第四步:使用线程工厂创建线程池,并调用其execute方法
    */
    public class ThreadExceptionUncaughtExceptionHandler{
    public static void main(String[] args){
    Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
    ExecutorService exec = Executors.newCachedThreadPool(new HanlderThreadFactory());
    exec.execute(new ExceptionThread());
    }
    }

通过Future的get方法捕获异常

  1. 使用线程池提交一个能获取到返回信息的方法,也就是ExecutorService.submit(Callable)

  2. 如果子线程中发生了异常,通过future.get()获取返回值时,可以捕获到子线程异常

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    class ExceptionThread implements Callable{
    @Override
    public int call() {
    Thread t = Thread.currentThread();
    System.out.println("run() by "+t);
    System.out.println("eh = "+t.getUncaughtExceptionHandler());
    throw new RuntimeException();
    return -1;
    }
    }
    public class UseFutureCatchException{
    public static void main(String[] args){
    ExecutorService exec = Executors.newCachedThreadPool(new HanlderThreadFactory());
    Future f = exec.submit(new ExceptionThread());
    try{ f.get();}
    catch(ExecutionException e){
    System.out.println("caught")
    }
    finally { if (executorService != null) {executorService.shutdown();} }
    }

线程间协作

当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调

join()

  1. 在线程中调用另一个线程的 join() 方法,会将当前线程挂起,而不是忙等,直到目标线程结束。(当前线程挂起,CPU资源交给执行join()方法的线程
  2. join(timeOut)方法 可以设置 超时参数在给定时间之后无论执行join()方法的线程是否完成,join()都将返回
  3. 可以通过在调用线程上调用interrupt()方法,中断join()方法,需要try catch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//虽然 b 线程先启动,但是因为在 b 线程中调用了 a 线程的 join() 方法,b 线程会等待 a 线程结束才继续执行,因此最后能够保证 a 线程的输出先于 b 线程的输出。
public class JoinExample {
private class A extends Thread {
@Override
public void run() { System.out.println("A"); }
}
private class B extends Thread {
private A a;
B(A a) { this.a = a; }
@Override
public void run() {
try { a.join();}
catch (InterruptedException e) { e.printStackTrace(); }
System.out.println("B");
}
}
public void test() {
A a = new A();
B b = new B(a);
b.start();
a.start();
}
public static void main(String[] args) {
JoinExample example = new JoinExample();
example.test();
}
}
// result:
// A
// B

等待通知机制

任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、
wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合可以实现等待/通知模式

Condition接口也提供了类似Object的监视器方法,await() signal() signalAll()与Lock配合可以实现等待/通知模式,这种方式可以指定等待条件,更加灵活

wait() notify() notifyAll()

  1. 使用wait()、notify()和notifyAll()时需要先对调用对象加锁
  2. 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的
    等待队列
    ,线程释放对象上的锁
  3. notify()/ notifyAll()方法将等待队列中的一个/所有 等待线程从等待队列中移到同步队列中,被移动的线程状态由WAITING变为BLOCKEDwait线程不会从wait()返回,只有在notify线程释放锁,等待线程获取锁后,才有机会从wait()返回
  4. 从wait()方法返回的前提是获得了对象的锁
  5. 通知等待机制依托于同步机制,因此只能在同步方法或者同步控制块中使用,否则会在运行时抛出 IllegalMonitorStateException
  6. 如果wait()没有释放锁,那么其它线程就无法获得锁,进入同步块中,无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁
  7. wait() 和 sleep() 的区别
    1. wait()Object 的方法,且释放
    2. sleep()Thread 的静态方法,且不释放

等待通知机制的经典范式

等待方:
  1. 获取对象object的锁

  2. 如果条件不满足,object.wait(),释放对象的锁,被通知后,仍检查条件是否满足 (执行wait之后并没有退出循环,只是线程挂起)

  3. 如果条件满足,执行逻辑代码

    1
    2
    3
    4
    5
    6
    synchronized(对象) {
    while(条件不满足) {
    对象.wait();
    }
    对应的处理逻辑
    }
通知方:
  1. 获取对象的锁

  2. 改变条件

  3. 通知所有在该对象等待队列中的线程 (notify()是notifyall()的一种优化,使用条件更加严格,所有wait任务必须等待相同条件,必须只有一个任务受到条件改变的影响,对于对象的子类上述限制都有效)

    1
    2
    3
    4
    synchronized(对象) {
    改变条件;
    对象.notifyAll();
    }
例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class WaitNotify {
static boolean flag = true; // 标志条件
static Object lock = new Object(); // 对象
public static void main(String[] args) throws Exception {
Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
TimeUnit.SECONDS.sleep(1);
Thread notifyThread = new Thread(new Notify(), "NotifyThread");
notifyThread.start();
}

static class Wait implements Runnable {
@Override
public void run() {
synchronized (lock) { // 加锁,拥有lock的Monitor
while (flag) { // 当条件不满足时,继续wait,同时释放了lock的锁
System.out.println(Thread.currentThread() + " flag is true. wait@ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
try {
lock.wait(); // 如果不wait(),等待线程一直持有锁,等待条件的变化,造成死锁,程序一直卡在该循环中
// 执行wait之后并没有退出循环,只是线程挂起
} catch (InterruptedException e) {
e.printStackTrace();
}
}// 条件满足时,完成工作
System.out.println(Thread.currentThread() + " flag is false. running @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}
}

static class Notify implements Runnable {
@Override
public void run() { // 加锁,拥有lock的Monitor
synchronized (lock) { // 获取lock的锁,然后进行通知,通知时不会释放lock的锁,直到当前线程释放了lock后,WaitThread才能从wait方法中返回
System.out.println(Thread.currentThread() + " hold lock. notify @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.notifyAll();
flag = false;
}// 再次加锁
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock again. sleep@ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
/*
Thread[WaitThread,5,main] flag is true. wait@ 10:03:36
Thread[NotifyThread,5,main] hold lock. notify @ 10:03:37
Thread[WaitThread,5,main] flag is false. running @ 10:03:37
Thread[NotifyThread,5,main] hold lock again. sleep@ 10:03:42*/

await() signal() signalAll()

  1. J.U.C 类库中提供了 Condition 类来实现线程之间的协调,具体来说是await() signal() signalAll()
  2. Condition类对象和 Lock类对象搭配实现等待通知机制,Condition对象由Lock对象产生,Condition依赖于Lock(类似等待通知机制依赖于同步互斥机制)
  3. await() 可以
    1. 指定等待的条件(一个Lock可以产生多个condition对象
    2. 选择不被中断 conditionObject.awaitUninterruptedly()
    3. 超时等待(时间)conditionObject.awaitNanos(long timeout) (返回值是还剩余的时间,正表示未超时,负表示超时)
    4. 超时等待(时刻)conditionObject.awaitUntil(Date date)(返回值true表示未超时,false表示超时,Date指的是具体时刻)
  4. signal() signalAll() 表示通知等待 一个/所有线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedQueue<T> {
private Object[] items;
private int addIndex, removeIndex, count; // 添加的下标,删除的下标和数组当前数量
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
// 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
public void add(T t) throws InterruptedException {
lock.lock(); // 1. 获取锁
try {
while (count == items.length) { //while循环内 检查条件1
notFull.await(); // 不满足条件1等待
}
// 满足条件1,执行逻辑操作
items[addIndex] = t;
if (++addIndex == items.length) {
addIndex = 0;
}
++count;
//条件2得到满足,通知等待条件2的线程
notEmpty.signal();
} finally {
lock.unlock(); // 释放锁
}
}
// 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
@SuppressWarnings("unchecked")
public T remove() throws InterruptedException {
lock.lock(); // 1. 获取锁
try {
while (count == 0) { //while循环内 检查条件2
notEmpty.await(); // 不满足条件2等待
}
// 满足条件2,执行逻辑操作
Object x = items[removeIndex];
if (++removeIndex == items.length) {
removeIndex = 0;
}
--count;
//条件1得到满足,通知等待条件2的线程
notFull.signal();
return (T) x;
} finally {
lock.unlock(); // 释放锁
}
}
}

##互斥同步

Java语言提供了两种锁机制来控制多个线程对共享资源的互斥访问,第一个是 JVM 实现的 synchronized,而另一个是 JDK 实现的 ReentrantLock

synchronized

原理解释

  1. 所有的对象都含有一个单一的锁(monitor),当调用对象的synchronized方法或者语句块时,针对对象加锁,其他方法,只有等正在执行的方法退出后,释放锁,才能获得该对象的锁,也就是 同一个对象的所有synchronized方法或语句块共享一个对象锁
  2. 每个类也具有一个锁(作为Class对象的一部分),synchronized 修饰的静态方法/语句块,以及 针对类的synchronizedsynchronized (SynchronizedClass.class)作用于整个类
  3. 所有能够操作临界资源的途径都必须使用synchronized,否则失效
    1. synchronized 作用在 语句块 和方法 上,因此 对于 类中的字段 必须是 private
    2. 所有涉及到临界资源的方法 都必须被synchronized修饰
  4. 同一个对象可以获得多个锁已经获得锁的方法,可以调用同一个对象的另一个synchronized方法,并且 锁的计数值+1,当一个方法退出时计数值-1

使用方式

  1. 同步一个代码块

    作用于同一个对象,不能作用于不同的对象

    1
    2
    3
    4
    5
    public class SynchronizedExample {
    public void func1() {
    synchronized (this) { // this 也可以是 其他对象
    for (int i = 0;i <10;i++) { System.out.print(i+" ");}
    }}}
    1
    2
    3
    4
    5
    6
    7
    8
    public static void main(String[] args) {
    SynchronizedExample e1 = new SynchronizedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> e1.func1());
    executorService.execute(() -> e1.func1());
    }
    // 作用于同一个对象, 串行,互斥有效
    // result : 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static void main(String[] args) {
    SynchronizedExample e1 = new SynchronizedExample();
    SynchronizedExample e2 = new SynchronizedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> e1.func1());
    executorService.execute(() -> e2.func1());
    }
    // 作用于两个对象, 交替执行,并发,互斥无效
    // result : 0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
  2. 同步一个方法

    和同步代码块一样,作用于同一个对象

1
public synchronized void func () {  }
  1. 同步一个类

    作用于整个类,也就是说两个线程调用同一个类的不同对象上的这种同步语句,也会进行同步

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class SynchronizedExample {
    public void func2() {
    synchronized (SynchronizedExample.class) {
    for (int i=0; i<10; i++) {
    System.out.print(i+" ");
    }
    }
    }
    public static void main(String[] args) {
    SynchronizedExample e1 = new SynchronizedExample();
    SynchronizedExample e2 = new SynchronizedExample();
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> e1.func2());
    executorService.execute(() -> e2.func2());
    }
    }
    // 针对类的synchronized锁,同一个类的不同对象, 串行,互斥有效
    // result : 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
  2. 同步一个静态方法

    作用于整个类 public synchronized static void fun() {}

ReentrantLock

  1. ReentrantLock 是 java.util.concurrent(J.U.C)包中的锁, 由JDK负责实现
  2. 必须显式new锁对象,对锁对象加锁,释放
  3. 搭配 try finally 使用, return 必须在try中unlock必须在finally中,这样 锁一定被释放且不会提早释放(在return之前)
  4. Lock对象 具有 tryLock(),tryLock(time,TimeUnit.SECONDS),等方式,程序员可以通过这样的方式在没有获得锁时,处理其他任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class LockExample {
private Lock lock = new ReentrantLock(); // new 锁
public int func() {
lock.lock(); // 加锁
int result = 0;
try {
for (int i = 0; i < 10; i++) { result += i;}
return result; // 在try子块中return,确保不提前释放锁
} finally {
lock.unlock(); // 确保释放锁,从而避免发生死锁。
}
}
public static void main(String[] args) {
LockExample lockExample = new LockExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> lockExample.func());
executorService.execute(() -> lockExample.func());
}
}
// result: 串行, 锁有效
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

synchronized 和 ReentrantLock比较

  1. 锁的实现:synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。
  2. 性能: 新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等,synchronized 与 ReentrantLock 大致相同
  3. 等待可中断:当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。ReentrantLock 可中断,而 synchronized 不行
  4. 公平锁 公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的
  5. 锁绑定多个条件:一个 ReentrantLock 可以同时绑定多个 Condition 对象

使用选择

除非需要使用 ReentrantLock 的高级功能,否则优先使用 synchronized。这是因为 synchronized 是 JVM 实现的一种锁机制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。并且JVM 会确保锁的释放 synchronized 锁,不用担心没有释放锁而导致死锁问题

J.U.C - AQS

java.util.concurrent(J.U.C)大大提高了并发性能,AQS 被认为是 J.U.C 的核心

CountDownLatch

  1. 用来控制一个线程等待多个线程

  2. 维护了一个计数器 cnt,调用 countDown() 方法计数器值减 1计数器值不为0时调用await()/wait()方法线程阻塞,进入等待队列;计数器值为0时,在该对象等待队列中的线程会被唤醒

  3. CountDownLatch只能触发一次,计数值不能重置,CyclicBarrier是能够重置计数值的版本

CyclicBarrier

  1. 用来控制多个线程互相等待 (CountDownLatch是一个线程等待多个线程)
  2. 通过维护计数器来实现的。线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行
  3. CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用
  4. public CyclicBarrier(int parties, Runnable barrierAction)为其中一个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会执行一次
  5. 与CountDownLatch的区别在于
    1. 多个线程相互等待
    2. 不通过countDown方法计数器减一,await方法计数器减一且等待
    3. reset方法重置计数器值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class exam01 {
static class Task implements Runnable {
CyclicBarrier cyclicBarrier;
static int threads = 0;
final int id = threads++;
public Task(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("block thread :" + id);
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("resume thread :" + id);
}
}
public static void main(String[] args) {
final int totalThread = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.submit(new Task(cyclicBarrier));
}
executorService.shutdown();
}
}
//block thread :0 block thread :1 block thread :2 resume thread :1 resume thread :0 resume thread :2

Semaphore

  1. synchronized锁和lock锁,一般只允许一个任务访问一项资源,信号量允许n个任务获得信号量
  2. Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数

以下代码模拟了对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class SemaphoreExample {
public static void main(String[] args) {
final int clientCount = 3;
final int totalRequestCount = 10;
Semaphore semaphore = new Semaphore(clientCount);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
executorService.execute(()->{
try {
semaphore.acquire();
System.out.print(semaphore.availablePermits() + " ");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
});
}
executorService.shutdown();
}
}
1
2 1 2 2 2 2 2 1 2 2

J.U.C - 其它组件

FutureTask

  1. 在介绍 Callable 时我们知道它可以有返回值,返回值通过 Future 进行封装
  2. FutureTask 实现了 RunnableFuture 接口,该接口继承自 Runnable 和 Future 接口
  3. FutureTask 既可以当做一个任务执行,也可以有返回值
1
2
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>

FutureTask 可用于异步获取执行结果或取消执行任务的场景。当一个计算任务需要执行很长时间,那么就可以用 FutureTask 来封装这个任务,主线程在完成自己的任务之后再去获取结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class FutureTaskExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = 0; i < 100; i++) {
Thread.sleep(10);
result += i;
}
return result;
}
});

Thread computeThread = new Thread(futureTask);
computeThread.start();

Thread otherThread = new Thread(() -> {
System.out.println("other task is running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
otherThread.start();
System.out.println(futureTask.get());
}
}
1
2
other task is running...
4950

BlockingQueue

wait()和notifyAll()方法以及await()和signalAll()方法实现的等待通知模型以一种比较低级的方式解决了不同线程间的协作问题,即每次交互时两个线程都握手(await的例子中,每次加入或者删除一个元素都会有signal操作)

同步队列实现了更高级别的抽象,使得线程之间解耦同步队列在同一时刻只允许一个线程插入或者删除元素

J.U.C.BlockingQueue 接口有以下阻塞队列的实现:

  1. FIFO 队列 :LinkedBlockingQueue、ArrayBlockingQueue(固定长度)
  2. 优先级队列 :PriorityBlockingQueue

提供了阻塞的 take() 和 put() 方法:

  1. 如果队列为空 take() 将阻塞,直到队列中有内容
  2. 如果队列为满 put() 将阻塞,直到队列有空闲

使用 BlockingQueue 实现生产者消费者问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ProducerConsumer {
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
private static class Producer extends Thread {
@Override
public void run() {
try {
queue.put("product");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("produce..");
}
}
private static class Consumer extends Thread {
@Override
public void run() {
try {
String product = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("consume..");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
Producer producer = new Producer();
producer.start();
}
for (int i = 0; i < 5; i++) {
Consumer consumer = new Consumer();
consumer.start();
}
for (int i = 0; i < 3; i++) {
Producer producer = new Producer();
producer.start();
}
}
1
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..

PipedWriter类和PipedReader类

  1. Java I/O类库提供通过 I/O线程间进行通信的方式

  2. PipedWriter类 (允许任务向管道写) 和PipedReader类 (允许不同任务从同一个管道中读取)

  3. 该模型可以看成是生产者-消费者问题的变体

  4. 管道基本可以视作一个阻塞队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    import java.io.IOException;
    import java.io.PipedReader;
    import java.io.PipedWriter;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    public class PipedIO {
    static class Sender implements Runnable {
    private Random rand = new Random(47);
    private PipedWriter out = new PipedWriter();
    public PipedWriter getPipedWriter() {
    return out;
    }
    @Override
    public void run() {
    try {
    while (true) {
    for (char c = 'A'; c <= 'z'; c++) {
    out.write(c);
    TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
    }
    }
    } catch (IOException e) {
    System.out.println(e + ": sender write exception");
    } catch (InterruptedException e) {
    System.out.println(e + ": sender sleep exception");
    }
    }
    }
    static class Receiver implements Runnable {
    private PipedReader in;
    public Receiver (Sender sender ) throws IOException{
    in = new PipedReader(sender.getPipedWriter());
    }
    @Override
    public void run() {
    try {
    System.out.println("read: "+(char)in.read()+". ");
    }catch (IOException e){
    System.out.println(e + ": receiver read exception");
    }
    }
    }
    public static void main(String[] args) throws Exception {
    Sender sender = new Sender();
    Receiver receiver = new Receiver(sender);
    ExecutorService exec = Executors.newCachedThreadPool();
    exec.execute(sender);
    exec.execute(receiver);
    TimeUnit.SECONDS.sleep(4);
    exec.shutdownNow();
    }
    }

ForkJoin

主要用于并行计算中,和 MapReduce 原理类似,都是把大的计算任务拆分成多个小任务并行计算。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ForkJoinExample extends RecursiveTask<Integer> {
private final int threshold = 5;
private int first;
private int last;
public ForkJoinExample(int first, int last) {
this.first = first;
this.last = last;
}
@Override
protected Integer compute() {
int result = 0;
if (last - first <= threshold) {
// 任务足够小则直接计算
for (int i = first; i <= last; i++) {
result += i;
}
} else {
// 拆分成小任务
int middle = first + (last - first) / 2;
ForkJoinExample leftTask = new ForkJoinExample(first, middle);
ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
leftTask.fork();
rightTask.fork();
result = leftTask.join() + rightTask.join();
}
return result;
}
}
1
2
3
4
5
6
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinExample example = new ForkJoinExample(1, 10000);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future result = forkJoinPool.submit(example);
System.out.println(result.get());
}

ForkJoin 使用 ForkJoinPool 来启动,它是一个特殊的线程池,线程数量取决于 CPU 核数。

1
public class ForkJoinPool extends AbstractExecutorService

ForkJoinPool 实现了工作窃取算法来提高 CPU 的利用率。每个线程都维护了一个双端队列,用来存储需要执行的任务。工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务,避免和队列所属线程发生竞争。例如下图中,Thread2 从 Thread1 的队列中拿出最晚的 Task1 任务,Thread1 会拿出 Task2 来执行,这样就避免发生竞争。但是如果队列中只有一个任务时还是会发生竞争。


线程不安全示例

如果多个线程对同一个共享数据进行访问而不采取同步操作的话,那么操作的结果是不一致的。

以下代码演示了 1000 个线程同时对 cnt 执行自增操作,操作结束之后它的值有可能小于 1000。

1
2
3
4
5
6
7
8
9
10
11
12
public class ThreadUnsafeExample {

private int cnt = 0;

public void add() {
cnt++;
}

public int get() {
return cnt;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
ThreadUnsafeExample example = new ThreadUnsafeExample();
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
}
1
997

死锁

  1. 定义:死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去
  2. 可能产生死锁的多线程程序并不一定会产生死锁,并且产生死锁的情况一般不可复现,因此编写多线程程序时需要谨慎预防死锁(程序在一次运行中可能良好,但具有死锁的风险
  3. 产生死锁的四个条件
    1. 资源互斥:临界资源只能被一个线程占有
    2. 不可抢占:互斥资源不能被抢占,也就是说占有资源的线程除非完成作业不会主动/被动的释放资源
    3. 持有等待至少一个线程持有部分资源,并且等待另一部分资源,以完成作业
    4. 循环等待:线程A在等待线程B持有的资源,线程B在等待线程C持有的资源,线程C在等待线程A持有的资源
  4. 从产生死锁的四个条件谈预防死锁(只需破坏其中一个条件就可预防死锁)(以哲学家就餐问题为例)
    1. 资源互斥:该条件一般必须满足
    2. 不可抢占:哲学家A,B都只占有一个筷子,A能够抢占B的筷子(被动释放资源
    3. 持有等待:哲学家如果只能占有一根筷子,则选择一根也不占有(主动放弃资源
    4. 循环等待:将所有哲学家(除最后一个)拿筷子的顺序设为相同,最后一个哲学家拿筷子的顺序设为相反

参考

操作系统之进程的几种状态

Think in java

如果觉得有用的话,打赏我吧~