原创

Java并发编程(二)-线程池的介绍

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/lyhkmm/article/details/83586491

什么是线程池?为什么要有线程池?

       线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。
       如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
  那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?
  在Java中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起

ThreadPoolExecutor类

       JDK1.8中的java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的部分实现源码。

public class ThreadPoolExecutor extends AbstractExecutorService {
	public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
	public ThreadPoolExecutor(int corePoolSize,
	                          int maximumPoolSize,
	                          long keepAliveTime,
	                          TimeUnit unit,
	                          BlockingQueue<Runnable> workQueue,
	                          RejectedExecutionHandler handler) {
	    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
	         Executors.defaultThreadFactory(), handler);
	}
	public ThreadPoolExecutor(int corePoolSize,
	                          int maximumPoolSize,
	                          long keepAliveTime,
	                          TimeUnit unit,
	                          BlockingQueue<Runnable> workQueue,
	                          ThreadFactory threadFactory) {
	    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
	         threadFactory, defaultHandler);
	}
	public ThreadPoolExecutor(int corePoolSize,
	                          int maximumPoolSize,
	                          long keepAliveTime,
	                          TimeUnit unit,
	                          BlockingQueue<Runnable> workQueue,
	                          ThreadFactory threadFactory,
	                          RejectedExecutionHandler handler) {
	    if (corePoolSize < 0 ||
	        maximumPoolSize <= 0 ||
	        maximumPoolSize < corePoolSize ||
	        keepAliveTime < 0)
	        throw new IllegalArgumentException();
	    if (workQueue == null || threadFactory == null || handler == null)
	        throw new NullPointerException();
	    this.acc = System.getSecurityManager() == null ?
	            null :
	            AccessController.getContext();
	    this.corePoolSize = corePoolSize;
	    this.maximumPoolSize = maximumPoolSize;
	    this.workQueue = workQueue;
	    this.keepAliveTime = unit.toNanos(keepAliveTime);
	    this.threadFactory = threadFactory;
	    this.handler = handler;
	    }
}

       从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作,以下介绍构成方法参数的意思。

    corePoolSize

      核心池的大小,如果池中的实际线程数小于corePoolSize,无论是否其中有空闲的线程,都会给新的任务产生新的线程
      如果池中的线程数>corePoolSize and <maximumPoolSize,而又有空闲线程,就给新任务使用空闲线程,如没有空闲线程,则产生新线程
    如果池中的线程数=maximumPoolSize,则有空闲线程使用空闲线程,否则新任务放入workQueue。

    maximumPoolSize

      线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

    keepAliveTime

      表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

    unit

       参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒

    workQueue

       一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响, BlockingQueue是个接口,有如下实现类:
       ArrayBlockQueue:一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。创建其对象必须明确大小,像数组一样。
       LinkedBlockQueue:一个可改变大小的阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。创建其对象如果没有明确大小,默认值是Integer.MAX_VALUE。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
       PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。
       SynchronousQueue:同步队列。同步队列没有任何容量,每个插入必须等待另一个线程移除,反之亦然。

threadFactory

       线程工厂,主要用来创建线程;

handler

       表示当拒绝处理任务时的策略,有以下四种取值:
       ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
       ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
       ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
       ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系

       Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
  然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
  接着抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
       最后ThreadPoolExecutor继承了类AbstractExecutorService。

ThreadPoolExecutor类中有几个非常重要的方法

        execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
        submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
        shutdown()当线程池调用该方法时,线程池的状态则立刻变成SHUTDOWN状态。此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
        shutdownNow()当线程池调用该方法时,线程池的状态立刻变成STOP状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。
它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

Executors类的静态方法

       Java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池(PS:不过阿里巴巴的代码规范插件推荐使用ThreadPoolExecutor):

   newCachedThreadPool

       创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。这种类型的线程池特点是:
       工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
       如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
       在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。

   newFixedThreadPool

       创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。
       FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。

   newSingleThreadExecutor

       创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。

   newScheduleThreadPool

       创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行。

   newWorkStealingPool

       创建一个带并行级别的线程池,并行级别决定了同一时刻最多有多少个线程在执行,如不传并行级别参数,将默认为当前系统的CPU个数。

线程池的使用

       通过Executors.newFixedThreadPool(10)静态方法创建一个10个工作线程数量的线程池。
       通过new ThreadPoolExecutor()创建,具体参数的意思上文有详细介绍。如果线程池不再使用时,最好使用shutdown()方法关闭,代码如下。

public class TestThreadPool {
    public static void main(String str[]){
        testExecutors();
        testThreadPool();
    }

    /** * 使用Executors静态方法创建线程池 */
    public static void testExecutors(){
        //推荐使用newFixedThreadPool线程池
        ExecutorService executorService=Executors.newFixedThreadPool(10);
        //往线程池添加10个线程
        for(int i=1;i<=10;i++){
            executorService.execute(new RunnableDemo("Executors:"+i));
        }
        executorService.shutdown();
    }

    /** * 使用ThreadPoolExecutor创建线程池 */
    public static void testThreadPool(){
        ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(
                5, 10, 60, TimeUnit.SECONDS, new SynchronousQueue<>(),new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                //如果线程池添加线程失败,则暂停1s后再同步线程
                try {
                    System.out.println("线程添加失败");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        for(int i=1;i<=10;i++){
            threadPoolExecutor.execute(new RunnableDemo("ThreadPool:"+i));
        }
        threadPoolExecutor.shutdown();
    }
}
正文到此结束
Loading...