AQS与ReentrantLock
基础知识
线程类对象的三种创建方式
继承Thread类
编写一个类,继承
Thread
类,如MyThread
,重写父类的run
方法即可在主线程中调用
new MyThread().start()
即可实现Runable接口
编写一个类,实现
Runnable
接口,重写run
方法在主线程中将这个类的实例作为参数传进Thread构造函数,通过Thread实例start调用
实现Callable接口
编写一个类,实现
Callable
接口,重写call
方法需要与
FutureTask
结合使用
3种让线程等待和唤醒的方法
wait() notify()
使用Object
中的wait()
方法让线程等待,使用Object
中的notify()
方法唤醒线程
- Object类中的wait、notify、notifyAll用于线程等待和唤醒的方法,都必须在sychronized内部执行(必须用到关键字sychronized,sychronized锁住任意对象或者类),且成对出现使用,否则会报错
- 先wait,再notify才可以。
// 正常情况下
static Object objectLock = new Object();
public static void main(String[] args) {
new Thread(()->{
synchronized (objectLock){
System.out.println(Thread.currentThread().getName()+"\t"+"----come in");
try {
objectLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"\t"+"----被唤醒");
}
},"A").start();
new Thread(()->{
synchronized (objectLock){
objectLock.notify();
System.out.println(Thread.currentThread().getName()+"\t"+"----唤醒动作");
}
},"B").start();
}
//输出结果
A ----come in
B ----唤醒动作
A ----被唤醒
//异常情况1:wait()和notify()脱离synchronized单独使用。
static Object objectLock = new Object();
public static void main(String[] args) {
new Thread(()->{
// synchronized (objectLock){
System.out.println(Thread.currentThread().getName()+"\t"+"----come in");
try {
objectLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"\t"+"----被唤醒");
// }
},"A").start();
new Thread(()->{
// synchronized (objectLock){
objectLock.notify();
System.out.println(Thread.currentThread().getName()+"\t"+"----唤醒动作");
// }
},"B").start();
}
//输出结果
java.lang.IllegalMonitorStateException
//异常情况2:先notify(),后wait(),A线程一直处于等待状态。
static Object objectLock = new Object();
public static void main(String[] args) {
new Thread(()->{
//暂停几秒钟,确保B线程先进入。
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (objectLock){
System.out.println(Thread.currentThread().getName()+"\t"+"----come in");
try {
objectLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"\t"+"----被唤醒");
}
},"A").start();
new Thread(()->{
synchronized (objectLock){
objectLock.notify();
System.out.println(Thread.currentThread().getName()+"\t"+"----唤醒动作");
}
},"B").start();
}
//输出结果
B ----唤醒动作
A ----come in
await() signal()
这个有一点涉及到本章要讲的 ReentrantLock
使用JUC
包中Condition
的await()
方法让线程等待,使用signal()
方法唤醒线程
线程需要获得并持有锁,必须在锁块(synchronized或lock)中
必须要先等待(await() )后唤醒(signal),线程才能够被唤醒。
//正常情况下
static Object objectLock = new Object();
static Lock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
public static void main(String[] args) {
new Thread(()->{
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"\t"+"----come in");
condition.await();
System.out.println(Thread.currentThread().getName()+"\t"+"----被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
},"A").start();
new Thread(()->{
lock.lock();
try{
condition.signal();
System.out.println(Thread.currentThread().getName()+"\t"+"----唤醒动作");
}finally{
lock.unlock();
}
},"B").start();
}
//输出情况
A ----come in
B ----唤醒动作
A ----被唤醒
//异常情况1:注销lock()和unlock()
static Object objectLock = new Object();
static Lock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
public static void main(String[] args) {
new Thread(()->{
// lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"\t"+"----come in");
condition.await();
System.out.println(Thread.currentThread().getName()+"\t"+"----被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// lock.unlock();
}
},"A").start();
new Thread(()->{
// lock.lock();
try{
condition.signal();
System.out.println(Thread.currentThread().getName()+"\t"+"----唤醒动作");
}finally{
// lock.unlock();
}
},"B").start();
}
//输出结果报异常
java.lang.IllegalMonitorStateException
//异常情况2:线程B先进入,即先signal(),后await()
static Object objectLock = new Object();
static Lock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
public static void main(String[] args) {
new Thread(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"\t"+"----come in");
condition.await();
System.out.println(Thread.currentThread().getName()+"\t"+"----被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
},"A").start();
new Thread(()->{
lock.lock();
try{
condition.signal();
System.out.println(Thread.currentThread().getName()+"\t"+"----唤醒动作");
}finally{
lock.unlock();
}
},"B").start();
}
//输出结果
B ----唤醒动作
A ----come in
LockSupport类
LockSupport
类可以阻塞当前线程以及唤醒指定被阻塞的线程
总结
LockSupport是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,阻塞之后也有对应的唤醒方法。归根揭底,LockSupport调用的是Unsafe中的native代码。
- LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可(permit)。
- permit只有两个值1和0,默认是0。
- 可以把许可看成是一种(0,1)信号量(Semaphore),但与Semaphore不同的是,许可的累加上限是1。
- permit默认是0,所以一开始调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设置为1时,park()方法会被唤醒,然后会将permit再次设置为0并返回。
//不需要在锁块中
public static void main(String[] args) {
Thread a = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "----come in");
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "\t" + "----被唤醒");
}, "A");
a.start();
Thread b = new Thread(() -> {
LockSupport.unpark(a);
System.out.println(Thread.currentThread().getName() + "\t" + "----唤醒动作");
}, "B");
b.start();
}
//输出结果
A ----come in
B ----唤醒动作
A ----被唤醒
//先执行unpark(),依然可以被唤醒
public static void main(String[] args) {
Thread a = new Thread(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + "----come in");
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "\t" + "----被唤醒");
}, "A");
a.start();
Thread b = new Thread(() -> {
LockSupport.unpark(a);
System.out.println(Thread.currentThread().getName() + "\t" + "----唤醒动作");
}, "B");
b.start();
}
//输出结果
B ----唤醒动作
A ----come in
A ----被唤醒
可见相比于 前两种方式,最后LockSupport这种方式不需要获取锁对象就能锁住线程,唤醒时选哟指定唤醒的线程
为什么可以先唤醒线程后阻塞线程 (先 upark后 park, 后面这个park不生效)?
——-因为unpark获取了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞。
——-但是这种品正只能用一次,当调用park以后发现有凭证,就会不阻塞但是把凭证设为0
为什么唤醒两次后阻塞两次,但最终结果还会阻塞线程?(先 upark两次后 park两次, 第一个park不生效但是最后面这个park真的会锁柱)
——-因为凭证的数量最多为1,连续调用两次unpark和调用一次unpark效果一样,只会增加一个凭证;而调用两次park却需要消费两个凭证,证不够,不能放行。
ReentrantLock的简单实用
//正常情况下
public class Test4{
private volatile int num;
private Lock lock = new ReentrantLock();
private int getNum(){
return num;
}
private void addNum(){
lock.lock(); // 加锁
try{
Thread.sleep(2L);
num++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
finally {
lock.unlock(); // 解锁
}
}
public static void main(String[] args) throws InterruptedException {
Test4 t = new Test4();
for(int i = 0 ;i < 100;i++){
new Thread( t::addNum ).start();
}
Thread.sleep(500L);
System.out.println(t.getNum());
}
}
模板方法设计模式
以一个数据挖掘案例为例,针对Doc,Csv ,PDF等文件进行处理
发现针对每一个类型的处理的大致步骤及流程基本一致(如何打开文件、如何提取文件、如何关闭文件),并且还有些处理方法一样
这些流程可以被定义成父类,当你对其中一种文件进行处理的时候就可以继承父类,重写 父类中的这些步骤,就可以时整个模板有扩展性
ReentrantLock初步探索
底层实现:
ReentrantLock 的底层是通过 AbstractQueuedSynchronizer (AQS)实现
继承关系:
ReentrantLock 实现了 Lock 接口,Lock 接口中定义了 lock 与 unlock 相关操作,并且还存在newCondition 方法,表示生成一个条件。
构造函数
ReentrantLock 默认采用非公平锁,因为考虑获得更好的性能,通过 boolean 来决定是否用公平锁(传入 true 用公平锁)。
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
public ReentrantLock() {
// 默认非公平锁
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
类的内部类 : Sync、NonfairSync、FairSync
ReentrantLock 总共有三个内部类,并且三个内部类是紧密相关的,下面先看三个类的关系
ReentrantLock 有一个 Sync 属性,sync 有俩子类 NonfairSync 和 FairSync 。
说明: ReentrantLock 类内部总共存在 Sync、NonfairSync、FairSync 三个类,NonfairSync 与FairSync类继承自 Sync 类,Sync 类继承自 AbstractQueuedSynchronizer(AQS)抽象类。下面逐个进行分析。
总结:
点击关于
ReentrantLock的简单实用
中的lock()方法,发现进入了 Lock接口的lock()方法,ReentrantLock实现了这个Lock接口在进入ReentrantLock 看看他是如何实现 lock方法的,发现它调用了sync.lock(); 额。。。
在看看这个sync是个啥东西,发现他是ReentrantLock 内部定义的一个静态抽象内部类Sync,这个内部类继承了AbstractQueuedSynchronizer (AQS),然后这个sync具体的实现有两种一个是FairSync,一个NonfairSync,嗯 也就是说 ReentrantLock 的许多功能都是通过 AQS实现的。
abstract static class Sync extends AbstractQueuedSynchronizer
看来必须要了解一下AQS了。。。
AQS (以ReentrantLock为例)
AQS有两个重要的组成部分
AQS组成部分
state :private volatile int state;
volatile 保证了多线程的可见性
state 的值代表了同步状态 : 具体多少代表加锁多少代表解锁呢?其实在不同的子类实现中有不同的含义
- 以ReentrantLock中 state = 0 表示当前共享资源没有被加锁,state=1表示被一个线程加锁了 ,state>1表示被同一个线程多次加锁(这一点上看,ReenTrantLock时可重入的)
关于state的有三个方法 final 子类无法重写
- final int getState
- final void setState
final boolean compareAndSetState
补充:
- 在多线程的情况下针对共享变量的修改,一种悲观的方式就是加锁去实现,另一种乐观的实现就是CAS
- CAS全称:Compare And Swap,
- 他有三个操作数,: 内存位置、 预期数值(或者说版本)、新值
- 简单来讲,就是想要对一个变量有所修改,先要对变量有一个预期,如果现在获取到的这个数不符合预期或者不符合版本,就修改失败
- 如这里,我想让AQS state变量从0修改为1, 内存位置就是state存储数据的位置,预期数值为0 ,新值为1,如果state不等于0那就修改失败
Node 节点构成的双向链表(等待队列)
Node节点这个数据结构又分为几个组成部分
- int waitStatus
- Node prev
- Node next
- Thread thread
这个Node节点构成的链表主要用于存储当前没有获得锁的线程
一个线程是否能获得锁取决于AQS当前state的值,如果没有获得锁的线程就放在Node双向链表中(也就是等待队列)
例子
举一个简单的例子
一个资源处于没有被锁的状态即state=0,这时来了一个线程thread0尝试取获取资源,采用CAS的方式将state改为1
这时来了一个线程thread1也来尝试获取资源也尝试CAS获取锁,但是没能成功
这时AQS将这个线程加入到等待队列中
可以看到,AQS第一次加入阻塞线程的时候会创建两个节点
此时线程thread2 3 4也来了
重复1的步骤CAS失败,加入队列
接下来,线程thread0执行完释放了锁,state状态变为0
这时线程thread1就会被唤醒,但是并不是直接获得了锁,而是也要经历CAS修改状态来获得锁
假设它修改成功了,获得了锁
获得了锁之后AQS会使线程thread1的Node节点出队
接着,线程会按照上述步骤执行
AQS源码
AQS父类
查看AQS源码发现 AQS继承了一个AbstractOwnableSynchronizer的类,这个类比较简单,有一个成员变量 exclusiveOwnerThread 代表了当前占有(锁)资源的线程,并分别设定了get set方法
AQS其他成员变量
第一个红框中,代表了AQS队列的头尾Node
第二个红框中 表示了state相关的操作
第三个红框中 是 等待时长相关参数,有关超时终端
此外还有一个重要成员变量就是 组成 AQS 队列的Node
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
前两个 变量 也即 SHARED EXCLUSIVE 表明了 AQS支持的两种模式,一个是共享的(共享锁)一个是排他的(独占锁)
thread 表示包装到节点中的那个线程
nextWaiter 作用:
- 在条件队列中,它指向下一个节点
- 同步队列中,指的是这个当前这个节点要获取的是共享锁还是排他锁
方法:
isShared :判断当前节点要获取的是共享锁还是排他锁
predecessor:获取当前节点的前驱节点
现在讲讲waitStatus 代表的含义
假设这样一个场景,老师找一些童鞋单独谈话,这些童鞋按照顺序来谈,每次老师和一个同学谈完,这个同学有”责任”去喊下一位同学进办公室
这样的话,我们将这些“责任”符号化
最后一位INITIAL表示没有责任,SIGNAL表示叫下一位,CANCELED表示需要告知取消了
再次类比到程序中
看上面采用静态常量的方式表示状态,注意初始状态是0,-1表示需要唤醒下一个节点,1表示取消状态
以ReentrantLock.lock()(非公平锁的lock)为例,AQS执行步骤
- ReentrantLock.lock() 调用了 aync.lock
- aync有两种实现 ,默认非公平锁实现进入
- 找到 lock()
final void lock() {
if (compareAndSetState(0, 1)) // 希望获取锁
setExclusiveOwnerThread(Thread.currentThread()); // 如果成功,把独占线程设为当前线程
else
acquire(1); // 如果没有获取锁成功??
}
仔细看 aquire() 有四个函数,
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire 的实现
首先说一下,AQS 的status状态变量是一个int变量,其实这些状态具体是什么含义是子类去定义的
status为何值时加锁成功,为何值时加锁/解锁失败,这都需要子类自己去定义
这里的tryAcquire就是这些方法其中之一, 我们会猜测,既然这个方法需要子类去定义的话,是不是在父类抽象类种应该定义为抽象方法呢
发现 AQS中的tryAcquire并不是抽象方法,,,而是在方法中直接抛了一个异常为什么?
并且,你会发现AQS作为一个抽象类居然没有一个抽象方法!!!!!问题1: 既然AQS没有抽象方法,那他为什么要定义一个抽象类呢?
问题2: 既然AQS已经定义为了抽象类,那么该子类实现的方法并没有定义为抽象方法呢?回答 问题1: AQS作为一种模板,不希望别人直接拿来套用(不希望别人直接new AQS对象去使用) 所以AQS声明为抽象类
回答 问题2: tyrAquire应该是尝试获取锁,像这种方法还有tryRelease、tryAcquireShared、tryReleaseShared 顾名思义 这些方法嗾使尝试 获取/获取 (共享/独占) 锁的, 而 ReentrantLock作为一种独占锁,他应该是不需要 tryAcquireShared、tryReleaseShared 这两个函数的,但是ReentrantLock里面的实现类为了作为一种实现类,就必须实现这些方法,这也太麻烦了,所以为了后面麻烦,AQS就在这里“帮忙实现了”,如果你在实现中不小心调用了这个方法就会抛出异常总结:模板模式 + 按需实现
接下来,我们看看在非公平锁实现类种tryAcquire
是怎么实现的
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
非公平锁加锁方式
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取AQS当前值
int c = getState();
if (c == 0) { // 如果是 0 (没有加锁 )就CAS加锁
if (compareAndSetState(0, acquires)) {
// 如果加锁成功 就把state改为当前线程的值
setExclusiveOwnerThread(current); //
return true;
}
}
// 如果当前线程state不等于零(大于零)要判断是不是重入锁
else if (current == getExclusiveOwnerThread()) {
// 如果当前前线程就是 当前锁的线程,当前线程重复加锁
// 如:三个方法加的时同一把锁并且采用递归调用的方式
// 当前线程 不需要CAS 为什么?
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- 可见,如果获取锁成功就 lock()方法结束就会继续运行下去
- 获取锁失败 那么 (!tryAquire())为true, 就会判断下一段 逻辑 ( acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
addWaiter 的实现
顾名思义,没有获得线程的节点会被打包成Node加入到同步队列
这是一个父类抽象类AQS已经提我们实现的方法
// AQS默认实现
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { // AQS的同步队列已经有了节点,把当前节点放到队尾
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); // 当为AQS队列为空时 执行
return node;
}
看看队列为空(也即尾节点为空)时怎么加新节点的?
// AQS默认实现
private Node enq(final Node node) {
// 进入一个死循环,通过不断尝试的方式 将 Node节点加入到同步队列当中
for (;;) {
Node t = tail; //拿到当前同步队列的尾指针,如果为空
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
// 当尾节点为空时,表示AQS队列为空,这个是就会new 一个 新的 Node节点(“空”节点),这个节点不包含任何数据
// 并通过CAS的方式将 这个“空”节点设置为 头节点,并将尾指针也指向这个头节点(“空”节点)
tail = head;
//但是这样并没有跳出循环 ,下次进入就是进入else语句
} else {
node.prev = t; // 把 新节点 的前驱节点指向 尾节点 (“空”节点)
if (compareAndSetTail(t, node)) { // 尝试将队尾这设置为 该新节点
t.next = node; // 成功后将 原本的尾节点的后继改为 新节点
return t;
}
}
}
}
捋一下思路
理想情况下
对于当前AQS来说, 资源的拥有者为thread0 ,并且该AQS同步队列的头节点尾节点都为空
现在thread1来争抢资源失败就会被打包成Node节点
new 一个新节点并将其设置为头节点
new 一个 新的 Node节点(“空”节点),这个节点不包含任何数据
并通过CAS的方式将 这个(“空”节点)设置为 头节点,并将尾指针也指向这个头节点(这个(“空”节点))
把 新节点 的前驱节点指向 尾节点 (“空”节点)
尝试将队尾这设置为 该新节点, 成功后将 原本的尾节点的后继改为 新节点
以上就是理想情况下(只有一个线程来争抢资源到进入等待队列的过程)
多线程并发可能的情况
现在thread1,2都进入了 enq这个函数
线程在尝试修改的头节点的时候,发现,头节点已经被CAS修改过了,那么就会修改失败,返回到 for循环首部
这时因为尾节点不为空(可能时“空节点” 但不是 null 也可能是 线程2节点)
就会进入else语句把 新节点 的前驱节点指向 尾节点 (线程2节点)
尝试将队尾设置为 该线程1节点, (这一步也会用CAS的方式尝试,如果没能成功 — 不成功的原因:可能因为设置尾节点的时候发现尾节点 变了(由原来的“空节点”变成了 线程2节点) , 如果没能成功就会重新进入循环再次 CAS直到成功为止),
成功后将 原本的尾节点(线程2节点)的后继改为 新节点
- 并发不出错的关键原因: 通过CAS的方式修改头节点和尾节点
仔细对比addWaiter与enq方法会发现,addWaiter if种逻辑的部分与 enq else 逻辑的部分相同,作者为什么不把他们写到一起呢?
可能是这样想的,如果合并写,就需要每次进来判空(但是大多数情况多线程时,这个队列时不为空的,没必要判空)
那其实我们就可以把 非空的语句放到上面即可呀,然后合并不久行了?
其实java高版本就是这样做的,他已经把enq删除了,先去判断非空
接下来依旧按照lock() 的逻辑往下走,( acquireQueued(addWaiter(Node.EXCLUSIVE), arg))他把addWaiter返回的结果 (已经加入到同步队列的节点)传给了 acquireQueued
acquireQueued的实现
获取队列
// AQS默认实现
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 获取锁是否成功
try {
boolean interrupted = false; //执行过程中是否发生了中断
for (;;) {
final Node p = node.predecessor(); // 获取新加入现成的前驱节点
if (p == head && tryAcquire(arg)) { //判断该前驱节点是否为头节点,(如果是头节点)并尝试获取锁
// 假设争抢到了锁
// 把 把原来的头节点内容置空 并设置为头节点,相当于把原来的 头节点替换了,并把 线程取出来作为头节点
setHead(node);
p.next = null; // help GC
failed = false;
// 执行 return之前需要先执行 finally当中的代码
// if (failed) cancelAcquire(node);
// 不会执行
// 返回false
return interrupted;
}
// 那些 没有资格抢到锁(前驱节点不是头节点)以及抢了锁但是没有成功的,都会进入到这个if进行判断,
// shouldParkAfterFailedAcquire 获取到锁失败后时候应该阻塞?
// parkAndCheckInterrupt 阻塞并检查中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在进入shouldParkAfterFailedAcquire之前再讲讲AQS节点的状态,lock方法默认实现的是一种 独占的,不可响应中断的锁,
不可响应中断也就是 放进阻塞队列的线程,发生了终端否不会抛出异常也不会取消该线程的任务,也就是并不会因为终端而将这个线程节点从AQS同步队列种移除,也就是不会由 AQS定义的那几种状态中的 CANCELLED,SIGNAL表示有“责任”去叫下一位,肯定有这个状态,CONDITION表示条件队列,没有;PROPAGATE表示共享资源队列,没有
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 首先拿到该节点前驱节点的等待状态
int ws = pred.waitStatus;
// 针对前驱节点的状态尽享操作
// ReenTrantLock 非公平锁默认 只有 INITIAL以及 SIGNAL两种状态
// ws大于零指的是其他锁实现的情况
if (ws == Node.SIGNAL)
// 如果前驱节点的等待状态等于SIGNAL 当前节点就应该阻塞
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//到了这里 前驱节点的状态只能是 0 或者 共享锁的PROPAGATE
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 通过CAS的方式修改前驱节点的状态 修改为 SIGANL状态
// 相当于 告诉 前驱节点,“前面的大哥,老师叫到我的话你告诉我一声,我先睡了(阻塞了)再次执行时才阻塞”
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 当节点的前驱节点为SIGNAL,表名该节点应该被阻塞,阻塞的逻辑在这里实现
// 发现利用的就是LockSupport.park()方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
接着上一小节的例子开始
新加的节点 其状态都为 INITIAL=0
理想情况下
线程 0 执行完毕 会将 AQS 的锁拥有者改为 NULL,线程 1 尝试获取锁成功
多线程阻塞的情况
想要获取锁的话,必须满足 p==head
也就是必须是该节点的前驱节点是头节点
如果前驱节点不是头节点以及尝试获取锁失败,会检查前驱节点是不是SIGNAL ,前驱节点不是SIGNAL 他会尝试查前驱节点改成SIGNAL并陷入阻塞
至此我们分析完了 ReentrantLock的lock() 方法的执行流程
上面总结了 lock的流程,会发现,陷入阻塞的线程有两种唤醒方式,
- 前驱节点执行完毕释放锁唤醒下个节点,会继续执行上面的循环逻辑
- 但不能有这样的误解,释放锁的线程会把锁交给队列中的下一个线程
- 唤醒一个线程只是说再一次赋予了它抢占锁的能力,至于能否获取到,还得看争抢结果,还会回到了 acquireQueued
- 发生中断
- park 方法比较特殊,他会响应中断,回事阻塞的线程唤醒,但是他并不会抛出异常,这样来看就前面逻辑执行相同
总结:
- 一个Node节点并不是一旦尝试获取锁失败就立马进入阻塞,而是在阻塞之前至少经历多次获取锁的流程
ReentrantLock解锁
看看怎么解锁的吧
public void unlock() {
sync.release(1);
}
嗯,早就预料到了,调用的是最终调用的还是 AQS的实现
release 有两个作用
1、 检查释放锁是否成功
2、唤醒后继节点(同步队列唤醒节点:)
什么情况下同步队列有需要唤醒的节点呢?考虑下面几种情况
1、头节点为空,说明同步队列中没有节点 不需要唤醒
2、同步队列 正在加入第一个节点,这时候 加入的节点还没有发生阻塞,也就是他还有机会抢锁,不需要唤醒
3、队列中的所有阻塞节点已经出队完毕, 不需要唤醒
4、综上:要执行唤醒后继节点的操作的前提是: 头节点不为空,且头节点的等待状态不为零(不是INITIAL)
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试释放锁
Node h = head; //
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //唤醒后继节点
return true;
}
return false;
}
tryRelease
那么看看tryRelease(),自然的和上面一样,他和tryLock一样是需要子类去实现的
//Sync 实现的
protected final boolean tryRelease(int releases) {
// 将state的值 -1 ,因为已经执行完毕了
int c = getState() - releases;
// 判断 当前线程时候是不是AQS持有锁的线程
// 这句话的意思是,想要释放锁,前提是 你是所得持有者
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否完全释放锁的标志(ReentrantLock是一个可重入锁 state 可以往上叠加),只有state == 0 才算完全释放锁
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor
唤醒后继节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 获取头节点 状态
int ws = node.waitStatus;
//如果小于零 将其变为 INITIAL状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 拿到头节点的后继节点,
Node s = node.next;
// 有些节点可能因为终端而取消,我们需要跳过这些节点,找到第一个真正需要唤醒的第一个节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 头节点和后继节点不为空,表明需要唤醒这个节点
if (s != null)
LockSupport.unpark(s.thread);
}
关于中断
在lock流程那里,
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
这里的意思是当线程被唤醒线程发生了中断而被唤醒将返回true
那么acquireQueued(final Node node, int arg) 返回true
那么最终进入
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
执行selfInterrupt 方法
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
其实内部调用了 线程的中断方法实现一个中断
也就是也就是AQS是不管你是不是中断,而是发生中断了会给你一个 中断的标志,具体要不要中断是你自己决定的
注意:ReentrantLock是可以响应中断的,但是这里说的是lock方法不可响应中断
关于公平和非公平
错误观点:
公平就是按排队的顺序来唤醒,非公平就是随机唤醒?
在AQS持有锁的线程释放锁以后,可能来竞争锁的线程有两种类型,一种就是在同步队列中线程,准确的说就是队列中第一个等待的线程,因为只有它才有争抢锁的资格(FIFO规则)。另外一种想要争抢锁的线程就是队列外的线程,也就是没有排队的线程。如果释放的锁可以被这些没有排队的线程优先抢到的话,那对于排队的线程来说就是非公平的,如果我们优先队列中的线程获取到锁的话,那对于他们就是公平的。说到底就是允不允许争抢锁的时候插队,当然这里的插队,是指插在队头!允许插队就是不公平的,不允许就是公平的。
也就是非公平表示允许直接插队插到队头,这也太不公平了!!哈哈哈
再次回到AQS
我们从AQS讲到他的一个实现 ReentrantLock,作为一个模板类,他还有许多其他的实现,这里我们回到AQS总结一下
AQS 的全称为( AbstractQueuedSynchronizer ),这个类在 java.util.concurrent.locks 包下⾯。
AQS ( Abstract Queued Synchronizer )是一个抽象的队列同步器,通过维护一个共享资源状态( Volatile Int State )和一个先进先出( FIFO )的线程等待队列来实现一个多线程访问共享资源的同步框架。
状态信息通过 protected 类型的 getState , setState , compareAndSetState 进行操作
AQS 定义两种资源共享方式
1)Exclusive(独占)
只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁,ReentrantLock 同时支持两种锁,下面以 ReentrantLock 对这两种锁的定义做介绍:
公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
非公平锁:当线程要获取锁时,先通过两次 CAS 操作去抢锁,如果没抢到,当前线程再加入到队列中等待唤醒。
了解一下 ReentrantLock 中相关知识:
ReentrantLock 实现了 Lock 接口,Lock 接口中定义了 lock 与 unlock 相关操作,并且还存在newCondition 方法,表示生成一个条件。
ReentrantLock 默认采用非公平锁,因为考虑获得更好的性能,通过 boolean 来决定是否用公平锁(传入 true 用公平锁)。
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
public ReentrantLock() {
// 默认非公平锁
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
2) Share(共享):多个线程可同时执⾏,如
CountDownLatch 、 Semaphore、 CyclicBarrier 、 ReadWriteLock 我们都会在后⾯讲到
ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时对某⼀资源进⾏读
不同的⾃定义同步器争⽤共享资源的⽅式也不同。⾃定义同步器在实现时只需要实现共享资源state 的获取与释放⽅式即可,⾄于具体线程等待队列的维护(如获取资源失败⼊队/唤醒出队等),AQS 已经在顶层实现好了。
AQS 底层使⽤了模板⽅法模式
同步器的设计是基于模板⽅法模式的,如果需要⾃定义同步器⼀般的⽅式是这样(模板⽅法模式很经典的⼀个应⽤)
使⽤者继承 AbstractQueuedSynchronizer 并重写指定的⽅法。(这些重写⽅法很简单,⽆⾮是对于共享资源 state 的获取和释放)
将 AQS 组合在⾃定义同步组件的实现中,并调⽤其模板⽅法,⽽这些模板⽅法会调⽤使⽤者重写的⽅法。
AQS 使⽤了模板⽅法模式,⾃定义同步器时需要重写下⾯⼏个 AQS 提供的模板⽅法:
isHeldExclusively()//该线程是否正在独占资源。只有⽤到condition才需要去实现它。
tryAcquire(int)//独占⽅式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占⽅式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享⽅式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可⽤资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享⽅式。尝试释放资源,成功则返回true,失败则返回false。
默认情况下,每个⽅法都抛出 UnsupportedOperationException 。 这些⽅法的实现必须是内部线程安全的,并且通常应该简短⽽不是阻塞。AQS 类中的其他⽅法都是 final ,所以⽆法被其他类使⽤,只有这⼏个⽅法可以被其他类使⽤
以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock()时,会调⽤tryAcquire()独占该锁并将 state+1。此后,其他线程再 tryAcquire()时就会失败,直到 A 线程unlock()到 state=0(即释放锁)为⽌,其它线程才有机会获取该锁。当然,释放锁之前,A 线程⾃⼰是可以重复获取此锁的(state 会累加),这就是可重⼊的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state是能回到零态的。
再以 CountDownLatch 以例,任务分为 N 个⼦线程去执⾏,state 也初始化为 N(注意 N 要与线程个数⼀致)。这 N 个⼦线程是并⾏执⾏的,每个⼦线程执⾏完后 countDown() ⼀次,state 会CAS(Compare and Swap)减 1。等到所有⼦线程都执⾏完后(即 state=0),会 unpark()主调⽤线程,然后主调⽤线程就会从 await() 函数返回,继续后余动作。
⼀般来说,⾃定义同步器要么是独占⽅法,要么是共享⽅式,他们也只需实现 tryAcquire tryRelease 、 tryAcquireShared-tryReleaseShared 中的⼀种即可。但 AQS 也⽀持⾃定义同步器同时实现独占和共享两种⽅式,如 ReentrantReadWriteLock 。
推荐两篇 AQS 原理和相关源码分析的⽂章:
http://www.cnblogs.com/waterystone/p/4920797.html
https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html
AQS 组件总结
Semaphore (信号量) 允许多个线程同时访问: synchronized 和 ReentrantLock 都是⼀次只允许⼀个线程访问某个资源, Semaphore (信号量)可以指定多个线程同时访问某个资源。
CountDownLatch (倒计时器): CountDownLatch 是⼀个同步⼯具类,⽤来协调多个线程之间的同步。这个⼯具通常⽤来控制线程等待,它可以让某⼀个线程等待直到倒计时结束,再开始执⾏。
CyclicBarrier (循环栅栏): CyclicBarrier 和 CountDownLatch ⾮常类似,它也可以实现线程间的技术等待,但是它的功能⽐ CountDownLatch 更加复杂和强⼤。主要应⽤场景和CountDownLatch 类似。 CyclicBarrier 的字⾯意思是可循环使⽤( Cyclic )的屏障( Barrier )。它要做的事情是,让⼀组线程到达⼀个屏障(也可以叫同步点)时被阻塞,直到最后⼀个线程到达屏障时,屏障才会开⻔,所有被屏障拦截的线程才会继续⼲活。 CyclicBarrier 默认的构造⽅法是 CyclicBarrier(int parties) ,其参数表示屏障拦截的线程数量,每个线程调⽤ await() ⽅法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
Semaphore (信号量)
如下面我指定最多可以允许20的线程同时访问资源
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
*
* @author Snailclimb
* @date 2018年9月30日
* @Description: 需要一次性拿一个许可的情况
*/
public class SemaphoreExample1 {
// 请求的数量
private static final int threadCount = 550;
public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
// 一次只能允许执行的线程数量。
final Semaphore semaphore = new Semaphore(20);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {// Lambda 表达式的运用
try {
semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20
test(threadnum);
semaphore.release();// 释放一个许可
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模拟请求的耗时操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模拟请求的耗时操作
}
}
执行 acquire 方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个 release 方法增加一个许可证,这可能会释放一个阻塞的 acquire 方法。然而,其实并没有实际的许可证这个对象,Semaphore 只是维持了一个可获得许可证的数量。 Semaphore 经常用于限制获取某种资源的线程数量。
当然一次也可以一次拿取和释放多个许可,不过一般没有必要这样做:
semaphore.acquire(5);// 获取5个许可,所以可运行线程数量为20/5=4
test(threadnum);
semaphore.release(5);// 获取5个许可,所以可运行线程数量为20/5=4
除了 acquire 方法之外,另一个比较常用的与之对应的方法是 tryAcquire 方法,该方法如果获取不到许可就立即返回 false
Semaphore 有两种模式,公平模式和非公平模式。
公平模式: 调用 acquire 的顺序就是获取许可证的顺序,遵循 FIFO;
非公平模式: 抢占式的。
Semaphore 对应的两个构造方法如下:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
补充:Semaphore与CountDownLatch一样,也是共享锁的一种实现。它默认构造AQS的state为permits。当执行任务的线程数量超出permits,那么多余的线程将会被放入阻塞队列Park,并自旋判断state是否大于0。只有当state大于0的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行release方法,release方法使得state的变量会加1,那么自旋的线程便会判断成功。
如此,每次只有最多不超过permits数量的线程能自旋成功,便限制了执行任务线程的数量。
CountDownLatch(倒计时器)
CountDownLatch是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用countDown方法时,其实使用了 tryReleaseShared 方法以CAS的操作来减少state,直至state为0就代表所有的线程都调用了countDown方法。当调用await方法的时候,如果state不为0,就代表仍然有线程没有调用countDown方法,那么就把已经调用过countDown的线程都放入阻塞队列Park,并自旋CAS判断state== 0,直至最后一个线程调用了countDown,使得state == 0,于是阻塞的线程便判断成功,全部往下执行。
简单来说:CountDownLatch 的作⽤就是 允许 count 个线程阻塞在⼀个地⽅,直⾄所有线程的任务都执⾏完毕。
具体有两种使用场景
1、某一线程在开始运行前等待 n 个线程执行完毕。将 CountDownLatch 的计数器初始化为 n : new CountDownLatch(n) ,每当一个任务线程执行完毕,就将计数器减 1 countdownlatch.countDown() ,当计数器的值变为 0 时,在 CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
2、实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 : new CountDownLatch(1) ,多个线程在开始执行任务前首先 coundownlatch.await() ,当主线程调用 countDown() 时,计数器变为 0,多个线程同时被唤醒。
之前在项⽬中,有⼀个使⽤多线程读取多个⽂件处理的场景,我⽤到了 CountDownLatch 。
我们要读取处理 6 个⽂件,这 6 个任务都是没有执⾏顺序依赖的任务,但是我们需要返回给⽤户的时候将这⼏个⽂件的处理的结果进⾏统计整理。
为此我们定义了⼀个线程池和 count 为 6 的 CountDownLatch 对象 。使⽤线程池处理读取任务,每⼀个线程处理完之后就将 count-1并将任务状态更新 ,或者中途中处理失败count-1, 任务状态不更新,调⽤ CountDownLatch 对象的 await() ⽅法, 等待,为了防止无线等待,这里设置一个超时时间为30分钟,若到时间还未处理完,则结束任务,直到所有任务处理完之后,才会接着执⾏后⾯的逻辑。
public class CountDownLatchExample1 {
// 处理⽂件的数量
private static final int threadCount = 6;
public static void main(String[] args) throws InterruptedException {
// 创建⼀个具有固定线程数量的线程池对象(推荐使⽤构造⽅法创建)
ExecutorService threadPool = Executors.newFixedThreadPool(10);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {
try {
//处理⽂件的业务操作
......
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//表示⼀个⽂件已经被完成
countDownLatch.countDown();
}
});
}
countDownLatch.await(30, TimeUnit.MINUTES);
threadPool.shutdown();
System.out.println("finish");
}
}
改进之处
可以使⽤ CompletableFuture 类来改进!Java8 的 CompletableFuture 提供了很多对多线程友好的⽅法,使⽤它可以很⽅便地为我们编写多线程程序,什么异步、串⾏、并⾏或者等待所有线程执⾏完任务什么的都⾮常⽅便
CompletableFuture<Void> task1 =
CompletableFuture.supplyAsync(()->{
//⾃定义业务操作
});
......
CompletableFuture<Void> task6 =
CompletableFuture.supplyAsync(()->{
//⾃定义业务操作
});
......
CompletableFuture<Void>
headerFuture=CompletableFuture.allOf(task1,.....,task6);
try {
headerFuture.join();
} catch (Exception ex) {
......
}
System.out.println("all done. ");
上⾯的代码还可以接续优化,当任务过多的时候,把每⼀个 task 都列出来不太现实,可以考虑通过循环来添加任务。
//⽂件夹位置
List<String> filePaths = Arrays.asList(...)
// 异步处理所有⽂件
List<CompletableFuture<String>> fileFutures = filePaths.stream()
.map(filePath -> doSomeThing(filePath))
.collect(Collectors.toList());
// 将他们合并起来
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
fileFutures.toArray(new CompletableFuture[fileFutures.size()])
)
与 CountDownLatch 的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await() 方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
其他 N 个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown() 方法来完成的;每调用一次这个方法,在构造函数中初始化的 count 值就减 1。所以当 N 个线程都调 用了这个方法,count 的值等于 0,然后主线程就能通过 await() 方法,恢复执行自己的任务。
CyclicBarrier(循环栅栏)
CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。
CountDownLatch的实现是基于AQS的,而CycliBarrier是基于 ReentrantLock(ReentrantLock也属于AQS同步器)和 Condition 的.
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties) ,其参数表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
其中,parties 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。
也就是 相比于CountDownLatch 是减的方式 CyclicBarrier是加的方式
但实质上,CyclicBarrier底层采用 ReentrantLock实现,最终,底层 内部通过一个 count 变量作为计数器,cout 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减一。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。也就是还是减的方式😀
/**
*
* @author Snailclimb
* @date 2018年10月1日
* @Description: 测试 CyclicBarrier 类中带参数的 await() 方法
*/
public class CyclicBarrierExample2 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
public static void test(int threadnum) throws InterruptedException,
BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
try {
/**等待60秒,保证子线程完全执行结束*/
cyclicBarrier.await(60, TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("-----CyclicBarrierException------");
}
System.out.println("threadnum:" + threadnum + "is finish");
}
}
// 输出
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......
可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await 方法之后的方法才被执行。
另外,CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties, RunnablebarrierAction) ,用于在线程到达屏障时,优先执行 barrierAction ,方便处理更复杂的业务场景。
CyclicBarrier 和 CountDownLatch 的区别
CountDownLatch 是计数器,只能使用一次,而 CyclicBarrier 的计数器提供 reset 功能,可以多次使用。
此外对于 CountDownLatch 来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待
CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。
ReentrantLock 和 ReentrantReadWriteLock
读写锁 ReentrantReadWriteLock 可以保证多个线程可以同时读,所以在读操作远大于写操作的时候,读写锁就非常有用了。