线程类的执行方式

一、runnable的执行 , 作为thread的参数

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        // 在这里定义可在线程中执行的代码
    }
}

MyRunnable myRunnable = new MyRunnable();
Thread thread = new Thread(myRunnable);
thread.start();

or

Thread thread = new Thread(new Runnable() {
    @Override
    public void run() {
        // 在这里定义可在线程中执行的代码
    }
});
thread.start();

二、callable的执行,被ExecutorService执行or作为FeatureTask的参数

(后面看 底层,发现最终还是底层还是用execute(Runnable r)实现的 只不过这里的r是经过 FutureTask 封装过的Runnable

public class MyCallable implements Callable<T> {
    @Override
    public T call() throws Exception {
        // 在这里定义可被调用的代码
    }
}

MyCallable myCallable = new MyCallable();
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<T> future = executor.submit(myCallable);
T result = future.get();

or

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<T> future = executor.submit(new Callable<T>() {
    @Override
    public T call() throws Exception {
        // 在这里定义可被调用的代码
    }
});

T result = future.get();

or
使用FeatureTask (用FutureTask 包装 callable 再将FutureTask 对象作为参数创建线程,开启,获取参数通过FutureTask 对象获取 )

Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。

其实FutureTask 又实现了Runnable接口, 所以线程接受的还是一个Runnable变量…..

Callable<Process> task = () -> {
   // 执行异步任务
   Runtime runtime = Runtime.getRuntime();
   Process process = runtime.exec("/Users/mac/Desktop/qc-java-runtime/src/main/java/com/qc/runtime/shell.sh");
   return process;
};

// 将Callable包装成FutureTask
FutureTask<Process> future = new FutureTask<>(task);

// 启动新线程来执行异步任务
new Thread(future).start();

// 获取异步任务的结果
Process result = future.get();
System.out.println(result);

实现 Runnable 接⼝和 Callable 接⼝的区别

  • 1、最大的区别,runnable没有返回值,而实现callable接口的任务线程能返回执行结果(调用结果时主线程会阻塞来等待执行获取结果)
  • 2、callable接口实现类中的run方法允许异常向上抛出,可以在内部处理,try catch,但是runnable接口实现类中run方法的异常必须在内部处理,不能抛出;所以,如果任务不需要返回结果或抛出异常推荐使⽤ Runnable 接⼝,这样代码看起来会更加简洁

  • 3、callable和runnable都可以应用于executors(线程池)。而thread类只支持runnable

    • ⼯具类 Executors 可以实现 Runnable 对象和 Callable 对象之间的相互转换。( Executors.callable Runnable task )或 Executors.callable Runnable task Object resule )

线程池

为什么要⽤线程池?

池化技术相⽐⼤家已经屡⻅不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应⽤。池化技术的思想主要是为了减少每次获取资源的消耗,提⾼对资源的利⽤率。(避免每次重新建立对象,而直接保留这个对象复用)

线程池提供了⼀种限制和管理资源(包括执⾏⼀个任务)。 每个线程池还维护⼀些基本统计信息,例如已完成任务的数量。

这⾥借⽤《Java 并发编程的艺术》提到的来说⼀下使⽤线程池的好处:

  • 降低资源消耗。通过重复利⽤已创建的线程降低线程创建和销毁造成的消耗。
  • 提⾼响应速度。当任务到达时,任务可以不需要的等到线程创建就能⽴即执⾏。
  • 提⾼线程的可管理性。线程是稀缺资源,如果⽆限制的创建,不仅会消耗系统资源,还会降
  • 低系统的稳定性,使⽤线程池可以进⾏统⼀的分配,调优和监控。

Executor 框架

是 Java5 之后引进的,通过 Executor 来启动线程比使用 Thread 的start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。

补充:this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用. 调用尚未构造完全的对象的方法可能引发令人疑惑的错误。

Executor 框架不仅包括了线程池的管理,还提供了线程工厂队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。

Executor 框架结构

1) 任务( Runnable / Callable )
执行任务需要实现的 Runnable 接口 或 Callable 接口。 Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。

2) 任务的执行( Executor )

如下图所示,包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的ExecutorService 接口。 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口

image-20230331093320424

这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。

3) 异步计算的结果( Future ) (当需要获取线程执行结果时)

Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。
当我们把 Runnable 接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或ScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)

Executor 框架的使用示意图

image-20230331093643850

  1. 主线程首先要创建实现 Runnable 或者Callable 接口的任务对象。
  2. 把创建完成的实现 Runnable 接口的 对象直接交给execute(Runnable r),或者也可以把 Runnable 对象或Callable 对象提交给 submit(Runnable r) or submit(Callable c)
  3. 如果执行 ExecutorService.submit(…)ExecutorService 将返回一个实现 Future 接口的对象submit() 会返回一个FutureTask 对象)。由于 FutureTask 实现了Runnable,我们也可以创建FutureTask,然后直接交给 ExecutorService执行。
  4. 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning) 来取消此任务的执行。

执行 execute() 方法和 submit() 方法的区别

  1. execute() (只能接收实现了Runable接口的实现类)

    该⽅法⽤于提交不需要返回值的任务,所以⽆法判断任务是否被线程池执⾏成功与否;

  2. submit() (可以接收Callable、Runnable两种类型的参数)

    该⽅法⽤于提交需要返回值的任务。

    线程池会返回⼀个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执⾏成功,并且可以通过 Future 的 get() ⽅法来获取返回值, get() ⽅法会阻塞当前线程直到任务完成,⽽使⽤ get long timeout TimeUnitunit ⽅法则会阻塞当前线程⼀段时间后⽴即返回,这时候有可能任务没有执⾏完

  3. 即使用了submit()底层 ExecutorService 还是用的execute

异常

execute会直接抛出任务执行时的异常,可以用try、catch来捕获,和普通线程的处理方式完全一致

submit会吃掉异常,可通过Future的get方法将任务执行时的异常重新抛出。

如何创建线程池

线程池的创建⽅式总共包含以下 7 种(其中 6 种是通过 Executors (工具类)创建的, 1 种是通过ThreadPoolExecutor 创建的, 其实前四个底层都是ThreadPoolExecutor)

  1. Executors.newFixedThreadPool:创建⼀个固定⼤⼩的线程池,可控制并发的线程数,超出的线程会在队列中等待;
    • 允许请求的队列⻓度为Integer.MAX_VALUE
  2. Executors.newCachedThreadPool:创建⼀个可缓存的线程池,若线程数超过处理所需,缓存⼀段时间后会回收,若线程数不够,则新建线程;
    • 允许创建的线程数量为 Integer.MAX_VALUE
  3. Executors.newSingleThreadExecutor:创建单个线程数的线程池,它可以保证先进先出的执⾏顺序;
    • 允许请求的队列⻓度为Integer.MAX_VALUE
  4. Executors.newScheduledThreadPool:创建⼀个可以执⾏延迟任务的线程池;
    • 允许创建的线程数量为Integer.MAX_VALUE
  5. Executors.newSingleThreadScheduledExecutor:创建⼀个单线程的可以执⾏延迟任务的线程池;
  6. Executors.newWorkStealingPool:创建⼀个抢占式执⾏的线程池(任务执⾏顺序不确定)【JDK1.8 添加】。
  7. ThreadPoolExecutor:最原始的创建线程池的⽅式,它包含了 7 个参数可供设置,后⾯会详细讲。

《阿⾥巴巴 Java 开发⼿册》中强制线程池不允许使⽤ Executors 去创建,⽽是通过ThreadPoolExecutor 的⽅式,这样的处理⽅式让写的同学更加明确线程池的运⾏规则,规避资源耗尽的⻛险

Executors 返回线程池对象的弊端如下:

FixedThreadPool SingleThreadExecutor : 允许请求的队列⻓度为Integer.MAX_VALUE ,可能堆积⼤量的请求,从⽽导致 OOM。

CachedThreadPool ScheduledThreadPool : 允许创建的线程数量为Integer.MAX_VALUE ,可能会创建⼤量线程,从⽽导致 OOM。

ThreadPoolExecutor 类分析

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
  • corePoolSize :线程池中核心线程数的最大值
  • maximumPoolSize :线程池中能拥有最多线程数
  • workQueue:用于缓存任务的阻塞队列

当调用线程池execute() 方法添加一个任务时,线程池会做如下判断:

  • 如果有空闲线程,则直接执行该任务;
  • 如果没有空闲线程,且当前运行的线程数少于corePoolSize,则创建新的线程执行该任务;
  • 如果没有空闲线程,且当前的线程数等于corePoolSize,同时阻塞队列未满,则将任务入队列,而不添加新的线程;
  • 如果没有空闲线程,且阻塞队列已满,同时池中的线程数小于maximumPoolSize ,则创建新的线程执行任务;
  • 如果没有空闲线程,且阻塞队列已满,同时池中的线程数等于maximumPoolSize ,则根据构造函数中的 handler 指定的策略来拒绝新的任务。

KeepAliveTime:

  • keepAliveTime :表示空闲线程的存活时间
  • TimeUnit unit :表示keepAliveTime的单位

当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

workQueue 任务队列:

  • workQueue :它决定了缓存任务的排队策略

ThreadPoolExecutor线程池推荐了三种等待队列,它们是:SynchronousQueue 、LinkedBlockingQueue和 ArrayBlockingQueue。

1)有界队列:

SynchronousQueue :一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于 阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。
ArrayBlockingQueue:一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

2)无界队列:

LinkedBlockingQueue:基于链表结构的无界阻塞队列,它可以指定容量也可以不指定容量(实际上任何无限容量的队列/栈都是有容量的,这个容量就是Integer.MAX_VALUE)
PriorityBlockingQueue:是一个按照优先级进行内部元素排序的无界阻塞队列。队列中的元素必须实现 Comparable 接口,这样才能通过实现compareTo()方法进行排序。优先级最高的元素将始终排在队列的头部;PriorityBlockingQueue 不会保证优先级一样的元素的排序。
注意:keepAliveTime和maximumPoolSize及BlockingQueue的类型均有关系。如果BlockingQueue是无界的,那么永远不会触发maximumPoolSize,自然keepAliveTime也就没有了意义。

threadFactory:

  • threadFactory :指定创建线程的工厂。(可以不指定) 一般使用Executors 工具类里面指定的工厂

  • 如果不指定线程工厂时,ThreadPoolExecutor 会使用ThreadPoolExecutor.defaultThreadFactory 创建线程。默认工厂创建的线程:同属于相同的线程组,具有同为 Thread.NORM_PRIORITY 的优先级,以及名为 “pool-XXX-thread-” 的线程名(XXX为创建线程时顺序序号),且创建的线程都是非守护进程。

handler 拒绝策略:

  • handler :表示当 workQueue 已满,且池中的线程数达到 maximumPoolSize 时,线程池拒绝添加新任务时采取的策略。(可以不指定)

  • | 策略 | BB |
    | ———————————————————— | —————————————————————————————— |
    | ThreadPoolExecutor.AbortPolicy() | 来拒绝新任务的处理。抛出RejectedExecutionException异常。默认策略 |
    | ThreadPoolExecutor.CallerRunsPolicy() | 由向线程池提交任务的线程来执行该任务,当最⼤池被填满时,此策略为我们提供可伸缩队列。 |
    | ThreadPoolExecutor.DiscardPolicy() | 不处理新任务,直接丢弃掉。 |
    | ThreadPoolExecutor.DiscardOldestPolicy() | 此策略将丢弃最早的队列中那个未处理的任务请求 |

    最科学的的还是 AbortPolicy 提供的处理方式:抛出异常,由开发人员进行处理。

Atomic 原⼦类

Atomic 翻译成中⽂是原⼦的意思。在化学上,我们知道原⼦是构成⼀般物质的最⼩单位,在化学反应中是不可分割的。在我们这⾥ Atomic 是指⼀个操作是不可中断的。即使是在多个线程⼀起执⾏的时候,⼀个操作⼀旦开始,就不会被其他线程⼲扰。

并发包 java.util.concurrent 的原⼦类都存放在 java.util.concurrent.atomic中

JUC 包中的原⼦类(4类)

原子基本类型

使⽤原⼦的⽅式更新基本类型

  • AtomicInteger :整形原⼦类
  • AtomicLong :⻓整型原⼦类
  • AtomicBoolean :布尔型原⼦类
数组类型

使⽤原⼦的⽅式更新数组⾥的某个元素

  • AtomicIntegerArray :整形数组原⼦类
  • AtomicLongArray :⻓整形数组原⼦类
  • AtomicReferenceArray :引⽤类型数组原⼦类
引⽤类型
  • AtomicReference :引⽤类型原⼦类
  • AtomicStampedReference :原⼦更新带有版本号的引⽤类型。该类将整数值与引⽤关联起来,可⽤于解决原⼦的更新数据和数据的版本号,可以解决使⽤ CAS 进⾏原⼦更新时可能出现的 ABA 问题。
  • AtomicMarkableReference :原⼦更新带有标记位的引⽤类型
对象的属性修改类型
  • AtomicIntegerFieldUpdater :原⼦更新整形字段的更新器
  • AtomicLongFieldUpdater :原⼦更新⻓整形字段的更新器
  • AtomicReferenceFieldUpdater :原⼦更新引⽤类型字段的更新器

AQS

关于AQS 具体看看 我的 AQS 与ReentrantLock这篇博客

​ AQS 的全称为( AbstractQueuedSynchronizer ),这个类在 java.util.concurrent.locks 包下⾯。

AQS ( Abstract Queued Synchronizer )是一个抽象的队列同步器,通过维护一个共享资源状态( Volatile Int State )和一个先进先出( FIFO )的线程等待队列来实现一个多线程访问共享资源的同步框架。

在这里插入图片描述

状态信息通过 protected 类型的 getState , setState , compareAndSetState 进行操作

AQS 定义两种资源共享方式

1)Exclusive(独占)

只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁,ReentrantLock 同时支持两种锁,下面以 ReentrantLock 对这两种锁的定义做介绍:

  1. 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁

  2. 非公平锁:当线程要获取锁时,先通过两次 CAS 操作去抢锁,如果没抢到,当前线程再加入到队列中等待唤醒。

了解一下 ReentrantLock 中相关知识:

ReentrantLock 实现了 Lock 接口,Lock 接口中定义了 lock 与 unlock 相关操作,并且还存在newCondition 方法,表示生成一个条件。

ReentrantLock 默认采用非公平锁,因为考虑获得更好的性能,通过 boolean 来决定是否用公平锁(传入 true 用公平锁)。

/** Synchronizer providing all implementation mechanics */
private final Sync sync;
public ReentrantLock() {
    // 默认非公平锁
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

2) Share(共享):多个线程可同时执⾏,如

CountDownLatch 、 Semaphore 、 CountDownLatch 、 CyclicBarrier 、 ReadWriteLock 我们都会在后⾯讲到

ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时对某⼀资源进⾏读

不同的⾃定义同步器争⽤共享资源的⽅式也不同。⾃定义同步器在实现时只需要实现共享资源state 的获取与释放⽅式即可,⾄于具体线程等待队列的维护(如获取资源失败⼊队/唤醒出队等),AQS 已经在顶层实现好了。

AQS 底层使⽤了模板⽅法模式

同步器的设计是基于模板⽅法模式的,如果需要⾃定义同步器⼀般的⽅式是这样(模板⽅法模式很经典的⼀个应⽤)

  1. 使⽤者继承 AbstractQueuedSynchronizer 并重写指定的⽅法。(这些重写⽅法很简单,⽆⾮是对于共享资源 state 的获取和释放)

  2. 将 AQS 组合在⾃定义同步组件的实现中,并调⽤其模板⽅法,⽽这些模板⽅法会调⽤使⽤者重写的⽅法。

AQS 使⽤了模板⽅法模式,⾃定义同步器时需要重写下⾯⼏个 AQS 提供的模板⽅法:

isHeldExclusively()//该线程是否正在独占资源。只有⽤到condition才需要去实现它。
tryAcquire(int)//独占⽅式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占⽅式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享⽅式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可⽤资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享⽅式。尝试释放资源,成功则返回true,失败则返回false。

默认情况下,每个⽅法都抛出 UnsupportedOperationException 。 这些⽅法的实现必须是内部线程安全的,并且通常应该简短⽽不是阻塞。AQS 类中的其他⽅法都是 final ,所以⽆法被其他类使⽤,只有这⼏个⽅法可以被其他类使⽤

以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock()时,会调⽤tryAcquire()独占该锁并将 state+1。此后,其他线程再 tryAcquire()时就会失败,直到 A 线程unlock()到 state=0(即释放锁)为⽌,其它线程才有机会获取该锁。当然,释放锁之前,A 线程⾃⼰是可以重复获取此锁的(state 会累加),这就是可重⼊的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state是能回到零态的。

再以 CountDownLatch 以例,任务分为 N 个⼦线程去执⾏,state 也初始化为 N(注意 N 要与线程个数⼀致)。这 N 个⼦线程是并⾏执⾏的,每个⼦线程执⾏完后 countDown() ⼀次,state 会CAS(Compare and Swap)减 1。等到所有⼦线程都执⾏完后(即 state=0),会 unpark()主调⽤线程,然后主调⽤线程就会从 await() 函数返回,继续后余动作。

⼀般来说,⾃定义同步器要么是独占⽅法,要么是共享⽅式,他们也只需实现 tryAcquire tryRelease 、 tryAcquireShared-tryReleaseShared 中的⼀种即可。但 AQS 也⽀持⾃定义同步器同时实现独占和共享两种⽅式,如 ReentrantReadWriteLock 。

推荐两篇 AQS 原理和相关源码分析的⽂章:

http://www.cnblogs.com/waterystone/p/4920797.html

https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html