Java并发编程

在前面的内容中,学习了多线程相关的内容,在多线程环境下保证线程安全是首要考虑的问题,保证线程安全最简单的方式就是使用synchronized修饰方法或者代码片段,那么synchronized的底层原理是如何实现的,这就需要先深入了解对象在内存中的结构。在本章节中将详细学习synchronized关键字的原理及Java中的并发类。

Java对象结构

Java对象结构包括三部分:对象头、对象体和对齐字节。 在本小节内将详细介绍Java的对象头。
1.对象头
对象头:对象头包括三个字段,第一个字段叫作Mark Word(标记字),用于存储自身运行时的数据,例如GC标志位、哈希码、锁状态等信息。
第二个字段叫作Class Pointer(类对象指针),用于存放方法区Class对象的地址,虚拟机通过这个指针来确定这个对象是哪个类的实例。
第三个字段叫作Array Length(数组长度)。如果对象是一个Java数组,那么此字段必须有,用于记录数组长度的数据;如果对象不是一个Java数组,那么此字段不存在,所以这是一个可选字段。
2.对象体
对象体包含对象的实例变量(成员变量),用于成员属性值,包括父类的成员属性值。这部分内存按4字节对齐。
3.对齐字节
对齐字节也叫作填充对齐,其作用是用来保证Java对象所占内存字节数为8的倍数HotSpot VM的内存管理要求对象起始地址必须是8字节的整数倍。对象头本身是8的倍数,当对象的实例变量数据不是8的倍数时,便需要填充数据来保证8字节的对齐。
接下来,对Object实例结构中几个重要的字段的作用做一下简要说明:

  1. Mark Word(标记字)字段主要用来表示对象的线程锁状态,另外还可以用来配合GC存放该对象的hashCode。
  2. Class Pointer(类对象指针),用于存放方法区Class对象的地址,虚拟机通过这个指针来确定这个对象是哪个类的实例。
  3. Array Length(数组长度)字段占用32位(在32位JVM中)字节,这是可选的,只有当本对象是一个数组对象时才会有这个部分。
  4. 对象体用于保存对象属性值,是对象的主体部分,占用的内存空间大小取决于对象的属性数量和类型。
  5. 对齐字节并不是必然存在的,也没有特别的含义,它仅仅起着占位符的作用。当对象实例数据部分没有对齐(8字节的整数倍)时,就需要通过对齐填充来补全。

对象结构中的字段长度

Mark Word、Class Pointer、Array Length等字段的长度都与JVM的位数有关。Mark Word的长度为JVM的一个Word(字)大小,也就是说32位JVM的MarkWord为32位,64位JVM的Mark Word为64位。Class Pointer(类对象指针)字段的长度也为JVM的一个Word(字)大小,即32位JVM的Mark Word为32位,64位JVM的Mark Word为64位。
所以,在32位JVM虚拟机中,Mark Word和Class Pointer这两部分都是32位的;在64位JVM虚拟机中,Mark Word和Class Pointer这两部分都是64位的。
对于对象指针而言,如果JVM中的对象数量过多,使用64位的指针将浪费大量内存,通过简单统计,64位JVM将会比32位JVM多耗费50%的内存。为了节约内存可以使用选项+UseCompressedOops开启指针压缩。UseCompressedOops中的Oop为Ordinary object pointer(普通对象指针)的缩写。

  • Class对象的属性指针(静态变量)。
  • Object对象的属性指针(成员变量)。
  • 普通对象数组的元素指针。

当然,也不是所有的指针都会压缩,一些特殊类型的指针不会压缩,比如指向PermGen(永久代)的Class对象指针(JDK 8中指向元空间的Class对象指针)、本地变量、堆栈元素、入参、返回值和NULL指针等。需要注意的是在堆内存小于32GB的情况下,64位虚拟机的UseCompressedOops选项是默认开启的,该选项表示开启Oop对象的指针压缩会将原来64位的Oop对象指针压缩为32位。
手动开启和关闭Oop对象指针压缩的Java指令为:

1
java -XX:+UseCompressedOops mainclass

如果对象是一个数组,那么对象头还需要有额外的空间用于存储数组的长度(Array Length字段)。Array Length字段的长度也随着JVM架构的不同而不同:在32位JVM上,长度为32位;在64位JVM上,长度为64位。64位JVM如果开启了Oop对象的指针压缩,Array Length字段的长度也将由64位压缩至32位。

Mark Word的结构信息

Java内置锁涉及很多重要信息,这些都存放在对象结构中,并且存放于对象头的Mark Word字段中。Mark Word的位长度为JVM的一个Word大小,也就是说32位JVM的Mark Word为32位,64位JVM为64位。Mark Word的位长度不会受到Oop对象指针压缩选项的影响。
Java内置锁的状态总共有4种,级别由低到高依次为:无锁、偏向锁、轻量级锁和重量级锁。其实在JDK 1.6之前,Java内置锁还是一个重量级锁,是一个效率比较低下的锁,在JDK 1.6之后,JVM为了提高锁的获取与释放效率,对synchronized的实现进行了优化,引入了偏向锁和轻量级锁,从此以后Java内置锁的状态就有了4种(无锁、偏向锁、轻量级锁和重量级锁),并且4种状态会随着竞争的情况逐渐升级,而且是不可逆的过程,即不可降级,也就是说只能进行锁升级(从低级别到高级别)。

不同锁状态下得Mark Word字段结构

Mark Word字段的结构与Java内置锁的状态强相关。为了让Mark Word字段存储更多的信息,JVM将Mark Word最低两个位设置为Java内置锁状态位,不同锁状态下的32位Mark Word结构如下表所示。

64位的Mark Word与32位的Mark Word结构相似,具体如下表所示。

位Mark Word的构成

由于目前主流的JVM都是64位,因此我们使用64位的Mark Word。接下来对64位的Mark Word中各部分的内容进行具体介绍。

  • lock:锁状态标记位,占两个二进制位,由于希望用尽可能少的二进制位表示尽可能多的信息,因此设置了lock标记。该标记的值不同,整个Mark Word表示的含义就不同。
  • biased_lock:对象是否启用偏向锁标记,只占1个二进制位。为1时表示对象启用偏向锁,为0时表示对象没有偏向锁。lock和biased_lock两个标记位组合在一起共同表示Object实例处于什么样的锁状态。二者组合的含义具体如下表所示。 | biased_lock | lock | 状态 | | — | — | — | | 0 | 01 | 无锁 | | 1 | 01 | 偏向锁 | | 0 | 00 | 轻量级锁 | | 0 | 10 | 重量级锁 | | 0 | 11 | GC标记 |
  • age:4位的Java对象分代年龄。在GC中,对象在Survivor区复制一次,年龄就增加1。当对象达到设定的阈值时,将会晋升到老年代。默认情况下,并行GC的年龄阈值为15,并发GC的年龄阈值为6。由于age只有4位,因此最大值为15,这就是-XX:MaxTenuringThreshold选项最大值为15的原因。
  • identity_hashcode:31位的对象标识HashCode(哈希码)采用延迟加载技术,当调用Object.hashCode()方法或者System.identityHashCode()方法计算对象的HashCode后,其结果将被写到该对象头中。当对象被锁定时,该值会移动到Monitor(监视器)中。
  • thread:54位的线程ID值为持有偏向锁的线程ID。
  • epoch:偏向时间戳。
  • ptr_to_lock_record:占62位,在轻量级锁的状态下指向栈帧中锁记录的指针。
  • ptr_to_heavyweight_monitor:占62位,在重量级锁的状态下指向对象监视器的指针。

锁升级

在JDK 1.6版本之前,所有的Java内置锁都是重量级锁。重量级锁会造成CPU在用户态和核心态之间频繁切换,所以代价高、效率低。JDK 1.6版本为了减少获得锁和释放锁所带来的性能消耗,引入了偏向锁和轻量级锁的实现。所以,在JDK 1.6版本中内置锁一共有4种状态:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态,这些状态随着竞争情况逐渐升级。内置锁可以升级但不能降级,意味着偏向锁升级成轻量级锁后不能再降级成偏向锁。这种能升级却不能降级的策略,其目的是提高获得锁和释放锁的效率。锁升级过程大致如下:

无锁状态

Java对象刚创建时还没有任何线程来竞争,说明该对象处于无锁状态(无线程竞争它),这时偏向锁标识位是0,锁状态是01。无锁状态下对象的Mark Word如下表所示。

1
2
3
4
5
6
7
8
9
10
11
package cn.bytecollege;
import org.openjdk.jol.info.ClassLayout;
import java.util.logging.Logger;
public class NoneLock {
public static void main(String[] args) throws InterruptedException {
Logger log = Logger.getLogger("NoneLock");
Object o = new Object();
log.info("未进入同步块,MarkWord 为:");
log.info(ClassLayout.parseInstance(o).toPrintable());
}
}

偏向锁状态

偏向锁是指一段同步代码一直被同一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价。如果内置锁处于偏向状态,当有一个线程来竞争锁时,先用偏向锁,表示内置锁偏爱这个线程,这个线程要执行该锁关联的同步代码时,不需要再做任何检查和切换。偏向锁在竞争不激烈的情况下效率非常高。偏向锁状态的Mark Word会记录内置锁自己偏爱的线程ID,内置锁会将该线程当作自己的熟人。偏向锁状态下对象的Mark Word下表。

偏向锁的核心原理是:如果不存在线程竞争的一个线程获得了锁,那么锁就进入偏向状态,此时Mark Word的结构变为偏向锁结构,锁对象的锁标志位(lock)被改为01,偏向标志位(biased_lock)被改为1,然后线程的ID记录在锁对象的MarkWord中(使用CAS操作完成)。以后该线程获取锁时判断一下线程ID和标志位,就可以直接进入同步块,连CAS操作都不需要,这样就省去了大量有关锁申请的操作,从而也就提升了程序的性能。所以,在没有锁竞争的场合,偏向锁有很好的优化效果。但是,一旦有第二条线程需要竞争锁,那么偏向模式立即结束,进入轻量级锁的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package cn.bytecollege;
import org.openjdk.jol.info.ClassLayout;
import java.util.logging.Logger;
public class BiasableLock {
public static void main(String[] args) throws InterruptedException {
Thread.sleep(5000);
Logger log = Logger.getLogger("BiasableLock");
Object o = new Object();
log.info("未进入同步块,MarkWord 为:");
log.info(ClassLayout.parseInstance(o).toPrintable());
log.info("------------------------------------");
Thread t1 = new Thread(() -> {
synchronized (o) {
log.info(("t1进入同步块,MarkWord 为:"));
log.info(ClassLayout.parseInstance(o).toPrintable());
}
}, "t1");
t1.start();
}
}

假如在大部分情况下同步块是没有竞争的,那么可以通过偏向来提高性能。即在无竞争时,之前获得锁的线程再次获得锁时会判断偏向锁的线程ID是否指向自己,如果是,那么该线程将不用再次获得锁,直接就可以进入同步块;如果未指向当前线程,当前线程就会采用CAS操作将Mark Word中的线程ID设置为当前线程ID,如果CAS操作成功,那么获取偏向锁成功,执行同步代码块,如果CAS操作失败,那么表示有竞争,抢锁线程被挂起,撤销占锁线程的偏向锁,然后将偏向锁膨胀为轻量级锁。
偏向锁的缺点:如果锁对象时常被多个线程竞争,偏向锁就是多余的,并且其撤销的过程会带来一些性能开销。
需要注意的是Java默认开始偏向锁,但是在JVM启动后延迟4秒才会启用偏向锁,因此在程序开始线程休眠了5秒就是保证偏向锁开启。

轻量级锁状态

当有两个线程开始竞争这个锁对象时,情况就发生变化了,不再是偏向(独占)锁了,锁会升级为轻量级锁,两个线程公平竞争,哪个线程先占有锁对象,锁对象的Mark Word就指向哪个线程的栈帧中的锁记录。轻量级锁状态下对象的MarkWord如下表所示。

当锁处于偏向锁,又被另一个线程企图抢占时,偏向锁就会升级为轻量级锁。企图抢占的线程会通过自旋的形式尝试获取锁,不会阻塞抢锁线程,以便提高性能。
自旋原理非常简单,如果持有锁的线程能在很短时间内释放锁资源,那么那些等待竞争锁的线程就不需要进行内核态和用户态之间的切换来进入阻塞挂起状态,它们只需要等一等(自旋),等持有锁的线程释放锁后即可立即获取锁,这样就避免了用户线程和内核切换的消耗。
但是,线程自旋是需要消耗CPU的,如果一直获取不到锁,那么线程也不能一直占用CPU自旋做无用功,所以需要设定一个自旋等待的最大时间。JVM对于自旋周期的选择,JDK 1.6之后引入了适应性自旋锁,适应性自旋锁意味着自旋的时间不是固定的,而是由前一次在同一个锁上的自旋时间以及锁的拥有者的状态来决定的。线程如果自旋成功了,下次自旋的次数就会更多,如果自旋失败了,自旋的次数就会减少。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package cn.bytecollege;

import org.openjdk.jol.info.ClassLayout;

import java.util.logging.Logger;

public class ThinLock {
public static void main(String[] args) throws InterruptedException {
Thread.sleep(5000);
Logger log = Logger.getLogger("App");
Object o = new Object();
log.info("未进入同步块,MarkWord 为:");
log.info(ClassLayout.parseInstance(o).toPrintable());
log.info("------------------------------------");
Thread t1 = new Thread(() -> {
synchronized (o) {
log.info(("t1进入同步块,MarkWord 为:"));
log.info(ClassLayout.parseInstance(o).toPrintable());
}
}, "t1");
t1.start();
Thread.sleep(1000);
Thread t2 = new Thread(() -> {
synchronized (o) {
log.info(("t2进入同步块,MarkWord 为:"));
log.info(ClassLayout.parseInstance(o).toPrintable());
}
}, "t1");
t2.start();
}
}

如果持有锁的线程执行的时间超过自旋等待的最大时间仍没有释放锁,就会导致其他争用锁的线程在最大等待时间内还是获取不到锁,自旋不会一直持续下去,这时争用线程会停止自旋进入阻塞状态,该锁膨胀为重量级锁。
轻量锁存在的目的是尽可能不动用操作系统层面的互斥锁,因为其性能比较差。线程的阻塞和唤醒需要CPU从用户态转为核心态,频繁地阻塞和唤醒对CPU来说是一件负担很重的工作。同时我们可以发现,很多对象锁的锁定状态只会持续很短的一段时间,例如整数的自加操作,在很短的时间内阻塞并唤醒线程显然不值得,为此引入了轻量级锁。轻量级锁是一种自旋锁,因为JVM本身就是一个应用,所以希望在应用层面上通过自旋解决线程同步问题。
轻量级锁的执行过程:在抢锁线程进入临界区之前,如果内置锁(临界区的同步对象)没有被锁定,JVM首先将在抢锁线程的栈帧中建立一个锁记录(LockRecord),用于存储锁对象当前 Mark Word 的拷贝(官方称为 Displaced Mark Word),owner 指针指向对象的 Mark Word。,这时的线程堆栈与内置锁对象头大致如下图所示:

然后抢锁线程将使用CAS自旋操作,尝试将内置锁对象头的Mark Word的ptr_to_lock_record(锁记录指针)更新为抢锁线程栈帧中锁记录的地址,如果这个更新执行成功了,这个线程就拥有了这个对象锁。然后JVM将Mark Word中的lock标记位改为00(轻量级锁标志),即表示该对象处于轻量级锁状态。抢锁成功之后,JVM会将Mark Word中原来的锁对象信息(如哈希码等)保存在抢锁线程锁记录的Displaced Mark Word(可以理解为放错地方的Mark Word)字段中,再将抢锁线程中锁记录的owner指针指向锁对象。

轻量级锁的分类

轻量级锁主要有两种:普通自旋锁和自适应自旋锁。

普通自旋锁

所谓普通自旋锁,就是指当有线程来竞争锁时,抢锁线程会在原地循环等待,而不是被阻塞,直到那个占有锁的线程释放锁之后,这个抢锁线程才可以获得锁。

自适应自旋锁

所谓自适应自旋锁,就是等待线程空循环的自旋次数并非是固定的,而是会动态地根据实际情况来改变自旋等待的次数,自旋次数由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。自适应自旋锁的大概原理是:(1)如果抢锁线程在同一个锁对象上之前成功获得过锁,JVM就会认为这次自旋很有可能再次成功,因此允许自旋等待持续相对更长的时间。(2)如果对于某个锁,抢锁线程很少成功获得过,那么JVM将可能减少自旋时间甚至省略自旋过程,以避免浪费处理器资源。自适应自旋解决的是“锁竞争时间不确定”的问题。自适应自旋假定不同线程持有同一个锁对象的时间基本相当,竞争程度趋于稳定。总的思想是:根据上一次自旋的时间与结果调整下一次自旋的时间。
注意:JDK 1.6的轻量级锁使用的是普通自旋锁,且需要使用-XX:+UseSpinning选项手工开启。JDK 1.7后,轻量级锁使用自适应自旋锁,JVM启动时自动开启,且自旋时间由JVM自动控制。轻量级锁也被称为非阻塞同步、乐观锁,因为这个过程并没有把线程阻塞挂起,而是让线程空循环等待。

重量级锁状态

在JVM中,每个对象都关联一个监视器,这里的对象包含Object实例和Class实例。监视器是一个同步工具,相当于一个许可证,拿到许可证的线程即可进入临界区进行操作,没有拿到则需要阻塞等待。重量级锁通过监视器的方式保障了任何时间只允许一个线程通过受到监视器保护的临界区代码。
重量级锁会让其他申请的线程之间进入阻塞,性能降低。重量级锁也叫同步锁,这个锁对象Mark Word再次发生变化,会指向一个监视器对象,该监视器对象用集合的形式来登记和管理排队的线程。重量级锁状态下对象的Mark Word如下表所示。

总结起来,锁升级状态如下图所示:

偏向锁、轻量级锁、重量级锁的对比

总结一下synchronized的执行过程,大致如下:

  1. 线程抢锁时,JVM首先检测内置锁对象Mark Word中的biased_lock(偏向锁标识)是否设置成1,lock(锁标志位)是否为01,如果都满足,确认内置锁对象为可偏向状态。
  2. 在内置锁对象确认为可偏向状态之后,JVM检查Mark Word中的线程ID是否为抢锁线程ID,如果是,就表示抢锁线程处于偏向锁状态,抢锁线程快速获得锁,开始执行临界区代码。
  3. 如果Mark Word中的线程ID并未指向抢锁线程,就通过CAS操作竞争锁。如果竞争成功,就将Mark Word中的线程ID设置为抢锁线程,偏向标志位设置为1,锁标志位设置为01,然后执行临界区代码,此时内置锁对象处于偏向锁状态。
  4. 如果CAS操作竞争失败,就说明发生了竞争,撤销偏向锁,进而升级为轻量级锁。
  5. JVM使用CAS将锁对象的Mark Word替换为抢锁线程的锁记录指针,如果成功,抢锁线程就获得锁。如果替换失败,就表示其他线程竞争锁,JVM尝试使用CAS自旋替换抢锁线程的锁记录指针,如果自旋成功(抢锁成功),那么锁对象依然处于轻量级锁状态。
  6. 如果JVM的CAS替换锁记录指针自旋失败,轻量级锁就膨胀为重量级锁,后面等待锁的线程也要进入阻塞状态。

总体来说,偏向锁是在没有发生锁争用的情况下使用的;一旦有了第二个线程争用锁,偏向锁就会升级为轻量级锁;如果锁争用很激烈,轻量级锁的CAS自旋到达阈值后,轻量级锁就会升级为重量级锁。三种内置锁的对比如表所示。

CAS算法

CAS(Compare And Swap)是一种无锁算法,该算法关键依赖两个值——期望值(旧值)和新值,底层CPU利用原子操作判断内存原值与期望值是否相等,如果相等就给内存地址赋新值,否则不做任何操作。
使用CAS进行无锁编程的步骤大致如下:

  1. 获得字段的期望值(oldValue)。
  2. 计算出需要替换的新值(newValue)。
  3. 通过CAS将新值(newValue)放在字段的内存地址上,如果CAS失败就重复第(1)步到第(2)步,一直到CAS成功,这种重复俗称CAS自旋。

锁的概述

乐观锁与悲观锁

乐观锁和悲观锁是在数据库中引入的名词,但是在并发包锁里面也引入了类似的思想。
悲观锁指对数据被外界修改持保守态度,认为数据很容易就会被其他线程修改,所以在数据被处理前先对数据进行加锁,并在整个数据处理过程中,使数据处于锁定状态。悲观锁的实现往往依靠数据库提供的锁机制,即在数据库中,在对数据记录操作前给记录加排它锁。如果获取锁失败,则说明数据正在被其他线程修改,当前线程则等待或者抛出异常。如果获取锁成功,则对记录进行操作,然后提交事务后释放排它锁。在Java中synchronized便是一种悲观锁。
乐观锁是相对悲观锁来说的,它认为数据在一般情况下不会造成冲突,所以在访问记录前不会加排它锁,而是在进行数据提交更新时,才会正式对数据冲突与否进行检测。例如CAS算法就是一种乐观锁。

公平锁与非公平锁

根据线程获取锁的抢占机制,锁可以分为公平锁和非公平锁,公平锁表示线程获取锁的顺序是按照线程请求锁的时间早晚来决定的,也就是最早请求锁的线程将最早获取到锁。而非公平锁则在运行时闯入,也就是先来不一定先得。
ReentrantLock提供了公平和非公平锁的实现。

  • 公平锁:ReentrantLock pairLock = new ReentrantLock(true)。
  • 非公平锁:ReentrantLock pairLock = new ReentrantLock(false)。如果构造函数不传递参数,则默认是非公平锁。

例如,假设线程A已经持有了锁,这时候线程B请求该锁其将会被挂起。当线程A释放锁后,假如当前有线程C也需要获取该锁,如果采用非公平锁方式,则根据线程调度策略,线程B和线程C两者之一可能获取锁,这时候不需要任何其他干涉,而如果使用公平锁则需要把C挂起,让B获取当前锁。
在没有公平性需求的前提下尽量使用非公平锁,因为公平锁会带来性能开销。

JUC容器

Java同步容器类通过Synchronized(内置锁)来实现同步的容器,比如Vector、HashTable以及SynchronizedList等容器。线程安全的同步容器类主要有Vector、Stack、HashTable等。另外,Java还提供了一组包装方法,将一个普通的基础容器包装成一个线程安全的同步容器。例如通过Collections.synchronized包装方法能将一个普通的容器包装成一个线程安全的Sorted同步容器。

什么是JUC容器

JUC基于非阻塞算法(Lock Free,无锁编程)提供了一组高并发容器,包括高并发的List、Set、Queue、Map容器。
JUC高并发容器是基于非阻塞算法(或者无锁编程算法)实现的容器类,无锁编程算法主要通过CAS(Compare And Swap)+Volatile组合实现,通过CAS保障操作的原子性,通过volatile保障变量内存的可见性。无锁编程算法的主要优点如下:

  1. 开销较小:不需要在内核态和用户态之间切换进程。
  2. 读写不互斥:只有写操作需要使用基于CAS机制的乐观锁,读读操作之间可以不用互斥。

JUC包中提供了List、Set、Queue、Map各种类型的高并发容器,如ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList和CopyOnWriteArraySet。在性能上,ConcurrentHashMap通常优于同步的HashMap,ConcurrentSkipListMap通常优于同步的TreeMap。当读取和遍历操作远远大于列表的更新操作时,CopyOnWriteArrayList优于同步的ArrayList。
1.List
JUC包中的高并发List主要有CopyOnWriteArrayList,对应的基础容器为ArrayList。
CopyOnWriteArrayList相当于线程安全的ArrayList,它实现了List接口。在读多写少的场景中,其性能远远高于ArrayList的同步包装容器。
2.Set
JUC包中的Set主要有CopyOnWriteArraySet和ConcurrentSkipListSet。
CopyOnWriteArraySet继承自AbstractSet类,对应的基础容器为HashSet。其内部组合了一个CopyOnWriteArrayList对象,它的核心操作是基于CopyOnWriteArrayList实现的。
ConcurrentSkipListSet是线程安全的有序集合,对应的基础容器为TreeSet。它继承自AbstractSet,并实现了NavigableSet接口。ConcurrentSkipListSet是通过ConcurrentSkipListMap实现的。
3.Map
JUC包中Map主要有ConcurrentHashMap和ConcurrentSkipListMap。
ConcurrentHashMap对应的基础容器为HashMap。JDK 6中的ConcurrentHashMap采用一种更加细粒度的“分段锁”加锁机制,JDK 8中采用CAS无锁算法。
ConcurrentSkipListMap对应的基础容器为TreeMap。其内部的Skip List(跳表)结构是一种可以代替平衡树的数据结构,默认是按照Key值升序的。
4.Queue
JUC包中的Queue的实现类包括三类:单向队列、双向队列和阻塞队列。
ConcurrentLinkedQueue是基于列表实现的单向队列,按照FIFO(先进先出)原则对元素进行排序。新元素从队列尾部插入,而获取队列元素则需要从队列头部获取。
ConcurrentLinkedDeque是基于链表的双向队列,但是该队列不允许null元素。作为双向队列,ConcurrentLinkedDeque可以当作“栈”来使用,并且高效地支持并发环境。
除了提供普通的单向队列、双向队列外,JUC拓展了队列,增加了可阻塞的插入和获取等操作,提供了一组阻塞队列,具体如下:

  • ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列。
  • LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列。
  • PriorityBlockingQueue:按优先级排序的队列。
  • DelayQueue:按照元素的Delay时间进行排序的队列。
  • SynchronousQueue:无缓冲等待队列。

CopyOnWriteArrayList的使用

从源码中可以看出CopyOnWriteArrayList同样继承了List接口,因此CopyOnWriteArrayList的使用和ArrayList基本一致:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package cn.bytecollege;

import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteListDemo {
public static void main(String[] args) {
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
//添加元素
for (int i = 0; i < 5; i++) {
//加锁——复制——添加
list.add(i);
}
//删除元素
//加锁
list.remove(2);
//修改
//加锁——复制——修改
list.set(0,200);
//查询
//不加锁
list.get(0);
}
}

CopyOnWriteArrayList的原理

CopyOnWrite(写时复制)就是在修改器对一块内存进行修改时,不直接在原有内存块上进行写操作,而是将内存复制一份,在新的内存中进行写操作,写完之后,再将原来的指针(或者引用)指向新的内存,原来的内存被回收。CopyOnWriteArrayList是写时复制思想的一种典型实现,其含有一个指向操作内存的内部指针array,而可变操作(add、set等)是在array数组的副本上进行的。当元素需要被修改或者增加时,并不直接在array指向的原有数组上操作,而是首先对array进行一次复制,将修改的内容写入复制的副本中。写完之后,再将内部指针array指向新的副本,这样就可以确保修改操作不会影响访问器的读取操作。CopyOnWriteArrayList的原理如图所示。

ConcurrentHashMap (JDK 1.7)原理

JDK 1.7的ConcurrentHashMap的锁机制基于粒度更小的分段锁,分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,分段锁技术将Key分成一个一个小Segment存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。

ConcurrentHashMap的内部结构的组成部分具体如下:
1.HashEntry
HashEntry结构用于存储“Key-Value对”(即“键-值对”)数据,以及存储其后继节点的指针。
2.Segment
ConcurrentHashMap中的一个段称为Segment,Segment继承了ReentrantLock,所以一个段又是一个ReentrantLock。Segment内部拥有一个HashEntry数组类型的成员table,数组中的每个元素又是一个链表,这个由HashEntry链接起来的链表对应一个哈希表的桶,也就是说,table的一个元素对应哈希表的一个桶。Segment在ConcurrentHashMap中扮演锁的角色,每个Segment守护着一个HashEntry数组中的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。
3.ConcurrentHashMap
ConcurrentHashMap在默认并发级别会创建包含16个Segment对象的数组,每个Segment大约守护整个哈希表中桶总数的1/16,其中第N个哈希桶由第N mod 16个锁来保护。假设使用合理的哈希算法使关键字能够均匀地分部,那么这大约能使对锁的请求减少到原来的1/16。默认并发级别的情况下,ConcurrentHashMap支持多达16个并发的写入线程。

ConCurrentHashMap核心源码解析

1.成员变量

1
2
3
4
5
6
7
8
9
10
11
12
//segment默认初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
//默认线程并发数
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//HashEntry数组最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//Segment数组中HashEntry数组的最小容量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//
static final int MAX_SEGMENTS = 1 << 16;
//加锁前的重试次数
static final int RETRIES_BEFORE_LOCK = 2;

2.构造方法
ConcurrentHashMap构造器和HashMap类似,都调用了重载的构造器,此处不一一讲解,只讲解被调用的核心构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
//判断传入的参数是否合法,如果不合法
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//如果最大并发数超出最大并发数则赋值为最大并发数
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
//左移ssize使它成为2的N次方,concurrencyLevel是16,因此ssift是4
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift和segmentMask都是用于定位segment和HashEntry数组中的元素
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//计算每个segment数组的大小
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
//使得HashEntry[]的最小容量为2,作用是让segment添加第二个元素时才扩容
while (cap < c)
cap <<= 1;
// 创建segment数组,并创建segment[0]加入数组中
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
//Segment构造方法
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}

总结上述代码可以看出在构造方法内主要做了以下4件事情:

  1. 创建了Segment数组,初始长度为16,并且不可扩容
  2. 初始化Segment数组中HashEntry数组,初始长度为2,因为负载因子是0.75,因此扩容的阈值的1.5,也就是说在HashEntry数组中添加第一个元素时并不会扩容,添加第二个元素时才会扩容。
  3. 初始化Segment[0],其他位置还是null。
  4. 计算处segmentShift值为32-4=8,segmentMask为16-1=15

3.put()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 1. 计算 key 的 hash 值
int hash = hash(key);
// 2. 根据 hash 值找到 Segment 数组中的位置 j
// hash 是 32 位,无符号右移 segmentShift(28) 位,剩下高 4 位,
// 然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的高 4 位,也就是槽的数组下标
int j = (hash >>> segmentShift) & segmentMask;
// 由于初始化的时候初始化了 segment[0],其他位置还是 null,
// ensureSegment(j) 对 segment[j] 进行初始化
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 3. 插入新值到 槽 s 中
return s.put(key, hash, value, false);
}

此处的put()方法较为简单,即通过计算得出对应位置的Segment,并在Segment内部进行put操作。接下来查看在Segment中put的流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//获取Segment独占锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
//获取Segment内部HashEntry数组
HashEntry<K,V>[] tab = table;
//利用key的hash计算存放的位置
int index = (tab.length - 1) & hash;
//获取链表的头结点
HashEntry<K,V> first = entryAt(tab, index);
//遍历链表
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
//如果遍历过程中发现存放元素的key和链表中已有元素的key相等,则直接覆盖
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
//如果不为null,则直接将元素设置为表头,如果是null,初始化并设置为链表表头
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
//如果超过了该Segment中HashEntry的阈值,则对HashEntry扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
//如果没有达到,则放置到tab的index位置
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
//解锁
unlock();
}
return oldValue;
}

ConcurrentHashMap (JDK 1.8)原理

JDK 1.8对ConcurrentHashMap的内部结构进行了改进,改采用数组+链表或红黑树来实现,但是从Segment(分段锁)技术角度来说,其原理是类似的。
JDK 1.7的ConcurrentHashMap为了进行并发热点的分离,默认情况下将一个table分裂成16个小的table(Segment表示),从而在Segment维度进行比较细粒度的并发控制。实际上,如果并发线程多,这种粒度还是不够细。所以,JDK 1.8的ConcurrentHashMap将并发控制的粒度进一步细化,也就是进一步进行并发热点的分离,将并发粒度细化到每一个桶。既然如此,比较粗粒度的Segment已经没有存在的必要,每一个桶已经变化成实质意义的Segment,所以该结构直接被丢弃。
JDK 1.7的ConcurrentHashMap每一个桶都为链表结构,为了提升节点的访问性能,JDK 1.8引入了红黑树的结构,当桶的节点数超过一定的阈值(默认为64)时,JDK 1.8将链表结构自动转换成红黑树的结构,可以理解为将链式桶转换成树状桶。
ConcurrentHashMap的内部结构的层次关系为ConcurrentHashMap→链式桶/树状桶。这样设计的好处在于,每次访问的时候只需对一个桶进行锁定,而不需要将整个Map集合都进行粗粒度的锁定。
JDK 1.8的ConcurrentHashMap引入红黑树的原因是:链表查询的时间复杂度为O(n),红黑树查询的时间复杂度为O(log(n)),所以在节点比较多的情况下,使用红黑树可以大大提升性能。
一个JDK 1.8版本ConcurrentHashMap实例的内部结构示例如图所示。从图示来看ConcurrentHashMap的数据结构和HashMap的基本一致。

1.Node
此结构为ConcurrentHashMap的核心内部类,它包装了“Key-Value对”,所有插入ConcurrentHashMap的数据都包装在其中。
2.TreeBin(Node子类)
当数据链表(链式桶)长度大于8时,会转换为TreeBin(树状桶)。TreeBin作为根节点,可以认为是红黑树对象。在ConcurrentHashMap的table“数组”中,存放的就是TreeBin对象,而不是TreeNode对象。
3.TreeNode
树状桶节点类
JDK 1.8版本的ConcurrentHashMap中通过一个Node<K,V>[]数组table来保存添加到哈希表中的桶,而在同一个Bucket位置是通过链表和红黑树的形式来保存的。但是数组table是懒加载的,只有在第一次添加元素的时候才会初始化。

ConcurrentHashMap核心源码解析

1.成员变量
JDK 1.8版本ConcurrentHashMap的主要成员属性大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认容量
private static final int DEFAULT_CAPACITY = 16;
//最大数组容量
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//最大并发数
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//链表转红黑树阈值
static final int TREEIFY_THRESHOLD = 8;
//红黑树蜕化会为链表的的节点数阈值
static final int UNTREEIFY_THRESHOLD = 6;
//链表转红黑树的数组容量阈值
static final int MIN_TREEIFY_CAPACITY = 64;
//常量:表示正则转移
static final int MOVED = -1; // hash for forwarding nodes
//常量:表示已经转换为树
static final int TREEBIN = -2; // hash for roots of trees
//用于保存元素的数组
private transient volatile Node<K,V>[] nextTable;
//用于控制表初始化和扩容的控制属性
private transient volatile int sizeCtl;
  1. table:table用于保存添加到哈希表中的桶。
  2. DEFAULT_CAPACITY:table的默认长度。默认初期长度为16,在第一次添加元素时,会将table初始化成16个元素的数组。
  3. MIN_TREEIFY_CAPACITY:链式桶转成红黑树桶的阈值。在增加“Key-Value对”时,当链表长度大于该值时,将链表转换成红黑树。
  4. UNTREEIFY_THRESHOLD:红黑树桶还原回链式桶的阈值,也就是红黑树转为链表的阈值,当在容量变动时重新计算存储位置后,当原有的红黑树内节点数量小于6时,将红黑树转换成链表。
  5. MIN_TREEIFY_CAPACITY:链式桶转换成红黑树桶还有一个要求,table的容量达到最小树形化容量的阈值,只有当哈希表中的table容量大于该值时,才允许树将链表转换成红黑树的操作。否则,尽管单个桶内的元素太多,仍然选择直接扩容,而不是将桶树形化。
  6. sizeCtl:sizeCtl用来控制table的初始化和扩容操作的过程,其值大致如下:
    1. -1代表table正在初始化,其他线程应该交出CPU时间片
    2. -N表示有N-1个线程正在进行扩容操作,严格来说,当其为负数时,只用到其低16位,如果其低16位数值为M,此时有M-1个线程进行扩容。
    3. 大于0分两种情况:如果table未初始化,sizeCtl表示table需要初始化的大小;如果table初始化完成,sizeCtl表示table的容量,默认是table大小的0.75倍。

2.构造器

1
2
3
// 这构造函数里,什么都不干
public ConcurrentHashMap() {
}
1
2
3
4
5
6
7
8
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

该构造器相对比较简单,主要作用就是计算初始容量及sizeCtl,其中sizeCtl=(1.5+initialCapacity+1),目的是向上去最近的2的n次方。如果initialCapacity为10,则sizeCtl为16。
3.put()

1
2
3
public V put(K key, V value) {
return putVal(key, value, false);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 得到 hash 值
int hash = spread(key.hashCode());
// 用于记录相应链表的长度
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组"空",进行数组初始化
if (tab == null || (n = tab.length) == 0)
// 初始化数组
tab = initTable();

// 找该 hash 值对应的数组下标,得到第一个节点 f
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果数组该位置为空,
// 用一次 CAS 操作将这个新值放入其中即可
// 如果 CAS 失败,那就是有并发操作,进到下一个循环
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// hash可以等于 MOVED,此处需要到后面才能看明白,从字面意思也可以看出,肯定是因为在扩容
else if ((fh = f.hash) == MOVED)
// 帮助数据迁移
tab = helpTransfer(tab, f);

else {
// 代码执行到此处,说明f 是该位置的头结点,而且不为空
V oldVal = null;
// 获取数组该位置的头结点的监视器锁
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 头结点的 hash 值大于 0,说明是链表
// 用于累加,记录链表的长度
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 到了链表的最末端,将这个新值放到链表的最后面
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 红黑树
Node<K,V> p;
binCount = 2;
// 调用红黑树的插值方法插入新节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}

if (binCount != 0) {
// 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
if (binCount >= TREEIFY_THRESHOLD)
// 这个方法和 HashMap 中稍微有一点点不同,那就是它不是一定会进行红黑树转换,
// 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
// 具体源码我们就不看了,扩容部分后面说
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//
addCount(1L, binCount);
return null;
}

从代码可以看出,ConcurrentHashMap的put的流程基本和HashMap一致,只是在ConcurrentHashMap中通过CAS和Synchronized来保证线程安全。

16.7.6 ConCurrentHashMap 1.7和1.8的区别

两者主要区别如下:

  1. 数据结构不同:Java7中使用了Segment+HashEntry实现,Java8中则使用哈希表+链表+红黑树实现
  2. 加锁方式不同:Java7中使用了分段锁,但是锁力度相对较大(因为锁住的是整个HashEntry数组),Java8中使用了Synchronized和CAS代替分段锁,这样降低了锁粒度,并且只有在CAS失败以后才尝试加锁。
  3. 初始化数组时,在Java8中不需要加锁,因为使用了sizeCtl作为标志位,当其值为-1时,则表明正则有线程初始化。
  4. 在Java7中寻找元素需要2次定位,第一次定位到Segment数组元素,第二次定位到HashEntry数组。