Java并发编程面试三
Java 线程池详解
使用线程池的好处
池化技术想必大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
线程池提供了一种限制和管理资源(包括执行一个任务)的方式。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。
这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
Executor 框架
简介
Executor
框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor
来启动线程比使用 Thread
的 start
方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。
补充:this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用. 调用尚未构造完全的对象的方法可能引发令人疑惑的错误。
Executor
框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor
框架让并发编程变得更加简单。
Executor 框架结构(主要由三大部分组成)
任务(Runnable
/Callable
)
执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。
任务的执行(Executor
)
如下图所示,包括任务执行机制的核心接口 Executor ,以及继承自 Executor
接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。
这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。
注意: 通过查看
ScheduledThreadPoolExecutor
源代码我们发现ScheduledThreadPoolExecutor
实际上是继承了ThreadPoolExecutor
并实现了 ScheduledExecutorService ,而ScheduledExecutorService
又实现了ExecutorService
,正如我们下面给出的类关系图显示的一样。
ThreadPoolExecutor 类描述:
1 | //AbstractExecutorService实现了ExecutorService接口 |
ScheduledThreadPoolExecutor 类描述:
1 | //ScheduledExecutorService继承ExecutorService接口 |
异步计算的结果(Future
)
Future 接口以及 Future
接口的实现类 FutureTask 类都可以代表异步计算的结果。
当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。(调用 submit()
方法时会返回一个 FutureTask 对象)
Executor 框架的使用示意图
- 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
- 把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行:
ExecutorService.execute(Runnable command)
)或者也可以把Runnable
对象或Callable
对象提交给ExecutorService
执行(ExecutorService.submit(Runnable task)
或ExecutorService.submit(Callable <T> task)
)。 - 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行
execute()
方法和submit()
方法的区别,submit()
会返回一个FutureTask 对象)。由于 FutureTask
实现了Runnable
,我们也可以创建FutureTask
,然后直接交给ExecutorService
执行。 - 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
(重要)ThreadPoolExecutor 类简单介绍
线程池实现类 ThreadPoolExecutor 是 Executor 框架最核心的类。
ThreadPoolExecutor 类分析
ThreadPoolExecutor
类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么)。
1 | /** |
下面这些对创建非常重要,在后面使用线程池的过程中你一定会用到!所以,务必拿着小本本记清楚。
ThreadPoolExecutor 3 个最重要的参数:
- corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
- maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
- workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor
其他常见参数 :
- keepAliveTime:当线程池中的线程数量大于
corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
才会被回收销毁; - unit :
keepAliveTime
参数的时间单位。 - threadFactory :executor 创建新线程的时候会用到。
- handler :饱和策略。关于饱和策略下面单独介绍一下。
下面这张图可以加深你对线程池中各个参数的相互关系的理解(图片来源:《Java 性能调优实战》):
ThreadPoolExecutor 饱和策略定义:
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor
定义一些策略:
- ThreadPoolExecutor.AbortPolicy :抛出
RejectedExecutionException
来拒绝新任务的处理。 - ThreadPoolExecutor.CallerRunsPolicy :调用执行自己的线程运行任务,也就是直接在调用
execute
方法的线程中运行(run
)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。 - ThreadPoolExecutor.DiscardPolicy :不处理新任务,直接丢弃掉。
- ThreadPoolExecutor.DiscardOldestPolicy : 此策略将丢弃最早的未处理的任务请求。
举个例子:
Spring 通过
ThreadPoolTaskExecutor
或者我们直接通过ThreadPoolExecutor
的构造函数创建线程池的时候,当我们不指定RejectedExecutionHandler
饱和策略的话来配置线程池的时候默认使用的是ThreadPoolExecutor.AbortPolicy
。在默认情况下,ThreadPoolExecutor
将抛出RejectedExecutionException
来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用ThreadPoolExecutor.CallerRunsPolicy
。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看ThreadPoolExecutor
的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了。)
推荐使用 ThreadPoolExecutor
构造函数创建线程池
在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
为什么呢?
使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
另外,《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors
去创建,而是通过 ThreadPoolExecutor
构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors
返回线程池对象的弊端如下(后文会详细介绍到):
- FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为
Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。- CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为
Integer.MAX_VALUE
,可能会创建大量线程,从而导致 OOM。
方式一:通过ThreadPoolExecutor构造函数实现(推荐)
方式二:通过 Executor 框架的工具类 Executors 来实现 我们可以创建三种类型的 ThreadPoolExecutor
:
FixedThreadPool
SingleThreadExecutor
- CachedThreadPool
对应 Executors 工具类中的方法如图所示:
ThreadPoolExecutor 使用+原理分析
我们上面讲解了 Executor
框架以及 ThreadPoolExecutor
类,下面让我们实战一下,来通过写一个 ThreadPoolExecutor
的小 Demo 来回顾上面的内容。
示例代码:Runnable
+ThreadPoolExecutor
首先创建一个 Runnable
接口的实现类(当然也可以是 Callable
接口,我们上面也说了两者的区别。)
MyRunnable.java
1 | import java.util.Date; |
编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor
构造函数自定义参数的方式来创建线程池。
ThreadPoolExecutorDemo.java
1 | import java.util.concurrent.ArrayBlockingQueue; |
可以看到我们上面的代码指定了:
corePoolSize
: 核心线程数为 5。maximumPoolSize
:最大线程数 10keepAliveTime
: 等待时间为 1L。unit
: 等待时间的单位为 TimeUnit.SECONDS。workQueue
:任务队列为ArrayBlockingQueue
,并且容量为 100;handler
:饱和策略为CallerRunsPolicy
。
Output:
1 | pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020 |
线程池原理分析
承接 4.1 节,我们通过代码输出结果可以看出:线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会)
现在,我们就分析上面的输出内容来简单分析一下线程池原理。
为了搞懂线程池的原理,我们需要首先分析一下 execute方法。 在 4.1 节中的 Demo 中我们使用 executor.execute(worker)
来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:
1 | // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) |
通过下图可以更好的对上面这 3 步做一个展示,下图是我为了省事直接从网上找到,原地址不明。
addWorker 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。
1 | // 全局锁,并发操作必备 |
更多关于线程池源码分析的内容推荐这篇文章:硬核干货:4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理open in new window
现在,让我们在回到 4.1 节我们写的 Demo, 现在应该是不是很容易就可以搞懂它的原理了呢?
没搞懂的话,也没关系,可以看看我的分析:
我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。
几个常见的对比
Runnable
vs Callable
Runnable
自 Java 1.0 以来一直存在,但Callable
仅在 Java 1.5 中引入,目的就是为了来处理Runnable
不支持的用例。Runnable 接口不会返回结果或抛出检查异常,但是 Callable 接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁。
工具类 Executors
可以实现将 Runnable
对象转换成 Callable
对象。(Executors.callable(Runnable task)
或 Executors.callable(Runnable task, Object result)
)。
Runnable.java
1 |
|
Callable.java
1 |
|
execute()
vs submit()
execute()
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;submit()
方法用于提交需要返回值的任务。线程池会返回一个Future
类型的对象,通过这个Future
对象可以判断任务是否执行成功,并且可以通过Future
的get()
方法来获取返回值,get()
方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)
方法的话,如果在timeout
时间内任务还没有执行完,就会抛出java.util.concurrent.TimeoutException
。
这里只是为了演示使用,推荐使用 ThreadPoolExecutor
构造方法来创建线程池。
示例1:使用 get()
方法获取返回值。
1 | ExecutorService executorService = Executors.newFixedThreadPool(3); |
输出:
1 | abc |
示例2:使用 get(long timeout,TimeUnit unit)
方法获取返回值。
1 | ExecutorService executorService = Executors.newFixedThreadPool(3); |
输出:
1 | Exception in thread "main" java.util.concurrent.TimeoutException |
shutdown()
VSshutdownNow()
- shutdown() :关闭线程池,线程池的状态变为
SHUTDOWN
。线程池不再接受新任务了,但是队列里的任务得执行完毕。 - shutdownNow() :关闭线程池,线程的状态变为
STOP
。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。
isTerminated()
VS isShutdown()
- isShutDown 当调用
shutdown()
方法后返回为 true。 - isTerminated 当调用
shutdown()
方法后,并且所有提交的任务完成后返回为 true
加餐:Callable
+ThreadPoolExecutor
示例代码
MyCallable.java
1 | import java.util.concurrent.Callable; |
CallableDemo.java
1 | import java.util.ArrayList; |
Output:
1 | Wed Nov 13 13:40:41 CST 2019::pool-1-thread-1 |
几种常见的线程池详解
FixedThreadPool
介绍
FixedThreadPool
被称为可重用固定线程数的线程池。通过 Executors 类中的相关源代码来看一下相关实现:
1 | /** |
另外还有一个 FixedThreadPool
的实现方法,和上面的类似,所以这里不多做阐述:
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
从上面源代码可以看出新创建的 FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为 nThreads,这个 nThreads 参数是我们使用的时候自己传递的。
执行任务过程介绍
FixedThreadPool
的 execute()
方法运行示意图(该图片来源:《Java 并发编程的艺术》):
上图说明:
- 如果当前运行的线程数小于 corePoolSize, 如果再来新任务的话,就创建新的线程来执行任务;
- 当前运行的线程数等于 corePoolSize 后, 如果再来新任务的话,会将任务加入
LinkedBlockingQueue
; - 线程池中的线程执行完 手头的任务后,会在循环中反复从
LinkedBlockingQueue
中获取任务来执行;
为什么不推荐使用FixedThreadPool
?
FixedThreadPool 使用无界队列 LinkedBlockingQueue(队列的容量为 Integer.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响 :
- 当线程池中的线程数达到
corePoolSize
后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize; - 由于使用无界队列时
maximumPoolSize
将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建FixedThreadPool
的源码可以看出创建的FixedThreadPool
的corePoolSize
和maximumPoolSize
被设置为同一个值。 - 由于 1 和 2,使用无界队列时
keepAliveTime
将是一个无效参数; - 运行中的
FixedThreadPool
(未执行shutdown()
或shutdownNow()
)不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。
SingleThreadExecutor 详解
介绍
SingleThreadExecutor
是只有一个线程的线程池。下面看看SingleThreadExecutor 的实现:
1 | /** |
1 | public static ExecutorService newSingleThreadExecutor() { |
从上面源代码可以看出新创建的 SingleThreadExecutor
的 corePoolSize
和 maximumPoolSize
都被设置为 1.其他参数和 FixedThreadPool
相同。
执行任务过程介绍
SingleThreadExecutor
的运行示意图(该图片来源:《Java 并发编程的艺术》):
上图说明 :
- 如果当前运行的线程数少于
corePoolSize
,则创建一个新的线程执行任务; - 当前线程池中有一个运行的线程后,将任务加入
LinkedBlockingQueue
- 线程执行完当前的任务后,会在循环中反复从
LinkedBlockingQueue
中获取任务来执行;
为什么不推荐使用SingleThreadExecutor
?
SingleThreadExecutor
使用无界队列 LinkedBlockingQueue
作为线程池的工作队列(队列的容量为 Intger.MAX_VALUE)。SingleThreadExecutor
使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool
相同。说简单点就是可能会导致 OOM,
CachedThreadPool 详解
介绍
CachedThreadPool
是一个会根据需要创建新线程的线程池。下面通过源码来看看 CachedThreadPool
的实现:
1 | /** |
1 | public static ExecutorService newCachedThreadPool() { |
CachedThreadPool
的corePoolSize
被设置为空(0),maximumPoolSize
被设置为 Integer.MAX.VALUE
,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool
中线程处理任务的速度时,CachedThreadPool
会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。
执行任务过程介绍
CachedThreadPool
的 execute()
方法的执行示意图(该图片来源:《Java 并发编程的艺术》):
上图说明:
- 首先执行
SynchronousQueue.offer(Runnable task)
提交任务到任务队列。如果当前maximumPool
中有闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
,那么主线程执行 offer 操作与空闲线程执行的poll
操作配对成功,主线程把任务交给空闲线程执行,execute()
方法执行完成,否则执行下面的步骤 2; - 当初始
maximumPool
为空,或者maximumPool
中没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
。这种情况下,步骤 1 将失败,此时CachedThreadPool
会创建新线程执行任务,execute 方法执行完成;
为什么不推荐使用CachedThreadPool
?
CachedThreadPool
允许创建的线程数量为 Integer.MAX_VALUE
,可能会创建大量线程,从而导致 OOM。
[ScheduledThreadPoolExecutor 详解
ScheduledThreadPoolExecutor 主要用来在给定的延迟后运行任务,或者定期执行任务。 这个在实际项目中基本不会被用到,也不推荐使用,大家只需要简单了解一下它的思想即可。
简介
ScheduledThreadPoolExecutor
使用的任务队列 DelayQueue
封装了一个 PriorityQueue
,PriorityQueue
会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask
的 time
变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask
的 squenceNumber
变量小的先执行)。
ScheduledThreadPoolExecutor 和 Timer 的比较:
Timer
对系统时钟的变化敏感,ScheduledThreadPoolExecutor
不是;Timer
只有一个执行线程,因此长时间运行的任务可以延迟其他任务。ScheduledThreadPoolExecutor
可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程;- 在
TimerTask
中抛出的运行时异常会杀死一个线程,从而导致Timer
死机:-( …即计划任务将不再运行。ScheduledThreadExecutor
不仅捕获运行时异常,还允许您在需要时处理它们(通过重写afterExecute
方法ThreadPoolExecutor
)。抛出异常的任务将被取消,但其他任务将继续运行。
综上,在 JDK1.5 之后,你没有理由再使用 Timer 进行任务调度了。
关于定时任务的详细介绍,小伙伴们可以在 JavaGuide 的项目首页搜索“定时任务”找到对应的原创内容。
运行机制
ScheduledThreadPoolExecutor 的执行主要分为两大部分:
- 当调用
ScheduledThreadPoolExecutor
的 scheduleAtFixedRate() 方法或者 scheduleWithFixedDelay() 方法时,会向ScheduledThreadPoolExecutor
的 DelayQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask 。 - 线程池中的线程从
DelayQueue
中获取ScheduledFutureTask
,然后执行任务。
ScheduledThreadPoolExecutor 为了实现周期性的执行任务,对 ThreadPoolExecutor做了如下修改:
- 使用 DelayQueue 作为任务队列;
- 获取任务的方不同
- 执行周期任务后,增加了额外的处理
ScheduledThreadPoolExecutor 执行周期任务的步骤
- 线程 1 从
DelayQueue
中获取已到期的ScheduledFutureTask(DelayQueue.take())
。到期任务是指ScheduledFutureTask
的 time 大于等于当前系统的时间; - 线程 1 执行这个
ScheduledFutureTask
; - 线程 1 修改
ScheduledFutureTask
的 time 变量为下次将要被执行的时间; - 线程 1 把这个修改 time 之后的
ScheduledFutureTask
放回DelayQueue
中(DelayQueue.add()
)。
线程池大小确定
线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。
很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
上下文切换:
多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。
上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。
Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
类比于现实世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。
如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。
但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
有一个简单并且适用面比较广的公式:
- CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
参考
- 《Java 并发编程的艺术》
- Java Scheduler ScheduledExecutorService ScheduledThreadPoolExecutor Exampleopen in new window
- java.util.concurrent.ScheduledThreadPoolExecutor Exampleopen in new window
- ThreadPoolExecutor – Java Thread Pool Exampleopen in new window
其他推荐阅读
Java 线程池最佳实践
这篇文章篇幅虽短,但是绝对是干货。标题稍微有点夸张,嘿嘿,实际都是自己使用线程池的时候总结的一些个人感觉比较重要的点。
线程池知识回顾
开始这篇文章之前还是简单介绍线程池,之前写的《新手也能看懂的线程池学习总结》这篇文章介绍的很详细了。
为什么要使用线程池?
池化技术想必大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
线程池提供了一种限制和管理资源(包括执行一个任务)的方式。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。
这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池在实际项目的使用场景
线程池一般用于执行多个不相关联的耗时任务,没有多线程的情况下,任务顺序执行,使用了线程池的话可让多个不相关联的任务同时执行。
假设我们要执行三个不相关的耗时任务,Guide 画图给大家展示了使用线程池前后的区别。
注意:下面三个任务可能做的是同一件事情,也可能是不一样的事情。
使用多线程前应为:任务 1 –> 任务 2 –> 任务 3(图中把任务 3 画错为 任务 2)
如何使用线程池?
一般是通过 ThreadPoolExecutor
的构造函数来创建线程池,然后提交任务给线程池执行就可以了。
ThreadPoolExecutor
构造函数如下:
1 | /** |
简单演示一下如何使用线程池,更详细的介绍,请看:《新手也能看懂的线程池学习总结》open in new window 。
1 | private static final int CORE_POOL_SIZE = 5; |
控制台输出:
1 | CurrentThread name:pool-1-thread-5date:2020-06-06T11:45:31.639Z |
线程池最佳实践
简单总结一下我了解的使用线程池的时候应该注意的东西,网上似乎还没有专门写这方面的文章。
因为 Guide 还比较菜,有补充和完善的地方,可以在评论区告知或者在微信上与我交流。
使用 ThreadPoolExecutor
的构造函数声明线程池
1. 线程池必须手动通过 ThreadPoolExecutor 的构造函数来声明,避免使用Executors 类的 newFixedThreadPool 和 newCachedThreadPool ,因为可能会有 OOM 的风险。
Executors 返回线程池对象的弊端如下:
- FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为
Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。- CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为
Integer.MAX_VALUE
,可能会创建大量线程,从而导致 OOM。
说白了就是:使用有界队列,控制线程创建数量。
除了避免 OOM 的原因之外,不推荐使用 Executors
提供的两种快捷的线程池的原因还有:
- 实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。
- 我们应该显示地给我们的线程池命名,这样有助于我们定位问题。
监测线程池运行状态
你可以通过一些手段来检测线程池的运行状态比如 SpringBoot 中的 Actuator 组件。
除此之外,我们还可以利用 ThreadPoolExecutor
的相关 API 做一个简陋的监控。从下图可以看出, ThreadPoolExecutor
提供了获取线程池当前的线程数和活跃线程数、已经执行完成的任务数、正在排队中的任务数等等。
下面是一个简单的 Demo。printThreadPoolStatus()
会每隔一秒打印出线程池的线程数、活跃线程数、完成的任务数、以及队列中的任务数。
1 | /** |
建议不同类别的业务用不同的线程池
很多人在实际项目中都会有类似这样的问题:我的项目中多个业务需要用到线程池,是为每个线程池都定义一个还是说定义一个公共的线程池呢?
一般建议是不同的业务使用不同的线程池,配置线程池的时候根据当前业务的情况对当前线程池进行配置,因为不同的业务的并发以及对资源的使用情况都不同,重心优化系统性能瓶颈相关的业务。
我们再来看一个真实的事故案例! (本案例来源自:《线程池运用不当的一次线上事故》open in new window ,很精彩的一个案例)
上面的代码可能会存在死锁的情况,为什么呢?画个图给大家捋一捋。
试想这样一种极端情况:假如我们线程池的核心线程数为 n,父任务(扣费任务)数量为 n,父任务下面有两个子任务(扣费任务下的子任务),其中一个已经执行完成,另外一个被放在了任务队列中。由于父任务把线程池核心线程资源用完,所以子任务因为无法获取到线程资源无法正常执行,一直被阻塞在队列中。父任务等待子任务执行完成,而子任务等待父任务释放线程池资源,这也就造成了 **”死锁”**。
解决方法也很简单,就是新增加一个用于执行子任务的线程池专门为其服务。
别忘记给线程池命名
初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。
默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。
给线程池里的线程命名通常有下面两种方式:
**1.利用 guava 的 ThreadFactoryBuilder
**
1 | ThreadFactory threadFactory = new ThreadFactoryBuilder() |
2.自己实现 ThreadFactor。
1 | import java.util.concurrent.Executors; |
正确配置线程池参数
说到如何给线程池配置参数,美团的骚操作至今让我难忘(后面会提到)!
我们先来看一下各种书籍和博客上一般推荐的配置线程池参数的方式,可以作为参考!
常规操作
很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
上下文切换:
多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。
上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。
Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。
- 如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的,CPU 根本没有得到充分利用。
- 如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
有一个简单并且适用面比较广的公式:
- CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
🌈 拓展一下(参见:issue#1737open in new window):
线程数更严谨的计算的方法应该是:
最佳线程数 = N(CPU 核心数)∗(1+WT(线程等待时间)/ST(线程计算时间))
,其中WT(线程等待时间)=线程运行总时间 - ST(线程计算时间)
。线程等待时间所占比例越高,需要越多线程。线程计算时间所占比例越高,需要越少线程。
我们可以通过 JDK 自带的工具 VisualVM 来查看
WT/ST
比例。CPU 密集型任务的
WT/ST
接近或者等于 0,因此, 线程数可以设置为 N(CPU 核心数)∗(1+0)= N,和我们上面说的 N(CPU 核心数)+1 差不多。IO 密集型任务下,几乎全是线程等待时间,从理论上来说,你就可以将线程数设置为 2N(按道理来说,WT/ST 的结果应该比较大,这里选择 2N 的原因应该是为了避免创建过多线程吧)。
公示也只是参考,具体还是要根据项目实际线上运行情况来动态调整。我在后面介绍的美团的线程池参数动态配置这种方案就非常不错,很实用!
美团的骚操作
美团技术团队在《Java 线程池实现原理及其在美团业务中的实践》open in new window这篇文章中介绍到对线程池参数实现可自定义配置的思路和方法。
美团技术团队的思路是主要对线程池的核心参数实现自定义可配置。这三个核心参数是:
- corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
- maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
- workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
为什么是这三个参数?
我在这篇《新手也能看懂的线程池学习总结》open in new window 中就说过这三个参数是 ThreadPoolExecutor
最重要的参数,它们基本决定了线程池对于任务的处理策略。
如何支持参数动态配置? 且看 ThreadPoolExecutor
提供的下面这些方法。
格外需要注意的是corePoolSize
, 程序运行期间的时候,我们调用 setCorePoolSize()
这个方法的话,线程池会首先判断当前工作线程数是否大于corePoolSize
,如果大于的话就会回收工作线程。
另外,你也看到了上面并没有动态指定队列长度的方法,美团的方式是自定义了一个叫做 ResizableCapacityLinkedBlockIngQueue
的队列(主要就是把LinkedBlockingQueue
的 capacity 字段的 final 关键字修饰给去掉了,让它变为可变的)。
最终实现的可动态修改线程池参数效果如下。
还没看够?推荐 why 神的《如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答。》open in new window这篇文章,深度剖析,很不错哦!
Java 常见并发容器总结
JDK 提供的这些容器大部分在 java.util.concurrent
包中。
- ConcurrentHashMap : 线程安全的
HashMap
- CopyOnWriteArrayList : 线程安全的
List
,在读多写少的场合性能非常好,远远好于Vector
。 - ConcurrentLinkedQueue : 高效的并发队列,使用链表实现。可以看做一个线程安全的
LinkedList
,这是一个非阻塞队列。 - BlockingQueue : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道。
- ConcurrentSkipListMap : 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找。
ConcurrentHashMap
我们知道 HashMap
不是线程安全的,在并发场景下如果要保证一种可行的方式是使用 Collections.synchronizedMap()
方法来包装我们的 HashMap
。但这是通过使用一个全局的锁来同步不同线程间的并发访问,因此会带来不可忽视的性能问题。
所以就有了 HashMap
的线程安全版本—— ConcurrentHashMap
的诞生。
在 ConcurrentHashMap
中,无论是读操作还是写操作都能保证很高的性能:在进行读操作时(几乎)不需要加锁,而在写操作时通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。
CopyOnWriteArrayList
CopyOnWriteArrayList 简介
1 | public class CopyOnWriteArrayList<E> |
在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问 List
的内部数据,毕竟读取操作是安全的。
这和我们之前在多线程章节讲过 ReentrantReadWriteLock
读写锁的思想非常类似,也就是读读共享、写写互斥、读写互斥、写读互斥。JDK 中提供了 CopyOnWriteArrayList
类比相比于在读写锁的思想又更进一步。为了将读取的性能发挥到极致,CopyOnWriteArrayList
读取是完全不用加锁的,并且更厉害的是:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来,读操作的性能就会大幅度提升。那它是怎么做的呢?
CopyOnWriteArrayList 是如何做到的?
CopyOnWriteArrayList
类的所有可变操作(add,set 等等)都是通过创建底层数组的新副本来实现的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。
从 CopyOnWriteArrayList
的名字就能看出 CopyOnWriteArrayList
是满足 CopyOnWrite
的。所谓 CopyOnWrite
也就是说:在计算机,如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉了。
CopyOnWriteArrayList 读取和写入源码简单分析
CopyOnWriteArrayList 读取操作的实现
读取操作没有任何同步控制和锁操作,理由就是内部数组 array
不会发生修改,只会被另外一个 array
替换,因此可以保证数据安全。
1 | /** The array, accessed only via getArray/setArray. */ |
CopyOnWriteArrayList 写入操作的实现
CopyOnWriteArrayList
写入操作 add()
方法在添加集合的时候加了锁,保证了同步,避免了多线程写的时候会 copy 出多个副本出来。
1 | /** |
ConcurrentLinkedQueue
Java 提供的线程安全的 Queue
可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue
,非阻塞队列的典型例子是 ConcurrentLinkedQueue
,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。 阻塞队列可以通过加锁来实现,非阻塞队列可以通过 CAS 操作实现。
从名字可以看出,ConcurrentLinkedQueue
这个队列使用链表作为其数据结构.ConcurrentLinkedQueue
应该算是在高并发环境中性能最好的队列了。它之所有能有很好的性能,是因为其内部复杂的实现。
ConcurrentLinkedQueue
内部代码我们就不分析了,大家知道 ConcurrentLinkedQueue
主要使用 CAS 非阻塞算法来实现线程安全就好了。
ConcurrentLinkedQueue
适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的 ConcurrentLinkedQueue
来替代。
BlockingQueue
BlockingQueue 简介
上面我们己经提到了 ConcurrentLinkedQueue
作为高性能的非阻塞队列。下面我们要讲到的是阻塞队列——BlockingQueue
。阻塞队列(BlockingQueue
)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue
提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。
BlockingQueue
是一个接口,继承自 Queue
,所以其实现类也可以作为 Queue
的实现来使用,而 Queue
又继承自 Collection
接口。下面是 BlockingQueue
的相关实现类:
下面主要介绍一下 3 个常见的 BlockingQueue
的实现类:ArrayBlockingQueue
、LinkedBlockingQueue
、PriorityBlockingQueue
。
ArrayBlockingQueue
ArrayBlockingQueue
是 BlockingQueue
接口的有界队列实现类,底层采用数组来实现。
1 | public class ArrayBlockingQueue<E> |
ArrayBlockingQueue
一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock
,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。
ArrayBlockingQueue
默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue
。而非公平性则是指访问 ArrayBlockingQueue
的顺序不是遵守严格的时间顺序,有可能存在,当 ArrayBlockingQueue
可以被访问时,长时间阻塞的线程依然无法访问到 ArrayBlockingQueue
。如果保证公平性,通常会降低吞吐量。如果需要获得公平性的 ArrayBlockingQueue
,可采用如下代码:
1 | private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true); |
LinkedBlockingQueue
LinkedBlockingQueue
底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足 FIFO 的特性,与 ArrayBlockingQueue
相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue
容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue
对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE
。
相关构造方法:
1 | /** |
PriorityBlockingQueue
PriorityBlockingQueue
是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo()
方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator
来指定排序规则。
PriorityBlockingQueue
并发控制采用的是可重入锁 ReentrantLock
,队列为无界队列(ArrayBlockingQueue
是有界队列,LinkedBlockingQueue
也可以通过在构造函数中传入 capacity
指定队列最大的容量,但是 PriorityBlockingQueue
只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。
简单地说,它就是 PriorityQueue
的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException
异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。
推荐文章: 《解读 Java 并发队列 BlockingQueue》open in new window
ConcurrentSkipListMap
下面这部分内容参考了极客时间专栏《数据结构与算法之美》open in new window以及《实战 Java 高并发程序设计》。
为了引出 ConcurrentSkipListMap
,先带着大家简单理解一下跳表。
对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率自然就会很低,跳表就不一样了。跳表是一种可以用来快速查找的数据结构,有点类似于平衡树。它们都可以对元素进行快速的查找。但一个重要的区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整。而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。而对于跳表,你只需要部分锁即可。这样,在高并发环境下,你就可以拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是 O(logn) 所以在并发数据结构中,JDK 使用跳表来实现一个 Map。
跳表的本质是同时维护了多个链表,并且链表是分层的,
最低层的链表维护了跳表内所有的元素,每上面一层链表都是下面一层的子集。
跳表内的所有链表的元素都是排序的。查找时,可以从顶级链表开始找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找。这也就是说在查找过程中,搜索是跳跃式的。如上图所示,在跳表中查找元素 18。
查找 18 的时候原来需要遍历 18 次,现在只需要 7 次即可。针对链表长度比较大的时候,构建索引查找效率的提升就会非常明显。
从上面很容易看出,跳表是一种利用空间换时间的算法。
使用跳表实现 Map
和使用哈希算法实现 Map
的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排序的。因此在对跳表进行遍历时,你会得到一个有序的结果。所以,如果你的应用需要有序性,那么跳表就是你不二的选择。JDK 中实现这一数据结构的类是 ConcurrentSkipListMap
。
参考
- 《实战 Java 高并发程序设计》
- https://javadoop.com/post/java-concurrent-queue
- https://juejin.im/post/5aeebd02518825672f19c546
AQS 详解
开始之前,先来看几道常见的面试题!建议你带着这些问题来看这篇文章:
- 何为 AQS?AQS 原理了解吗?
CountDownLatch
和CyclicBarrier
了解吗?两者的区别是什么?- 用过
Semaphore
吗?应用场景了解吗? - ……
相关阅读:从 ReentrantLock 的实现看AQS的原理及应用
AQS 简单介绍
AQS 的全称为 AbstractQueuedSynchronizer
,翻译过来的意思就是抽象队列同步器。这个类在 java.util.concurrent.locks
包下面。
AQS 就是一个抽象类,主要用来构建锁和同步器。
1 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { |
AQS 为构建锁和同步器提供了一些通用功能的是实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLock
,Semaphore
,其他的诸如 ReentrantReadWriteLock
,SynchronousQueue
,FutureTask
(jdk1.7) 等等皆是基于 AQS 的。
AQS 原理
在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参考,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。
下面大部分内容其实在 AQS 类注释上已经给出了,不过是英语看着比较吃力一点,感兴趣的话可以看看源码。
AQS 原理概览
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。
看个 AQS(AbstractQueuedSynchronizer
)原理图:
AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。
1 | private volatile int state;//共享变量,使用volatile修饰保证线程可见性 |
状态信息通过 protected
类型的getState()
,setState()
,compareAndSetState()
进行操作
1 | //返回同步状态的当前值 |
AQS 对资源的共享方式
AQS 定义两种资源共享方式
Exclusive(独占)
只有一个线程能执行,如 ReentrantLock
。又可分为公平锁和非公平锁,ReentrantLock
同时支持两种锁,下面以 ReentrantLock
对这两种锁的定义做介绍:
- 公平锁 :按照线程在队列中的排队顺序,先到者先拿到锁
- 非公平锁 :当线程要获取锁时,先通过两次 CAS 操作去抢锁,如果没抢到,当前线程再加入到队列中等待唤醒。
说明:下面这部分关于
ReentrantLock
源代码内容节选自:https://www.javadoop.com/post/AbstractQueuedSynchronizer-2 ,这是一篇很不错文章,推荐阅读。
下面来看 ReentrantLock 中相关的源代码:
ReentrantLock
默认采用非公平锁,因为考虑获得更好的性能,通过 boolean
来决定是否用公平锁(传入 true 用公平锁)。
1 | /** Synchronizer providing all implementation mechanics */ |
ReentrantLock
中公平锁的 lock
方法
1 | static final class FairSync extends Sync { |
非公平锁的 lock
方法:
1 | static final class NonfairSync extends Sync { |
总结:公平锁和非公平锁只有两处不同:
- 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
- 非公平锁在 CAS 失败后,和公平锁一样都会进入到
tryAcquire
方法,在tryAcquire
方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。
公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。
相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。
Share(共享)
多个线程可同时执行,如 Semaphore/CountDownLatch
。Semaphore
、CountDownLatch
、 CyclicBarrier
、ReadWriteLock
我们都会在后面讲到。
ReentrantReadWriteLock
可以看成是组合式,因为 ReentrantReadWriteLock
也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在上层已经帮我们实现好了。
AQS 底层使用了模板方法模式
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
- 使用者继承
AbstractQueuedSynchronizer
并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放) - 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。
AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的钩子方法:
1 | protected boolean tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。 |
什么是钩子方法呢? 钩子方法是一种被声明在抽象类中的方法,一般使用 protected
关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。
篇幅问题,这里就不详细介绍模板方法模式了,不太了解的小伙伴可以看看这篇文章:用Java8 改造后的模板方法模式真的是 yyds!open in new window。
除了上面提到的钩子方法之外,AQS 类中的其他方法都是 final
,所以无法被其他类重写。
以 ReentrantLock
为例,state 初始化为 0,表示未锁定状态。A 线程 lock()
时,会调用 tryAcquire()
独占该锁并将 state+1
。此后,其他线程再 tryAcquire()
时就会失败,直到 A 线程 unlock()
到 state=
0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。
再以 CountDownLatch
以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown()
一次,state 会 CAS(Compare and Swap) 减 1。等到所有子线程都执行完后(即 state=0
),会 unpark()
主调用线程,然后主调用线程就会从 await()
函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease
、tryAcquireShared-tryReleaseShared
中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock
。
推荐两篇 AQS 原理和相关源码分析的文章:
Semaphore(信号量)
synchronized
和 ReentrantLock
都是一次只允许一个线程访问某个资源,Semaphore
(信号量)可以指定多个线程同时访问某个资源。
示例代码如下:
1 | /** |
执行 acquire()
方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个 release
方法增加一个许可证,这可能会释放一个阻塞的 acquire()
方法。然而,其实并没有实际的许可证这个对象,Semaphore
只是维持了一个可获得许可证的数量。 Semaphore
经常用于限制获取某种资源的线程数量。
当然一次也可以一次拿取和释放多个许可,不过一般没有必要这样做:
1 | semaphore.acquire(5);// 获取5个许可,所以可运行线程数量为20/5=4 |
除了 acquire()
方法之外,另一个比较常用的与之对应的方法是 tryAcquire()
方法,该方法如果获取不到许可就立即返回 false。
Semaphore
有两种模式,公平模式和非公平模式。
- 公平模式: 调用
acquire()
方法的顺序就是获取许可证的顺序,遵循 FIFO; - 非公平模式: 抢占式的。
Semaphore
对应的两个构造方法如下:
1 | public Semaphore(int permits) { |
这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。
issue645 补充内容open in new window :Semaphore
与 CountDownLatch
一样,也是共享锁的一种实现。它默认构造 AQS 的 state 为 permits
。当执行任务的线程数量超出 permits
,那么多余的线程将会被放入阻塞队列 Park,并自旋判断 state 是否大于 0。只有当 state 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 release()
方法,release()
方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。 如此,每次只有最多不超过 permits
数量的线程能自旋成功,便限制了执行任务线程的数量。
CountDownLatch (倒计时器)
CountDownLatch
允许 count
个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
CountDownLatch
是共享锁的一种实现,它默认构造 AQS 的 state
值为 count
。当线程使用 countDown()
方法时,其实使用了tryReleaseShared
方法以 CAS 的操作来减少 state
,直至 state
为 0 。当调用 await()
方法的时候,如果 state
不为 0,那就证明任务还没有执行完毕,await()
方法就会一直阻塞,也就是说 await()
方法之后的语句不会被执行。然后,CountDownLatch
会自旋 CAS 判断 state == 0
,如果 state == 0
的话,就会释放所有等待的线程,await()
方法之后的语句得到执行。
CountDownLatch 的两种典型用法
1、某一线程在开始运行前等待 n 个线程执行完毕。
将 CountDownLatch
的计数器初始化为 n (new CountDownLatch(n)
),每当一个任务线程执行完毕,就将计数器减 1 (countdownlatch.countDown()
),当计数器的值变为 0 时,在 CountDownLatch 上 await()
的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
2、实现多个线程开始执行任务的最大并行性。
注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch
对象,将其计数器初始化为 1 (new CountDownLatch(1)
),多个线程在开始执行任务前首先 coundownlatch.await()
,当主线程调用 countDown()
时,计数器变为 0,多个线程同时被唤醒。
CountDownLatch 的使用示例
1 | /** |
上面的代码中,我们定义了请求的数量为 550,当这 550 个请求被处理完成之后,才会执行System.out.println("finish");
。
与 CountDownLatch
的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用 CountDownLatch.await()
方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
其他 N 个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch
对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()
方法来完成的;每调用一次这个方法,在构造函数中初始化的 count 值就减 1。所以当 N 个线程都调 用了这个方法,count 的值等于 0,然后主线程就能通过 await()
方法,恢复执行自己的任务。
再插一嘴:CountDownLatch
的 await()
方法使用不当很容易产生死锁,比如我们上面代码中的 for 循环改为:
1 | for (int i = 0; i < threadCount-1; i++) { |
这样就导致 count
的值没办法等于 0,然后就会导致一直等待。
CountDownLatch 的不足
CountDownLatch
是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch
使用完毕后,它不能再次被使用。
CountDownLatch 相常见面试题
CountDownLatch
怎么用?应用场景是什么?CountDownLatch
和CyclicBarrier
的不同之处?CountDownLatch
类中主要的方法?
CyclicBarrier(循环栅栏)
CyclicBarrier
和 CountDownLatch
非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch
更加复杂和强大。主要应用场景和 CountDownLatch
类似。
CountDownLatch
的实现是基于 AQS 的,而CycliBarrier
是基于ReentrantLock
(ReentrantLock
也属于 AQS 同步器)和Condition
的。
CyclicBarrier
的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
CyclicBarrier
默认的构造方法是 CyclicBarrier(int parties)
,其参数表示屏障拦截的线程数量,每个线程调用 await()
方法告诉 CyclicBarrier
我已经到达了屏障,然后当前线程被阻塞。
再来看一下它的构造函数:
1 | public CyclicBarrier(int parties) { |
其中,parties 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。
CyclicBarrier 的应用场景
CyclicBarrier
可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。
CyclicBarrier 的使用示例
示例 1:
1 | /** |
运行结果,如下:
1 | threadnum:0is ready |
可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await()
方法之后的方法才被执行。
另外,CyclicBarrier
还提供一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction)
,用于在线程到达屏障时,优先执行 barrierAction
,方便处理更复杂的业务场景。示例代码如下:
1 | /** |
运行结果,如下:
1 | threadnum:0is ready |
CyclicBarrier 源码分析
当调用 CyclicBarrier
对象调用 await()
方法时,实际上调用的是 dowait(false, 0L)
方法。 await()
方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 parties
的值时,栅栏才会打开,线程才得以通过执行。
1 | public int await() throws InterruptedException, BrokenBarrierException { |
dowait(false, 0L)
:
1 | // 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。 |
总结:CyclicBarrier
内部通过一个 count 变量作为计数器,count 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减一。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。
CyclicBarrier 和 CountDownLatch 的区别
下面这个是国外一个大佬的回答:
CountDownLatch
是计数器,只能使用一次,而 CyclicBarrier
的计数器提供 reset
功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。我们来从 jdk 作者设计的目的来看,javadoc 是这么描述它们的:
CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;) CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)
对于 CountDownLatch
来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 CyclicBarrier
,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。
CountDownLatch
是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 CyclicBarrier
更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。
ReentrantLock 和 ReentrantReadWriteLock
ReentrantLock
和 synchronized
的区别在上面已经讲过了这里就不多做讲解。另外,需要注意的是:读写锁 ReentrantReadWriteLock
可以保证多个线程可以同时读,所以在读操作远大于写操作的时候,读写锁就非常有用了。
Atomic 原子类总结
Atomic 原子类介绍
Atomic 翻译成中文是原子的意思。在化学上,我们知道原子是构成一般物质的最小单位,在化学反应中是不可分割的。在我们这里 Atomic 是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。
所以,所谓原子类说简单点就是具有原子/原子操作特征的类。
并发包 java.util.concurrent
的原子类都存放在java.util.concurrent.atomic
下,如下图所示。
根据操作的数据类型,可以将 JUC 包中的原子类分为 4 类
基本类型
使用原子的方式更新基本类型
- AtomicInteger:整型原子类
- AtomicLong:长整型原子类
- AtomicBoolean :布尔型原子类
数组类型
使用原子的方式更新数组里的某个元素
- AtomicIntegerArray:整型数组原子类
- AtomicLongArray:长整型数组原子类
- AtomicReferenceArray :引用类型数组原子类
引用类型
- AtomicReference:引用类型原子类
- AtomicMarkableReference:原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来,也可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
- AtomicStampedReference :原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
对象的属性修改类型
- AtomicIntegerFieldUpdater:原子更新整型字段的更新器
- AtomicLongFieldUpdater:原子更新长整型字段的更新器
- AtomicReferenceFieldUpdater:原子更新引用类型里的字段
🐛 修正(参见:issue#626open in new window) :
AtomicMarkableReference
不能解决 ABA 问题。
1 | /** |
CAS ABA 问题
- 描述: 第一个线程取到了变量 x 的值 A,然后巴拉巴拉干别的事,总之就是只拿到了变量 x 的值 A。这段时间内第二个线程也取到了变量 x 的值 A,然后把变量 x 的值改为 B,然后巴拉巴拉干别的事,最后又把变量 x 的值变为 A (相当于还原了)。在这之后第一个线程终于进行了变量 x 的操作,但是此时变量 x 的值还是 A,所以 compareAndSet 操作是成功。
- 例子描述(可能不太合适,但好理解): 年初,现金为零,然后通过正常劳动赚了三百万,之后正常消费了(比如买房子)三百万。年末,虽然现金零收入(可能变成其他形式了),但是赚了钱是事实,还是得交税的!
- 代码例子(以
AtomicInteger
为例)
1 | import java.util.concurrent.atomic.AtomicInteger; |
输出内容如下:
1 | Thread-0 ------ currentValue=1 |
下面我们来详细介绍一下这些原子类。
基本类型原子类
基本类型原子类介绍
使用原子的方式更新基本类型
- AtomicInteger:整型原子类
- AtomicLong:长整型原子类
- AtomicBoolean :布尔型原子类
上面三个类提供的方法几乎相同,所以我们这里以 AtomicInteger 为例子来介绍。
AtomicInteger 类常用方法
1 | public final int get() //获取当前的值 |
AtomicInteger 常见方法使用
1 | import java.util.concurrent.atomic.AtomicInteger; |
基本数据类型原子类的优势
通过一个简单例子带大家看一下基本数据类型原子类的优势
① 多线程环境不使用原子类保证线程安全(基本数据类型)
1 | class Test { |
② 多线程环境使用原子类保证线程安全(基本数据类型)
1 | class Test2 { |
AtomicInteger 线程安全原理简单分析
AtomicInteger 类的部分源码:
1 | // setup to use Unsafe.compareAndSwapInt for updates(更新操作时提供“比较并替换”的作用) |
AtomicInteger 类主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升。
CAS 的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值。UnSafe 类的 objectFieldOffset() 方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址。另外 value 是一个 volatile 变量,在内存中可见,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值。
数组类型原子类
数组类型原子类介绍
使用原子的方式更新数组里的某个元素
- AtomicIntegerArray:整形数组原子类
- AtomicLongArray:长整形数组原子类
- AtomicReferenceArray :引用类型数组原子类
上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerArray 为例子来介绍。
AtomicIntegerArray 类常用方法
1 | public final int get(int i) //获取 index=i 位置元素的值 |
AtomicIntegerArray 常见方法使用
1 | import java.util.concurrent.atomic.AtomicIntegerArray; |
引用类型原子类
引用类型原子类介绍
基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用 引用类型原子类。
- AtomicReference:引用类型原子类
- AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
- AtomicMarkableReference :原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来,也可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
上面三个类提供的方法几乎相同,所以我们这里以 AtomicReference 为例子来介绍。
AtomicReference 类使用示例
1 | import java.util.concurrent.atomic.AtomicReference; |
上述代码首先创建了一个 Person 对象,然后把 Person 对象设置进 AtomicReference 对象中,然后调用 compareAndSet 方法,该方法就是通过 CAS 操作设置 ar。如果 ar 的值为 person 的话,则将其设置为 updatePerson。实现原理与 AtomicInteger 类中的 compareAndSet 方法相同。运行上面的代码后的输出结果如下:
1 | Daisy |
AtomicStampedReference 类使用示例
1 | import java.util.concurrent.atomic.AtomicStampedReference; |
输出结果如下:
1 | currentValue=0, currentStamp=0 |
AtomicMarkableReference 类使用示例
1 | import java.util.concurrent.atomic.AtomicMarkableReference; |
输出结果如下:
1 | currentValue=null, currentMark=false |
对象的属性修改类型原子类
对象的属性修改类型原子类介绍
如果需要原子更新某个类里的某个字段时,需要用到对象的属性修改类型原子类。
- AtomicIntegerFieldUpdater:原子更新整形字段的更新器
- AtomicLongFieldUpdater:原子更新长整形字段的更新器
- AtomicReferenceFieldUpdater :原子更新引用类型里的字段的更新器
要想原子地更新对象的属性需要两步。第一步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。第二步,更新的对象属性必须使用 public volatile 修饰符。
上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerFieldUpdater
为例子来介绍。
AtomicIntegerFieldUpdater 类使用示例
1 | import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
输出结果:
1 | 22 |
Reference
- 《Java 并发编程的艺术》
ThreadLocal 详解
前言
全文共 10000+字,31 张图,这篇文章同样耗费了不少的时间和精力才创作完成,原创不易,请大家点点关注+在看,感谢。
对于ThreadLocal
,大家的第一反应可能是很简单呀,线程的变量副本,每个线程隔离。那这里有几个问题大家可以思考一下:
ThreadLocal
的 key 是弱引用,那么在ThreadLocal.get()
的时候,发生GC之后,key 是否为null?ThreadLocal
中ThreadLocalMap
的数据结构?ThreadLocalMap
的Hash 算法?ThreadLocalMap
中Hash 冲突如何解决?ThreadLocalMap
的扩容机制?ThreadLocalMap
中过期 key 的清理机制?探测式清理和启发式清理流程?ThreadLocalMap.set()
方法实现原理?ThreadLocalMap.get()
方法实现原理?- 项目中
ThreadLocal
使用情况?遇到的坑? - ……
上述的一些问题你是否都已经掌握的很清楚了呢?本文将围绕这些问题使用图文方式来剖析ThreadLocal
的点点滴滴。
目录
注明: 本文源码基于JDK 1.8
ThreadLocal
代码演示
我们先看下ThreadLocal
使用示例
1 | public class ThreadLocalTest { |
打印结果:
1 | [一枝花算不算浪漫] |
ThreadLocal
对象可以提供线程局部变量,每个线程Thread
拥有一份自己的副本变量,多个线程互不干扰。
ThreadLocal
的数据结构
Thread
类有一个类型为ThreadLocal.ThreadLocalMap
的实例变量threadLocals
,也就是说每个线程有一个自己的ThreadLocalMap
。
ThreadLocalMap
有自己的独立实现,可以简单地将它的key
视作ThreadLocal
,value
为代码中放入的值(实际上key
并不是ThreadLocal
本身,而是它的一个弱引用)。
每个线程在往ThreadLocal
里放值的时候,都会往自己的ThreadLocalMap
里存,读也是以ThreadLocal
作为引用,在自己的map
里找对应的key
,从而实现了线程隔离。
ThreadLocalMap
有点类似HashMap
的结构,只是HashMap
是由数组+链表实现的,而ThreadLocalMap
中并没有链表结构。
我们还要注意Entry
, 它的key
是ThreadLocal<?> k
,继承自WeakReference
, 也就是我们常说的弱引用类型。
GC 之后 key 是否为 null?
回应开头的那个问题, ThreadLocal
的key
是弱引用,那么在ThreadLocal.get()
的时候,发生GC
之后,key
是否是null
?
为了搞清楚这个问题,我们需要搞清楚Java
的四种引用类型:
- 强引用:我们常常 new 出来的对象就是强引用类型,只要强引用存在,垃圾回收器将永远不会回收被引用的对象,哪怕内存不足的时候
- 软引用:使用 SoftReference 修饰的对象被称为软引用,软引用指向的对象在内存要溢出的时候被回收
- 弱引用:使用 WeakReference 修饰的对象被称为弱引用,只要发生垃圾回收,若这个对象只被弱引用指向,那么就会被回收
- 虚引用:虚引用是最弱的引用,在 Java 中使用 PhantomReference 进行定义。虚引用中唯一的作用就是用队列接收对象即将死亡的通知
接着再来看下代码,我们使用反射的方式来看看GC
后ThreadLocal
中的数据情况:(下面代码来源自:https://blog.csdn.net/thewindkee/article/details/103726942 本地运行演示 GC 回收场景)
1 | public class ThreadLocalDemo { |
结果如下:
1 | 弱引用key:java.lang.ThreadLocal@433619b6,值:abc |
如图所示,因为这里创建的ThreadLocal
并没有指向任何值,也就是没有任何引用:
1 | new ThreadLocal<>().set(s); |
所以这里在GC
之后,key
就会被回收,我们看到上面debug
中的referent=null
, 如果改动一下代码:
这个问题刚开始看,如果没有过多思考,弱引用,还有垃圾回收,那么肯定会觉得是null
。
其实是不对的,因为题目说的是在做 ThreadLocal.get()
操作,证明其实还是有强引用存在的,所以 key
并不为 null
,如下图所示,ThreadLocal
的强引用仍然是存在的。
如果我们的强引用不存在的话,那么 key
就会被回收,也就是会出现我们 value
没被回收,key
被回收,导致 value
永远存在,出现内存泄漏。
ThreadLocal.set()
方法源码详解
ThreadLocal
中的set
方法原理如上图所示,很简单,主要是判断ThreadLocalMap
是否存在,然后使用ThreadLocal
中的set
方法进行数据处理。
代码如下:
1 | public void set(T value) { |
主要的核心逻辑还是在ThreadLocalMap
中的,一步步往下看,后面还有更详细的剖析。
ThreadLocalMap
Hash 算法
既然是Map
结构,那么ThreadLocalMap
当然也要实现自己的hash
算法来解决散列表数组冲突问题。
1 | int i = key.threadLocalHashCode & (len-1); |
ThreadLocalMap
中hash
算法很简单,这里i
就是当前 key 在散列表中对应的数组下标位置。
这里最关键的就是threadLocalHashCode
值的计算,ThreadLocal
中有一个属性为HASH_INCREMENT = 0x61c88647
1 | public class ThreadLocal<T> { |
每当创建一个ThreadLocal
对象,这个ThreadLocal.nextHashCode
这个值就会增长 0x61c88647
。
这个值很特殊,它是斐波那契数 也叫 黄金分割数。hash
增量为 这个数字,带来的好处就是 hash
分布非常均匀。
我们自己可以尝试下:
可以看到产生的哈希码分布很均匀,这里不去细纠斐波那契具体算法,感兴趣的可以自行查阅相关资料。
ThreadLocalMap
Hash 冲突
注明: 下面所有示例图中,绿色块
Entry
代表正常数据,灰色块代表Entry
的key
值为null
,已被垃圾回收。白色块表示Entry
为null
。
虽然ThreadLocalMap
中使用了黄金分割数来作为hash
计算因子,大大减少了Hash
冲突的概率,但是仍然会存在冲突。
HashMap
中解决冲突的方法是在数组上构造一个链表结构,冲突的数据挂载到链表上,如果链表长度超过一定数量则会转化成红黑树。
而 ThreadLocalMap
中并没有链表结构,所以这里不能使用 HashMap
解决冲突的方式了。
如上图所示,如果我们插入一个value=27
的数据,通过 hash
计算后应该落入槽位 4 中,而槽位 4 已经有了 Entry
数据。
此时就会线性向后查找,一直找到 Entry
为 null
的槽位才会停止查找,将当前元素放入此槽位中。当然迭代过程中还有其他的情况,比如遇到了 Entry
不为 null
且 key
值相等的情况,还有 Entry
中的 key
值为 null
的情况等等都会有不同的处理,后面会一一详细讲解。
这里还画了一个Entry
中的key
为null
的数据(Entry=2 的灰色块数据),因为key
值是弱引用类型,所以会有这种数据存在。在set
过程中,如果遇到了key
过期的Entry
数据,实际上是会进行一轮探测式清理操作的,具体操作方式后面会讲到。
ThreadLocalMap.set()
详解
ThreadLocalMap.set()
原理图解
看完了ThreadLocal
hash 算法后,我们再来看set
是如何实现的。
往ThreadLocalMap
中set
数据(新增或者更新数据)分为好几种情况,针对不同的情况我们画图来说明。
第一种情况: 通过hash
计算后的槽位对应的Entry
数据为空:
这里直接将数据放到该槽位即可。
第二种情况: 槽位数据不为空,key
值与当前ThreadLocal
通过hash
计算获取的key
值一致:
这里直接更新该槽位的数据。
第三种情况: 槽位数据不为空,往后遍历过程中,在找到Entry
为null
的槽位之前,没有遇到key
过期的Entry
:
遍历散列数组,线性往后查找,如果找到Entry
为null
的槽位,则将数据放入该槽位中,或者往后遍历过程中,遇到了key 值相等的数据,直接更新即可。
第四种情况: 槽位数据不为空,往后遍历过程中,在找到Entry
为null
的槽位之前,遇到key
过期的Entry
,如下图,往后遍历过程中,遇到了index=7
的槽位数据Entry
的key=null
:
散列数组下标为 7 位置对应的Entry
数据key
为null
,表明此数据key
值已经被垃圾回收掉了,此时就会执行replaceStaleEntry()
方法,该方法含义是替换过期数据的逻辑,以index=7位起点开始遍历,进行探测式数据清理工作。
初始化探测式清理过期数据扫描的开始位置:slotToExpunge = staleSlot = 7
以当前staleSlot
开始 向前迭代查找,找其他过期的数据,然后更新过期数据起始扫描下标slotToExpunge
。for
循环迭代,直到碰到Entry
为null
结束。
如果找到了过期的数据,继续向前迭代,直到遇到Entry=null
的槽位才停止迭代,如下图所示,slotToExpunge 被更新为 0:
以当前节点(index=7
)向前迭代,检测是否有过期的Entry
数据,如果有则更新slotToExpunge
值。碰到null
则结束探测。以上图为例slotToExpunge
被更新为 0。
上面向前迭代的操作是为了更新探测清理过期数据的起始下标slotToExpunge
的值,这个值在后面会讲解,它是用来判断当前过期槽位staleSlot
之前是否还有过期元素。
接着开始以staleSlot
位置(index=7
)向后迭代,如果找到了相同 key 值的 Entry 数据:
从当前节点staleSlot
向后查找key
值相等的Entry
元素,找到后更新Entry
的值并交换staleSlot
元素的位置(staleSlot
位置为过期元素),更新Entry
数据,然后开始进行过期Entry
的清理工作,如下图所示:
向后遍历过程中,如果没有找到相同 key 值的 Entry 数据:
从当前节点staleSlot
向后查找key
值相等的Entry
元素,直到Entry
为null
则停止寻找。通过上图可知,此时table
中没有key
值相同的Entry
。
创建新的Entry
,替换table[stableSlot]
位置:
替换完成后也是进行过期元素清理工作,清理工作主要是有两个方法:expungeStaleEntry()
和cleanSomeSlots()
,具体细节后面会讲到,请继续往后看。
ThreadLocalMap.set()
源码详解
上面已经用图的方式解析了set()
实现的原理,其实已经很清晰了,我们接着再看下源码:
java.lang.ThreadLocal
.ThreadLocalMap.set()
:
1 | private void set(ThreadLocal<?> key, Object value) { |
这里会通过key
来计算在散列表中的对应位置,然后以当前key
对应的桶的位置向后查找,找到可以使用的桶。
1 | Entry[] tab = table; |
什么情况下桶才是可以使用的呢?
k = key
说明是替换操作,可以使用- 碰到一个过期的桶,执行替换逻辑,占用过期桶
- 查找过程中,碰到桶中
Entry=null
的情况,直接使用
接着就是执行for
循环遍历,向后查找,我们先看下nextIndex()
、prevIndex()
方法实现:
1 | private static int nextIndex(int i, int len) { |
接着看剩下for
循环中的逻辑:
- 遍历当前
key
值对应的桶中Entry
数据为空,这说明散列数组这里没有数据冲突,跳出for
循环,直接set
数据到对应的桶中 - 如果
key
值对应的桶中Entry
数据不为空
2.1 如果k = key
,说明当前set
操作是一个替换操作,做替换逻辑,直接返回
2.2 如果key = null
,说明当前桶位置的Entry
是过期数据,执行replaceStaleEntry()
方法(核心方法),然后返回 for
循环执行完毕,继续往下执行说明向后迭代的过程中遇到了entry
为null
的情况
3.1 在Entry
为null
的桶中创建一个新的Entry
对象
3.2 执行++size
操作- 调用
cleanSomeSlots()
做一次启发式清理工作,清理散列数组中Entry
的key
过期的数据
4.1 如果清理工作完成后,未清理到任何数据,且size
超过了阈值(数组长度的 2/3),进行rehash()
操作
4.2rehash()
中会先进行一轮探测式清理,清理过期key
,清理完成后如果size >= threshold - threshold / 4,就会执行真正的扩容逻辑(扩容逻辑往后看)
接着重点看下replaceStaleEntry()
方法,replaceStaleEntry()
方法提供替换过期数据的功能,我们可以对应上面第四种情况的原理图来再回顾下,具体代码如下:
java.lang.ThreadLocal.ThreadLocalMap.replaceStaleEntry()
:
1 | private void replaceStaleEntry(ThreadLocal<?> key, Object value, |
slotToExpunge
表示开始探测式清理过期数据的开始下标,默认从当前的staleSlot
开始。以当前的staleSlot
开始,向前迭代查找,找到没有过期的数据,for
循环一直碰到Entry
为null
才会结束。如果向前找到了过期数据,更新探测清理过期数据的开始下标为 i,即slotToExpunge=i
1 | for (int i = prevIndex(staleSlot, len); |
接着开始从staleSlot
向后查找,也是碰到Entry
为null
的桶结束。 如果迭代过程中,碰到 k == key,这说明这里是替换逻辑,替换新数据并且交换当前staleSlot
位置。如果slotToExpunge == staleSlot
,这说明replaceStaleEntry()
一开始向前查找过期数据时并未找到过期的Entry
数据,接着向后查找过程中也未发现过期数据,修改开始探测式清理过期数据的下标为当前循环的 index,即slotToExpunge = i
。最后调用cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
进行启发式过期数据清理。
1 | if (k == key) { |
cleanSomeSlots()
和expungeStaleEntry()
方法后面都会细讲,这两个是和清理相关的方法,一个是过期key
相关Entry
的启发式清理(Heuristically scan
),另一个是过期key
相关Entry
的探测式清理。
如果 k != key则会接着往下走,k == null
说明当前遍历的Entry
是一个过期数据,slotToExpunge == staleSlot
说明,一开始的向前查找数据并未找到过期的Entry
。如果条件成立,则更新slotToExpunge
为当前位置,这个前提是前驱节点扫描时未发现过期数据。
1 | if (k == null && slotToExpunge == staleSlot) |
往后迭代的过程中如果没有找到k == key
的数据,且碰到Entry
为null
的数据,则结束当前的迭代操作。此时说明这里是一个添加的逻辑,将新的数据添加到table[staleSlot]
对应的slot
中。
1 | tab[staleSlot].value = null; |
最后判断除了staleSlot
以外,还发现了其他过期的slot
数据,就要开启清理数据的逻辑:
1 | if (slotToExpunge != staleSlot) |
ThreadLocalMap
过期 key 的探测式清理流程
上面我们有提及ThreadLocalMap
的两种过期key
数据清理方式:探测式清理和启发式清理。
我们先讲下探测式清理,也就是expungeStaleEntry
方法,遍历散列数组,从开始位置向后探测清理过期数据,将过期数据的Entry
设置为null
,沿途中碰到未过期的数据则将此数据rehash
后重新在table
数组中定位,如果定位的位置已经有了数据,则会将未过期的数据放到最靠近此位置的Entry=null
的桶中,使rehash
后的Entry
数据距离正确的桶的位置更近一些。操作逻辑如下:
如上图,set(27)
经过 hash 计算后应该落到index=4
的桶中,由于index=4
桶已经有了数据,所以往后迭代最终数据放入到index=7
的桶中,放入后一段时间后index=5
中的Entry
数据key
变为了null
如果再有其他数据set
到map
中,就会触发探测式清理操作。
如上图,执行探测式清理后,index=5
的数据被清理掉,继续往后迭代,到index=7
的元素时,经过rehash
后发现该元素正确的index=4
,而此位置已经有了数据,往后查找离index=4
最近的Entry=null
的节点(刚被探测式清理掉的数据:index=5
),找到后移动index= 7
的数据到index=5
中,此时桶的位置离正确的位置index=4
更近了。
经过一轮探测式清理后,key
过期的数据会被清理掉,没过期的数据经过rehash
重定位后所处的桶位置理论上更接近i= key.hashCode & (tab.len - 1)
的位置。这种优化会提高整个散列表查询性能。
接着看下expungeStaleEntry()
具体流程,我们还是以先原理图后源码讲解的方式来一步步梳理:
我们假设expungeStaleEntry(3)
来调用此方法,如上图所示,我们可以看到ThreadLocalMap
中table
的数据情况,接着执行清理操作:
第一步是清空当前staleSlot
位置的数据,index=3
位置的Entry
变成了null
。然后接着往后探测:
执行完第二步后,index=4 的元素挪到 index=3 的槽位中。
继续往后迭代检查,碰到正常数据,计算该数据位置是否偏移,如果被偏移,则重新计算slot
位置,目的是让正常数据尽可能存放在正确位置或离正确位置更近的位置
在往后迭代的过程中碰到空的槽位,终止探测,这样一轮探测式清理工作就完成了,接着我们继续看看具体实现源代码:
1 | private int expungeStaleEntry(int staleSlot) { |
这里我们还是以staleSlot=3
来做示例说明,首先是将tab[staleSlot]
槽位的数据清空,然后设置size--
接着以staleSlot
位置往后迭代,如果遇到k==null
的过期数据,也是清空该槽位数据,然后size--
1 | ThreadLocal<?> k = e.get(); |
如果key
没有过期,重新计算当前key
的下标位置是不是当前槽位下标位置,如果不是,那么说明产生了hash
冲突,此时以新计算出来正确的槽位位置往后迭代,找到最近一个可以存放entry
的位置。
1 | int h = k.threadLocalHashCode & (len - 1); |
里是处理正常的产生Hash
冲突的数据,经过迭代后,有过Hash
冲突数据的Entry
位置会更靠近正确位置,这样的话,查询的时候 效率才会更高。
ThreadLocalMap
扩容机制
在ThreadLocalMap.set()
方法的最后,如果执行完启发式清理工作后,未清理到任何数据,且当前散列数组中Entry
的数量已经达到了列表的扩容阈值(len*2/3)
,就开始执行rehash()
逻辑:
1 | if (!cleanSomeSlots(i, sz) && sz >= threshold) |
接着看下rehash()
具体实现:
1 | private void rehash() { |
这里首先是会进行探测式清理工作,从table
的起始位置往后清理,上面有分析清理的详细流程。清理完成之后,table
中可能有一些key
为null
的Entry
数据被清理掉,所以此时通过判断size >= threshold - threshold / 4
也就是size >= threshold * 3/4
来决定是否扩容。
我们还记得上面进行rehash()
的阈值是size >= threshold
,所以当面试官套路我们ThreadLocalMap
扩容机制的时候 我们一定要说清楚这两个步骤:
接着看看具体的resize()
方法,为了方便演示,我们以oldTab.len=8
来举例:
扩容后的tab
的大小为oldLen * 2
,然后遍历老的散列表,重新计算hash
位置,然后放到新的tab
数组中,如果出现hash
冲突则往后寻找最近的entry
为null
的槽位,遍历完成之后,oldTab
中所有的entry
数据都已经放入到新的tab
中了。重新计算tab
下次扩容的阈值,具体代码如下:
1 | private void resize() { |
ThreadLocalMap.get()
详解
上面已经看完了set()
方法的源码,其中包括set
数据、清理数据、优化数据桶的位置等操作,接着看看get()
操作的原理。
ThreadLocalMap.get()
图解
第一种情况: 通过查找key
值计算出散列表中slot
位置,然后该slot
位置中的Entry.key
和查找的key
一致,则直接返回:
第二种情况: slot
位置中的Entry.key
和要查找的key
不一致:
我们以get(ThreadLocal1)
为例,通过hash
计算后,正确的slot
位置应该是 4,而index=4
的槽位已经有了数据,且key
值不等于ThreadLocal1
,所以需要继续往后迭代查找。
迭代到index=5
的数据时,此时Entry.key=null
,触发一次探测式数据回收操作,执行expungeStaleEntry()
方法,执行完后,index 5,8
的数据都会被回收,而index 6,7
的数据都会前移。index 6,7
前移之后,继续从 index=5
往后迭代,于是就在 index=5
找到了key
值相等的Entry
数据,如下图所示:
ThreadLocalMap.get()
源码详解
java.lang.ThreadLocal.ThreadLocalMap.getEntry()
:
1 | private Entry getEntry(ThreadLocal<?> key) { |
ThreadLocalMap
过期 key 的启发式清理流程
上面多次提及到ThreadLocalMap
过期key的两种清理方式:探测式清理(expungeStaleEntry())、启发式清理(cleanSomeSlots())
探测式清理是以当前Entry
往后清理,遇到值为null
则结束清理,属于线性探测清理。
而启发式清理被作者定义为:Heuristically scan some cells looking for stale entries.
具体代码如下:
1 | private boolean cleanSomeSlots(int i, int n) { |
InheritableThreadLocal
我们使用ThreadLocal
的时候,在异步场景下是无法给子线程共享父线程中创建的线程副本数据的。
为了解决这个问题,JDK 中还有一个InheritableThreadLocal
类,我们来看一个例子:
1 | public class InheritableThreadLocalDemo { |
打印结果:
1 | 子线程获取父类ThreadLocal数据:null |
实现原理是子线程是通过在父线程中通过调用new Thread()
方法来创建子线程,Thread#init
方法在Thread
的构造方法中被调用。在init
方法中拷贝父线程数据到子线程中:
1 | private void init(ThreadGroup g, Runnable target, String name, |
但InheritableThreadLocal
仍然有缺陷,一般我们做异步化处理都是使用的线程池,而InheritableThreadLocal
是在new Thread
中的init()
方法给赋值的,而线程池是线程复用的逻辑,所以这里会存在问题。
当然,有问题出现就会有解决问题的方案,阿里巴巴开源了一个TransmittableThreadLocal
组件就可以解决这个问题,这里就不再延伸,感兴趣的可自行查阅资料。
ThreadLocal
项目中使用实战
ThreadLocal
使用场景
我们现在项目中日志记录用的是ELK+Logstash
,最后在Kibana
中进行展示和检索。
现在都是分布式系统统一对外提供服务,项目间调用的关系可以通过 traceId
来关联,但是不同项目之间如何传递 traceId
呢?
这里我们使用 org.slf4j.MDC
来实现此功能,内部就是通过 ThreadLocal
来实现的,具体实现如下:
当前端发送请求到服务 A时,服务 A会生成一个类似UUID
的traceId
字符串,将此字符串放入当前线程的ThreadLocal
中,在调用服务 B的时候,将traceId
写入到请求的Header
中,服务 B在接收请求时会先判断请求的Header
中是否有traceId
,如果存在则写入自己线程的ThreadLocal
中。
图中的requestId
即为我们各个系统链路关联的traceId
,系统间互相调用,通过这个requestId
即可找到对应链路,这里还有会有一些其他场景:
针对于这些场景,我们都可以有相应的解决方案,如下所示
Feign 远程调用解决方案
服务发送请求:
1 |
|
服务接收请求:
1 |
|
线程池异步调用,requestId 传递
因为MDC
是基于ThreadLocal
去实现的,异步过程中,子线程并没有办法获取到父线程ThreadLocal
存储的数据,所以这里可以自定义线程池执行器,修改其中的run()
方法:
1 | public class MyThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { |
使用 MQ 发送消息给第三方系统
在 MQ 发送的消息体中自定义属性requestId
,接收方消费消息后,自己解析requestId
使用即可。
CompletableFuture入门
自己在项目中使用 CompletableFuture
比较多,看到很多开源框架中也大量使用到了 CompletableFuture
。
因此,专门写一篇文章来介绍这个 Java 8 才被引入的一个非常有用的用于异步编程的类。
简单介绍
CompletableFuture
同时实现了 Future
和 CompletionStage
接口。
1 | public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { |
CompletableFuture
除了提供了更为好用和强大的 Future
特性之外,还提供了函数式编程的能力。
Future
接口有 5 个方法:
boolean cancel(boolean mayInterruptIfRunning)
:尝试取消执行任务。boolean isCancelled()
:判断任务是否被取消。boolean isDone()
: 判断任务是否已经被执行完成。get()
:等待任务执行完成并获取运算结果。get(long timeout, TimeUnit unit)
:多了一个超时时间。
`
CompletionStage 接口中的方法比较多,
CompletableFuture` 的函数式能力就是这个接口赋予的。从这个接口的方法参数你就可以发现其大量使用了 Java8 引入的函数式编程。
由于方法众多,所以这里不能一一讲解,下文中我会介绍大部分常见方法的使用。
常见操作
创建 CompletableFuture
常见的创建 CompletableFuture
对象的方法如下:
- 通过 new 关键字。
- 基于
CompletableFuture
自带的静态工厂方法:runAsync()
、supplyAsync()
。
new 关键字
通过 new 关键字创建 CompletableFuture
对象这种使用方式可以看作是将 CompletableFuture
当做 Future
来使用。
我在我的开源项目 guide-rpc-frameworkopen in new window 中就是这种方式创建的 CompletableFuture
对象。
下面咱们来看一个简单的案例。
我们通过创建了一个结果值类型为 RpcResponse<Object>
的 CompletableFuture
,你可以把 resultFuture
看作是异步运算结果的载体。
1 | CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>(); |
假设在未来的某个时刻,我们得到了最终的结果。这时,我们可以调用 complete()
方法为其传入结果,这表示 resultFuture
已经被完成了。
1 | // complete() 方法只能调用一次,后续调用将被忽略。 |
你可以通过 isDone()
方法来检查是否已经完成。
1 | public boolean isDone() { |
获取异步计算的结果也非常简单,直接调用 get()
方法即可。调用 get()
方法的线程会阻塞直到 CompletableFuture
完成运算。
1 | rpcResponse = completableFuture.get(); |
如果你已经知道计算的结果的话,可以使用静态方法 completedFuture()
来创建 CompletableFuture
。
1 | CompletableFuture<String> future = CompletableFuture.completedFuture("hello!"); |
completedFuture()
方法底层调用的是带参数的 new 方法,只不过,这个方法不对外暴露。
1 | public static <U> CompletableFuture<U> completedFuture(U value) { |
静态工厂方法
这两个方法可以帮助我们封装计算逻辑。
1 | static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); |
runAsync()
方法接受的参数是 Runnable
,这是一个函数式接口,不允许返回值。当你需要异步操作且不关心返回结果的时候可以使用 runAsync()
方法。
1 |
|
supplyAsync()
方法接受的参数是 Supplier<U>
,这也是一个函数式接口,U
是返回结果值的类型。
1 |
|
当你需要异步操作且关心返回结果的时候,可以使用 supplyAsync()
方法。
1 | CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("hello!")); |
处理异步结算的结果
当我们获取到异步计算的结果之后,还可以对其进行进一步的处理,比较常用的方法有下面几个:
thenApply()
thenAccept()
thenRun()
whenComplete()
thenApply()
方法接受一个 Function
实例,用它来处理结果。
1 | // 沿用上一个任务的线程池 |
thenApply()
方法使用示例如下:
1 | CompletableFuture<String> future = CompletableFuture.completedFuture("hello!") |
你还可以进行 流式调用:
1 | CompletableFuture<String> future = CompletableFuture.completedFuture("hello!") |
如果你不需要从回调函数中获取返回结果,可以使用 thenAccept() 或者 thenRun()。这两个方法的区别在于 thenRun() 不能访问异步计算的结果。
thenAccept()
方法的参数是 Consumer<? super T>
。
1 | public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { |
顾名思义,Consumer
属于消费型接口,它可以接收 1 个输入对象然后进行“消费”。
1 |
|
thenRun()
的方法是的参数是 Runnable
。
1 | public CompletableFuture<Void> thenRun(Runnable action) { |
thenAccept()
和 thenRun()
使用示例如下:
1 | CompletableFuture.completedFuture("hello!") |
whenComplete()
的方法的参数是 BiConsumer<? super T, ? super Throwable>
。
1 | public CompletableFuture<T> whenComplete( |
相对于 Consumer
, BiConsumer
可以接收 2 个输入对象然后进行“消费”。
1 |
|
whenComplete()
使用示例如下:
1 | CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello!") |
异常处理
你可以通过 handle()
方法来处理任务执行过程中可能出现的抛出异常的情况。
1 | public <U> CompletableFuture<U> handle( |
示例代码如下:
1 | CompletableFuture<String> future |
你还可以通过 exceptionally()
方法来处理异常情况。
1 | CompletableFuture<String> future |
如果你想让 CompletableFuture
的结果就是异常的话,可以使用 completeExceptionally()
方法为其赋值。
1 | CompletableFuture<String> completableFuture = new CompletableFuture<>(); |
组合 CompletableFuture
你可以使用 thenCompose()
按顺序链接两个 CompletableFuture
对象。
1 | public <U> CompletableFuture<U> thenCompose( |
thenCompose()
方法会使用示例如下:
1 | CompletableFuture<String> future |
在实际开发中,这个方法还是非常有用的。比如说,我们先要获取用户信息然后再用用户信息去做其他事情。
和 thenCompose()
方法类似的还有 thenCombine()
方法, thenCombine()
同样可以组合两个 CompletableFuture
对象。
1 | CompletableFuture<String> completableFuture |
那 thenCompose() 和 thenCombine() 有什么区别呢?
thenCompose()
可以两个CompletableFuture
对象,并将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。thenCombine()
会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。
并行运行多个 CompletableFuture
你可以通过 CompletableFuture
的 allOf()
这个静态方法来并行运行多个 CompletableFuture
。
实际项目中,我们经常需要并行运行多个互不相关的任务,这些任务之间没有依赖关系,可以互相独立地运行。
比说我们要读取处理 6 个文件,这 6 个任务都是没有执行顺序依赖的任务,但是我们需要返回给用户的时候将这几个文件的处理的结果进行统计整理。像这种情况我们就可以使用并行运行多个 CompletableFuture
来处理。
示例代码如下:
1 | CompletableFuture<Void> task1 = |
经常和 allOf()
方法拿来对比的是 anyOf()
方法。
allOf() 方法会等到所有的 CompletableFuture 都运行完成之后再返回
1 | Random rand = new Random(); |
调用 join()
可以让程序等future1
和 future2
都运行完了之后再继续执行。
1 | CompletableFuture<Void> completableFuture = CompletableFuture.allOf(future1, future2); |
输出:
1 | future1 done... |
anyOf() 方法不会等待所有的 CompletableFuture 都运行完成之后再返回,只要有一个执行完成即可!
1 | CompletableFuture<Object> f = CompletableFuture.anyOf(future1, future2); |
输出结果可能是:
1 | future2 done... |
也可能是:
1 | future1 done... |
后记
这篇文章只是简单介绍了 CompletableFuture
比较常用的一些 API 。
如果想要深入学习的话,可以多找一些书籍和博客看。
另外,建议G友们可以看看京东的 asyncToolopen in new window 这个并发框架,里面大量使用到了 CompletableFuture
。