java基础篇4 线程池原子类
线程类的执行方式
一、runnable的执行 , 作为thread的参数
public class MyRunnable implements Runnable {
@Override
public void run() {
// 在这里定义可在线程中执行的代码
}
}
MyRunnable myRunnable = new MyRunnable();
Thread thread = new Thread(myRunnable);
thread.start();
or
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
// 在这里定义可在线程中执行的代码
}
});
thread.start();
二、callable的执行,被ExecutorService执行or作为FeatureTask的参数
(后面看 底层,发现最终还是底层还是用execute(Runnable r)实现的 只不过这里的r是经过 FutureTask 封装过的Runnable
public class MyCallable implements Callable<T> {
@Override
public T call() throws Exception {
// 在这里定义可被调用的代码
}
}
MyCallable myCallable = new MyCallable();
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<T> future = executor.submit(myCallable);
T result = future.get();
or
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<T> future = executor.submit(new Callable<T>() {
@Override
public T call() throws Exception {
// 在这里定义可被调用的代码
}
});
T result = future.get();
or
使用FeatureTask (用FutureTask 包装 callable 再将FutureTask 对象作为参数创建线程,开启,获取参数通过FutureTask 对象获取 )
Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。
其实FutureTask 又实现了Runnable接口, 所以线程接受的还是一个Runnable变量…..
Callable<Process> task = () -> {
// 执行异步任务
Runtime runtime = Runtime.getRuntime();
Process process = runtime.exec("/Users/mac/Desktop/qc-java-runtime/src/main/java/com/qc/runtime/shell.sh");
return process;
};
// 将Callable包装成FutureTask
FutureTask<Process> future = new FutureTask<>(task);
// 启动新线程来执行异步任务
new Thread(future).start();
// 获取异步任务的结果
Process result = future.get();
System.out.println(result);
实现 Runnable 接⼝和 Callable 接⼝的区别
- 1、最大的区别,runnable没有返回值,而实现callable接口的任务线程能返回执行结果(调用结果时主线程会阻塞来等待执行获取结果)
2、callable接口实现类中的run方法允许异常向上抛出,可以在内部处理,try catch,但是runnable接口实现类中run方法的异常必须在内部处理,不能抛出;所以,如果任务不需要返回结果或抛出异常推荐使⽤ Runnable 接⼝,这样代码看起来会更加简洁
3、callable和runnable都可以应用于executors(线程池)。而thread类只支持runnable
- ⼯具类 Executors 可以实现 Runnable 对象和 Callable 对象之间的相互转换。( Executors.callable Runnable task )或 Executors.callable Runnable task Object resule )
线程池
为什么要⽤线程池?
池化技术相⽐⼤家已经屡⻅不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应⽤。池化技术的思想主要是为了减少每次获取资源的消耗,提⾼对资源的利⽤率。(避免每次重新建立对象,而直接保留这个对象复用)
线程池提供了⼀种限制和管理资源(包括执⾏⼀个任务)。 每个线程池还维护⼀些基本统计信息,例如已完成任务的数量。
这⾥借⽤《Java 并发编程的艺术》提到的来说⼀下使⽤线程池的好处:
- 降低资源消耗。通过重复利⽤已创建的线程降低线程创建和销毁造成的消耗。
- 提⾼响应速度。当任务到达时,任务可以不需要的等到线程创建就能⽴即执⾏。
- 提⾼线程的可管理性。线程是稀缺资源,如果⽆限制的创建,不仅会消耗系统资源,还会降
- 低系统的稳定性,使⽤线程池可以进⾏统⼀的分配,调优和监控。
Executor 框架
是 Java5 之后引进的,通过 Executor 来启动线程比使用 Thread 的start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。
补充:this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用. 调用尚未构造完全的对象的方法可能引发令人疑惑的错误。
Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。
Executor 框架结构
1) 任务( Runnable / Callable )
执行任务需要实现的 Runnable 接口 或 Callable 接口。 Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。
2) 任务的执行( Executor )
如下图所示,包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的ExecutorService 接口。 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口
这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。
3) 异步计算的结果( Future ) (当需要获取线程执行结果时)
Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。
当我们把 Runnable 接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或ScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)
Executor 框架的使用示意图
- 主线程首先要创建实现
Runnable
或者Callable
接口的任务对象。 - 把创建完成的实现
Runnable
接口的 对象直接交给execute(Runnable r)
,或者也可以把Runnable
对象或Callable
对象提交给submit(Runnable r) or submit(Callable c)
。 - 如果执行
ExecutorService.submit(…)
,ExecutorService
将返回一个实现Future
接口的对象submit()
会返回一个FutureTask
对象)。由于FutureTask
实现了Runnable
,我们也可以创建FutureTask
,然后直接交给ExecutorService
执行。 - 最后,主线程可以执行
FutureTask.get()
方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)
来取消此任务的执行。
执行 execute() 方法和 submit() 方法的区别
execute() (只能接收实现了Runable接口的实现类)
该⽅法⽤于提交不需要返回值的任务,所以⽆法判断任务是否被线程池执⾏成功与否;
submit() (可以接收Callable、Runnable两种类型的参数)
该⽅法⽤于提交需要返回值的任务。
线程池会返回⼀个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执⾏成功,并且可以通过 Future 的 get() ⽅法来获取返回值, get() ⽅法会阻塞当前线程直到任务完成,⽽使⽤ get long timeout TimeUnitunit ⽅法则会阻塞当前线程⼀段时间后⽴即返回,这时候有可能任务没有执⾏完
即使用了
submit()
底层ExecutorService
还是用的execute
异常
execute会直接抛出任务执行时的异常,可以用try、catch来捕获,和普通线程的处理方式完全一致
submit会吃掉异常,可通过Future的get方法将任务执行时的异常重新抛出。
如何创建线程池
线程池的创建⽅式总共包含以下 7 种(其中 6 种是通过 Executors (工具类)创建的, 1 种是通过ThreadPoolExecutor 创建的, 其实前四个底层都是ThreadPoolExecutor)
- Executors.newFixedThreadPool:创建⼀个固定⼤⼩的线程池,可控制并发的线程数,超出的线程会在队列中等待;
- 允许请求的队列⻓度为Integer.MAX_VALUE
- Executors.newCachedThreadPool:创建⼀个可缓存的线程池,若线程数超过处理所需,缓存⼀段时间后会回收,若线程数不够,则新建线程;
- 允许创建的线程数量为 Integer.MAX_VALUE
- Executors.newSingleThreadExecutor:创建单个线程数的线程池,它可以保证先进先出的执⾏顺序;
- 允许请求的队列⻓度为Integer.MAX_VALUE
- Executors.newScheduledThreadPool:创建⼀个可以执⾏延迟任务的线程池;
- 允许创建的线程数量为Integer.MAX_VALUE
- Executors.newSingleThreadScheduledExecutor:创建⼀个单线程的可以执⾏延迟任务的线程池;
- Executors.newWorkStealingPool:创建⼀个抢占式执⾏的线程池(任务执⾏顺序不确定)【JDK1.8 添加】。
- ThreadPoolExecutor:最原始的创建线程池的⽅式,它包含了 7 个参数可供设置,后⾯会详细讲。
《阿⾥巴巴 Java 开发⼿册》中强制线程池不允许使⽤ Executors 去创建,⽽是通过ThreadPoolExecutor 的⽅式,这样的处理⽅式让写的同学更加明确线程池的运⾏规则,规避资源耗尽的⻛险
Executors 返回线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列⻓度为Integer.MAX_VALUE ,可能堆积⼤量的请求,从⽽导致 OOM。
CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为Integer.MAX_VALUE ,可能会创建⼤量线程,从⽽导致 OOM。
ThreadPoolExecutor 类分析
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize :线程池中核心线程数的最大值
- maximumPoolSize :线程池中能拥有最多线程数
- workQueue:用于缓存任务的阻塞队列
当调用线程池execute() 方法添加一个任务时,线程池会做如下判断:
- 如果有空闲线程,则直接执行该任务;
- 如果没有空闲线程,且当前运行的线程数少于corePoolSize,则创建新的线程执行该任务;
- 如果没有空闲线程,且当前的线程数等于corePoolSize,同时阻塞队列未满,则将任务入队列,而不添加新的线程;
- 如果没有空闲线程,且阻塞队列已满,同时池中的线程数小于maximumPoolSize ,则创建新的线程执行任务;
- 如果没有空闲线程,且阻塞队列已满,同时池中的线程数等于maximumPoolSize ,则根据构造函数中的 handler 指定的策略来拒绝新的任务。
KeepAliveTime:
- keepAliveTime :表示空闲线程的存活时间
- TimeUnit unit :表示keepAliveTime的单位
当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
workQueue 任务队列:
- workQueue :它决定了缓存任务的排队策略
ThreadPoolExecutor线程池推荐了三种等待队列,它们是:SynchronousQueue 、LinkedBlockingQueue和 ArrayBlockingQueue。
1)有界队列:
SynchronousQueue :一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于 阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。
ArrayBlockingQueue:一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。2)无界队列:
LinkedBlockingQueue:基于链表结构的无界阻塞队列,它可以指定容量也可以不指定容量(实际上任何无限容量的队列/栈都是有容量的,这个容量就是Integer.MAX_VALUE)
PriorityBlockingQueue:是一个按照优先级进行内部元素排序的无界阻塞队列。队列中的元素必须实现 Comparable 接口,这样才能通过实现compareTo()方法进行排序。优先级最高的元素将始终排在队列的头部;PriorityBlockingQueue 不会保证优先级一样的元素的排序。
注意:keepAliveTime和maximumPoolSize及BlockingQueue的类型均有关系。如果BlockingQueue是无界的,那么永远不会触发maximumPoolSize,自然keepAliveTime也就没有了意义。
threadFactory:
threadFactory :指定创建线程的工厂。(可以不指定) 一般使用Executors 工具类里面指定的工厂
如果不指定线程工厂时,ThreadPoolExecutor 会使用ThreadPoolExecutor.defaultThreadFactory 创建线程。默认工厂创建的线程:同属于相同的线程组,具有同为 Thread.NORM_PRIORITY 的优先级,以及名为 “pool-XXX-thread-” 的线程名(XXX为创建线程时顺序序号),且创建的线程都是非守护进程。
handler 拒绝策略:
handler :表示当 workQueue 已满,且池中的线程数达到 maximumPoolSize 时,线程池拒绝添加新任务时采取的策略。(可以不指定)
| 策略 | BB |
| ———————————————————— | —————————————————————————————— |
| ThreadPoolExecutor.AbortPolicy() | 来拒绝新任务的处理。抛出RejectedExecutionException异常。默认策略 |
| ThreadPoolExecutor.CallerRunsPolicy() | 由向线程池提交任务的线程来执行该任务,当最⼤池被填满时,此策略为我们提供可伸缩队列。 |
| ThreadPoolExecutor.DiscardPolicy() | 不处理新任务,直接丢弃掉。 |
| ThreadPoolExecutor.DiscardOldestPolicy() | 此策略将丢弃最早的队列中那个未处理的任务请求 |最科学的的还是 AbortPolicy 提供的处理方式:抛出异常,由开发人员进行处理。
Atomic 原⼦类
Atomic 翻译成中⽂是原⼦的意思。在化学上,我们知道原⼦是构成⼀般物质的最⼩单位,在化学反应中是不可分割的。在我们这⾥ Atomic 是指⼀个操作是不可中断的。即使是在多个线程⼀起执⾏的时候,⼀个操作⼀旦开始,就不会被其他线程⼲扰。
并发包 java.util.concurrent 的原⼦类都存放在 java.util.concurrent.atomic中
JUC 包中的原⼦类(4类)
原子基本类型
使⽤原⼦的⽅式更新基本类型
- AtomicInteger :整形原⼦类
- AtomicLong :⻓整型原⼦类
- AtomicBoolean :布尔型原⼦类
数组类型
使⽤原⼦的⽅式更新数组⾥的某个元素
- AtomicIntegerArray :整形数组原⼦类
- AtomicLongArray :⻓整形数组原⼦类
- AtomicReferenceArray :引⽤类型数组原⼦类
引⽤类型
- AtomicReference :引⽤类型原⼦类
- AtomicStampedReference :原⼦更新带有版本号的引⽤类型。该类将整数值与引⽤关联起来,可⽤于解决原⼦的更新数据和数据的版本号,可以解决使⽤ CAS 进⾏原⼦更新时可能出现的 ABA 问题。
- AtomicMarkableReference :原⼦更新带有标记位的引⽤类型
对象的属性修改类型
- AtomicIntegerFieldUpdater :原⼦更新整形字段的更新器
- AtomicLongFieldUpdater :原⼦更新⻓整形字段的更新器
- AtomicReferenceFieldUpdater :原⼦更新引⽤类型字段的更新器
AQS
关于AQS 具体看看 我的 AQS 与ReentrantLock这篇博客
AQS 的全称为( AbstractQueuedSynchronizer ),这个类在 java.util.concurrent.locks 包下⾯。
AQS ( Abstract Queued Synchronizer )是一个抽象的队列同步器,通过维护一个共享资源状态( Volatile Int State )和一个先进先出( FIFO )的线程等待队列来实现一个多线程访问共享资源的同步框架。
状态信息通过 protected 类型的 getState , setState , compareAndSetState 进行操作
AQS 定义两种资源共享方式
1)Exclusive(独占)
只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁,ReentrantLock 同时支持两种锁,下面以 ReentrantLock 对这两种锁的定义做介绍:
公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
非公平锁:当线程要获取锁时,先通过两次 CAS 操作去抢锁,如果没抢到,当前线程再加入到队列中等待唤醒。
了解一下 ReentrantLock 中相关知识:
ReentrantLock 实现了 Lock 接口,Lock 接口中定义了 lock 与 unlock 相关操作,并且还存在newCondition 方法,表示生成一个条件。
ReentrantLock 默认采用非公平锁,因为考虑获得更好的性能,通过 boolean 来决定是否用公平锁(传入 true 用公平锁)。
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
public ReentrantLock() {
// 默认非公平锁
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
2) Share(共享):多个线程可同时执⾏,如
CountDownLatch 、 Semaphore 、 CountDownLatch 、 CyclicBarrier 、 ReadWriteLock 我们都会在后⾯讲到
ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时对某⼀资源进⾏读
不同的⾃定义同步器争⽤共享资源的⽅式也不同。⾃定义同步器在实现时只需要实现共享资源state 的获取与释放⽅式即可,⾄于具体线程等待队列的维护(如获取资源失败⼊队/唤醒出队等),AQS 已经在顶层实现好了。
AQS 底层使⽤了模板⽅法模式
同步器的设计是基于模板⽅法模式的,如果需要⾃定义同步器⼀般的⽅式是这样(模板⽅法模式很经典的⼀个应⽤)
使⽤者继承 AbstractQueuedSynchronizer 并重写指定的⽅法。(这些重写⽅法很简单,⽆⾮是对于共享资源 state 的获取和释放)
将 AQS 组合在⾃定义同步组件的实现中,并调⽤其模板⽅法,⽽这些模板⽅法会调⽤使⽤者重写的⽅法。
AQS 使⽤了模板⽅法模式,⾃定义同步器时需要重写下⾯⼏个 AQS 提供的模板⽅法:
isHeldExclusively()//该线程是否正在独占资源。只有⽤到condition才需要去实现它。
tryAcquire(int)//独占⽅式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占⽅式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享⽅式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可⽤资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享⽅式。尝试释放资源,成功则返回true,失败则返回false。
默认情况下,每个⽅法都抛出 UnsupportedOperationException 。 这些⽅法的实现必须是内部线程安全的,并且通常应该简短⽽不是阻塞。AQS 类中的其他⽅法都是 final ,所以⽆法被其他类使⽤,只有这⼏个⽅法可以被其他类使⽤
以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock()时,会调⽤tryAcquire()独占该锁并将 state+1。此后,其他线程再 tryAcquire()时就会失败,直到 A 线程unlock()到 state=0(即释放锁)为⽌,其它线程才有机会获取该锁。当然,释放锁之前,A 线程⾃⼰是可以重复获取此锁的(state 会累加),这就是可重⼊的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state是能回到零态的。
再以 CountDownLatch 以例,任务分为 N 个⼦线程去执⾏,state 也初始化为 N(注意 N 要与线程个数⼀致)。这 N 个⼦线程是并⾏执⾏的,每个⼦线程执⾏完后 countDown() ⼀次,state 会CAS(Compare and Swap)减 1。等到所有⼦线程都执⾏完后(即 state=0),会 unpark()主调⽤线程,然后主调⽤线程就会从 await() 函数返回,继续后余动作。
⼀般来说,⾃定义同步器要么是独占⽅法,要么是共享⽅式,他们也只需实现 tryAcquire tryRelease 、 tryAcquireShared-tryReleaseShared 中的⼀种即可。但 AQS 也⽀持⾃定义同步器同时实现独占和共享两种⽅式,如 ReentrantReadWriteLock 。
推荐两篇 AQS 原理和相关源码分析的⽂章:
http://www.cnblogs.com/waterystone/p/4920797.html
https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html