线程池类型
Executors.newSingleThreadExecutor
:一个单线程的线程池,可以用于需要保证顺序执行的场景,并且只有一个线程在执行。Executors.newFixedThreadPool(int nThreads)
:一个固定大小的线程池,可以用于已知并发压力的情况下,对线程数做限制。Executors.newCachedThreadPool
:一个可以无限扩大的线程池,比较适合处理执行时间比较小的任务。Executors.newScheduledThreadPool(int corePoolSize)
:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。Executors.newWorkStealingPool
:一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行
线程池的处理流程
- 线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。
- 线程池的运行主要分成任务管理和线程管理两部分。
- 任务管理:充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:
(1)直接申请线程执行该任务;
(2)缓冲到队列中等待线程执行;
(3)拒绝该任务。 - 线程管理:是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。
ThreadPoolExecutor
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
其实看这几种方式创建的源码就会发现,实际上还是利用 ThreadPoolExecutor
类实现的。不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池。
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
}
我们重点来看下 ThreadPoolExecutor
。
* @param corePoolSize 核心线程池中最大线程数
* @param maximumPoolSize 线程池中的最大线程数
* @param keepAliveTime 空闲线程的存活时间
* @param unit 存活时间的单位
* @param workQueue 任务队列,保存已经提交但是还没有执行的任务
* @param threadFactory 线程工厂
* @param handler 拒绝策略
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
这几个核心参数的作用:
-
corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
-
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程
-
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了
allowCoreThreadTimeOut(true)
方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0 -
unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
-
workQueue :用来保存等待被执行的任务的阻塞队列
任务必须实现Runable接口,一共4有个阻塞队列:- ArrayBlockingQueue:基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQuene:基于链表结构的无界阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
- SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
- priorityBlockingQuene:具有优先级的无界阻塞队列;
-
threadFactory :创建线程的工厂
可以选择DefaultThreadFactory,将创建一个同线程组且默认优先级的线程,也可以选择PrivilegedThreadFactory,使用访问权限创建一个权限控制的线程。但默认采用DefaultThreadFactory,当然也可以创建自定义线程工厂给每个新建的线程设置一个具有识别度的线程名。 -
handler:线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略
- AbortPolicy:直接抛出RejectedExecutionException异常,默认策略;
- CallerRunsPolicy:如果发现线程池还在运行,就直接运行这个线程,即用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
线程池的默认拒绝策略为AbortPolicy
然后看看 execute()
方法是如何处理的:
- 获取当前线程池的状态。
- 当前线程数量小于 coreSize 时创建一个新的线程运行。
- 如果当前线程处于运行状态,并且写入阻塞队列成功。
- 双重检查,再次获取线程状态;如果线程状态变了(非运行状态)就需要从阻塞队列移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
- 如果当前线程池为空就新创建一个线程并执行。
- 如果在第三步的判断为非运行状态,尝试新建线程,如果失败则执行拒绝策略。
线程池生命周期
RUNNING
自然是运行状态,指可以接受任务执行队列里的任务SHUTDOWN
指调用了shutdown()
方法,不再接受新任务了,但是队列里的任务得执行完毕。STOP
指调用了shutdownNow()
方法,不再接受新任务,同时抛弃阻塞队列里的所有任务并中断所有正在执行任务。TIDYING
所有任务都执行完毕,在调用shutdown()/shutdownNow()
中都会尝试更新为这个状态。TERMINATED
终止状态,当执行terminated()
后会更新为这个状态。
任务执行机制
任务调度
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
任务缓冲
线程池中是以生产者消费者模式,通过一个阻塞队列来实现的,阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。上面也提到了阻塞队列类型,这里再全面的总结一下
任务申请
由上文的任务分配部分可知,任务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。
任务拒绝
任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。拒绝策略是一个接口,其设计如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
用户可以通过实现这个接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略。再次总结一下现有的拒绝策略。
优雅的关闭线程池
有运行任务自然也有关闭任务,从上文提到的 5 个状态就能看出如何来关闭线程池。
其实无非就是两个方法 shutdown()/shutdownNow()
。
但他们有着重要的区别:
shutdown()
执行后停止接受新任务,会把队列的任务执行完毕。shutdownNow()
也是停止接受新任务,但会中断所有的任务,将线程池状态变为 stop。
两个方法都会中断线程,用户可自行判断是否需要响应中断。
shutdownNow()
要更简单粗暴,可以根据实际场景选择不同的方法。
我通常是按照以下方式关闭线程池的:
long start = System.currentTimeMillis();
for (int i = 0; i <= 5; i++) {
pool.execute(new Job());
}
pool.shutdown();
while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
LOGGER.info("线程还在执行。。。");
}
long end = System.currentTimeMillis();
LOGGER.info("一共处理了【{}】", (end - start));
pool.awaitTermination(1, TimeUnit.SECONDS)
会每隔一秒钟检查一次是否执行完毕(状态为 TERMINATED
),当从 while 循环退出时就表明线程池已经完全终止了。
线程之间的协作
当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。
join()
在线程中调用另一个线程的 join() 方法,会将当前线程挂起,而不是忙等待,直到目标线程结束。
wait()、notify()、notifyAll()
调用 wait() 使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify() 随机唤醒或者 notifyAll() 来唤醒所有挂起的线程。
它们都属于 Object 的一部分,而不属于 Thread。
只能用在同步方法或者同步控制块中使用,否则会在运行时抛出 IllegalMonitorStateException。
使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。
public class MultiThreadPrint {
private static final Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
System.out.println("Main start " + LocalTime.now());
Thread threadOne = new Thread(new MyThread());
threadOne.start();
Thread threadTwo = new Thread(new MyThread());
threadTwo.start();
System.out.println("Main sleep 5s " + LocalTime.now());
Thread.sleep(5000);
System.out.println("Main try to get lock... " + LocalTime.now());
synchronized (lock) {
System.out.println("Main get lock and notifyAll " + LocalTime.now());
lock.notifyAll();
}
threadOne.join();
threadTwo.join();
System.out.println("Main end");
}
public static class MyThread implements Runnable {
@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " run and try to get lock... " + LocalTime.now());
synchronized (lock) {
try {
System.out.println(threadName + " get lock and sleep 2s... " + LocalTime.now());
Thread.sleep(2000);
System.out.println(threadName + " wait " + LocalTime.now());
lock.wait();
System.out.println(threadName + " get lock again and sleep 2s " + LocalTime.now());
Thread.sleep(2000);
} catch (Exception e) {
//
}
}
System.out.println(threadName + " end " + LocalTime.now());
}
}
}
生产者和消费者都有两层while,外层的while是用来判断是否运行生产者和消费者。内存的while用来判断队列queue是否已满或者为空,如果满足条件,则使得当前线程变成等待状态
当线程被wait之后,会释放对象锁。当等待的线程被notify之后,必须再次尝试去获取对象锁,如果没有获取到对象锁,那还必须等待,直到拿到对象锁之后才能向后执行。
await()、signal()、signalAll()
java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用 await() 方法使线程等待,其它线程调用 signal() 或 signalAll() 方法唤醒等待的线程。
/**
* 多线程打印
*/
public class MultiThreadPrint {
private static final Lock lock = new ReentrantLock();
private static final Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
System.out.println("Main start " + LocalTime.now());
Thread threadOne = new Thread(new MyThread());
threadOne.start();
Thread threadTwo = new Thread(new MyThread());
threadTwo.start();
System.out.println("Main sleep 5s " + LocalTime.now());
Thread.sleep(5000);
System.out.println("Main try to get lock... " + LocalTime.now());
lock.lock();
System.out.println("Main get lock and signalAll " + LocalTime.now());
condition.signalAll();
lock.unlock();
threadOne.join();
threadTwo.join();
System.out.println("Main end");
}
static class MyThread implements Runnable {
@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " run and try to get lock... " + LocalTime.now());
lock.lock();
try {
System.out.println(threadName + " get lock and sleep 2s... " + LocalTime.now());
Thread.sleep(2000);
System.out.println(threadName + " wait " + LocalTime.now());
condition.await();
System.out.println(threadName + " get lock again and sleep 2s " + LocalTime.now());
Thread.sleep(2000);
System.out.println(threadName + " end " + LocalTime.now());
} catch (Exception e) {
//
} finally {
lock.unlock();
}
}
}
}
和 wait 一样,await 在进入等待队列后会释放锁和 cpu
,当被其他线程唤醒或者超时或中断后都需要重新获取锁,获取锁后才会从 await 方法中退出,await 同样和 wait 一样存在等待返回不代表条件成立的问题,所以也需要主动循环条件判断。
Condition更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。
例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,“读线程"需要等待。如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程”,而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。 但是,通过Condition,就能明确的指定唤醒读线程。
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[5];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock(); //获取锁
try {
// 如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
while (count == items.length)
notFull.await();
// 将x添加到缓冲中
items[putptr] = x;
// 将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。
if (++putptr == items.length) putptr = 0;
// 将“缓冲”数量+1
++count;
// 唤醒take线程,因为take线程通过notEmpty.await()等待
notEmpty.signal();
// 打印写入的数据
System.out.println(Thread.currentThread().getName() + " put "+ (Integer)x);
} finally {
lock.unlock(); // 释放锁
}
}
public Object take() throws InterruptedException {
lock.lock(); //获取锁
try {
// 如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
while (count == 0)
notEmpty.await();
// 将x从缓冲中取出
Object x = items[takeptr];
// 将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。
if (++takeptr == items.length) takeptr = 0;
// 将“缓冲”数量-1
--count;
// 唤醒put线程,因为put线程通过notFull.await()等待
notFull.signal();
// 打印取出的数据
System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
return x;
} finally {
lock.unlock(); // 释放锁
}
}
}
public class ConditionTest2 {
private static BoundedBuffer bb = new BoundedBuffer();
public static void main(String[] args) {
// 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
// 启动10个“读线程”,从BoundedBuffer中不断的读数据。
for (int i=0; i<10; i++) {
new PutThread("p"+i, i).start();
new TakeThread("t"+i).start();
}
}
static class PutThread extends Thread {
private int num;
public PutThread(String name, int num) {
super(name);
this.num = num;
}
public void run() {
try {
Thread.sleep(1); // 线程休眠1ms
bb.put(num); // 向BoundedBuffer中写入数据
} catch (InterruptedException e) {
}
}
}
static class TakeThread extends Thread {
public TakeThread(String name) {
super(name);
}
public void run() {
try {
Thread.sleep(10); // 线程休眠1ms
Integer num = (Integer)bb.take(); // 从BoundedBuffer中取出数据
} catch (InterruptedException e) {
}
}
}
}
CountDownLatch
CountDownLatch 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown() 方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当调用 await() 方法的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 方法就会一直阻塞,也就是说 await() 方法之后的语句不会被执行。然后,CountDownLatch 会自旋 CAS 判断 state == 0,如果 state == 0 的话,就会释放所有等待的线程,await() 方法之后的语句得到执行。
CountDownLatch 的 await() 方法使用不当很容易产生死锁,如果无法保证最后count为0的话。
@Slf4j
public class CountDownLacth {
private static int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(()->{
try {
test(threadNum);
} catch (InterruptedException e) {
log.info("{}", e);
}finally {
countDownLatch.countDown();//每一次完成之后都要减一
}
});
}
//countDownLatch.await(10,TimeUnit.MILLISECONDS); //传入参数等待10ms之后没有完成也不再等了
countDownLatch.await(); //调用await,要等待所有之前的线程完成后完成
log.info("finish");
exec.shutdown();//关闭线程池
}
private static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}
CountDownLatch 的两种典型用法
- 某一线程在开始运行前等待 n 个线程执行完毕。将 CountDownLatch 的计数器初始化为 n :new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减 1 countdownlatch.countDown(),当计数器的值变为 0 时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
- 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 :new CountDownLatch(1),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为 0,多个线程同时被唤醒。
CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。
Semaphore
@Slf4j
public class SemaphoreExample1 {
private static int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(10); //当前有多少个许可
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(()->{
try {
semaphore.acquire(); //获取许可。可以选择参数,一次性拿走多少个许可
test(threadNum);
semaphore.release(); //释放许可
} catch (InterruptedException e) {
log.info("{}", e);
}
});
}
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}
除了 acquire方法之外,另一个比较常用的与之对应的方法是tryAcquire方法,该方法如果获取不到许可就立即返回 false。
Semaphore 有两种模式,公平模式和非公平模式。
- 公平模式: 调用 acquire 的顺序就是获取许可证的顺序,遵循 FIFO;
- 非公平模式: 抢占式的。
CyclicBarrier(循环栅栏)
CountDownLatch 的实现是基于 AQS 的,而 CycliBarrier 是基于 ReentrantLock(ReentrantLock 也属于 AQS 同步器)和 Condition 的。
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。来看一下它的构造函数:
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
其中,parties 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。
CyclicBarrier 的应用场景
CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。
@Slf4j
public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5);//设定计数器为5
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
exec.execute(()->{
try {
rece(threadNum);
} catch (Exception e) {
log.error("except", e);
}
});
}
exec.shutdown();
}
private static void rece(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await(); //加计数器,可以加入参数,表示等待时间
log.info("{} is continue",threadNum);
}
}
CyclicBarrier 和 CountDownLatch 的区别
对于 CountDownLatch 来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。
CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。
线程核心参数理论设置
-
corePoolSize = 每秒需要多少个线程处理?
- threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50
-8020原则,即80%情况下系统每秒任务数,若系统80%的情况下任务数小于200,最多时为1000,则corePoolSize可设置为20
- threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50
-
queueCapacity = (coreSizePool/taskcost)*responsetime
- 计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
- 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
-
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
- 计算可得 maxPoolSize = (1000-80)/10 = 92
- (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数