Skip to content

线程池

Posted on:April 8, 2021 at 19:20:13 GMT+8

SDK 30

线程池可以缓存一定数量的线程。重用线程池中的线程,可以避免创建和销毁线程的开销。

能有效控制最大线程数。

能对线程进行简单的管理,如定时执行或指定间隔循环执行。

Java 线程池接口是 Executor,实现是 ThreadPoolExecutor。

ThreadPoolExecutor 源码注释

An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.

Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks. Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.

To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient Executors factory methods Executors.newCachedThreadPool() (unbounded thread pool, with automatic thread reclamation), Executors.newFixedThreadPool(int) (fixed size thread pool) and Executors.newSingleThreadExecutor() (single background thread), that preconfigure settings for the most common usage scenarios. Otherwise, use the following guide when manually configuring and tuning this class:

Extension example. Most extensions of this class override one or more of the protected hook methods. For example, here is a subclass that adds a simple pause/resume feature:

 class PausableThreadPoolExecutor extends ThreadPoolExecutor {
   private boolean isPaused;
   private ReentrantLock pauseLock = new ReentrantLock();
   private Condition unpaused = pauseLock.newCondition();

   public PausableThreadPoolExecutor(...) { super(...); }

   protected void beforeExecute(Thread t, Runnable r) {
     super.beforeExecute(t, r);
     pauseLock.lock();
     try {
       while (isPaused) unpaused.await();
     } catch (InterruptedException ie) {
       t.interrupt();
     } finally {
       pauseLock.unlock();
     }
   }

   public void pause() {
     pauseLock.lock();
     try {
       isPaused = true;
     } finally {
       pauseLock.unlock();
     }
   }

   public void resume() {
     pauseLock.lock();
     try {
       isPaused = false;
       unpaused.signalAll();
     } finally {
       pauseLock.unlock();
     }
   }
 }

构造方法

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {

7 个参数

corePoolSize:核心线程数。

maximumPoolSize:线程池允许创建的最大线程数。

keepAliveTime:非核心线程闲置的超时时间。

unit:时间单位。

workQueue:任务队列。

threadFactory:线程工厂。

handler:饱和策略。

线程池的处理流程

当提交任务时,如果线程数未超过 corePoolSize,将创建新线程执行任务。否则如果 workQueue 未满,将把任务入队。否则如果线程数量小于 maximumPoolSize,创建新线程执行任务。否则执行饱和策略。

ThreadPoolExecutor#execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

线程池的好处

3 点。

  1. 减少因创建和销毁线程带来的资源消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能直接执行。
  3. 可以对线程进行简单、统一的管理。

线程池中线程的创建

销毁

拒绝策略

ThreadPoolExecutor 中有四个拒绝策略。

AbortPolicy

CallerRunsPolicy

DiscardPolicy

DiscardOldestPolicy

线程池的种类

通过设置不同的 ThreadPoolExecutor 参数可以构建不同种类的线程池,Executors 类中提供了创建各种线程池的方法。比较常用的 4 种:

FixedThreadPool

可重用固定线程数的线程池。

核心线程数是 nThreads,最大线程数是 nThreads。只有核心线程,不会创建非核心线程。任务队列无界。

LinkedBlockingQueue FIFO。

某个线程发生了未预期的 Exception 而结束,将补充一个新的线程。

能快速响应外界的请求。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

CachedThreadPool

根据需要创建线程。

核心线程数为 0。非核心线程数不限。超时 60 秒。SynchronousQueue 是不存储元素的阻塞队列,每一个插入操作必须等待一个移除操作,一个移除操作必须等待一个插入操作。

适合大量需要立即处理且耗时较少的线程。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

SingleThreadPool

核心线程和最大线程数都是 1。

确保所有任务按照顺序执行。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

ScheduledThreadPool

能实现定时和周期性处理任务的线程池。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

任务队列

LinkedBlockingQueue

基于链表。FIFO。可以不指定 capacity 的大小,则是 Integer.MAX_VALUE。基于链表的队列通常比基于数组的队列有更高的吞吐量,但更不可预测的性能,在大多数并发的应用中。当插入时,会动态创建节点。

ArrayBlockingQueue

基于数组。FIFO。创建时需指定 capacity,创建后不可改变。插入时,如果数组已满,将被阻塞。从队列中获取时,如果是空的,也会阻塞。构造函数中可以传入 boolean fair = true,表示多个线程访问时确保 FIFO,如果没设置,顺序是不能保证的。

SynchronousQueue

不存储元素。无界。插入操作必须等待移除操作,移除操作必须等待插入操作。fair 参数支持。

线程池 异常处理

Java线程池「异常处理」正确姿势:有病就得治 - 掘金

Java线程池异常处理方案 - 简书

深度解析Java线程池的异常处理机制 · Issue #3 · aCoder2013/blog

线程池异常处理详解,一文搞懂! - 小推爱学习 - 博客园

终结线程池

  1. 要向执行程序表明想要完成它,可以使用ThreadPoolExecutor类的shutdown()方法。当执行程序完成所有挂起任务的执行时,它就完成了执行。在您调用shutdown()方法之后,如果您尝试将另一个任务发送给执行程序,它将被拒绝,执行程序将抛出RejectedExecutionException异常。
  2. ThreadPoolExecutor类提供了许多方法来获取关于其状态的信息。在示例中,我们使用getPoolSize()getActiveCount()getCompletedTaskCount()方法来获取关于池大小、线程数和执行程序完成的任务数的信息。您还可以使用getLargestPoolSize()方法,该方法每次返回池中已经存在的最大线程数。
  3. ThreadPoolExecutor类还提供了与执行器的终结相关的其他方法。这些方法有:

this 逃逸

AsyncTask

源码注释

AsyncTask was intended to enable proper and easy use of the UI thread. However, the most common use case was for integrating into UI, and that would cause Context leaks, missed callbacks, or crashes on configuration changes. It also has inconsistent behavior on different versions of the platform, swallows exceptions from doInBackground, and does not provide much utility over using Executors directly. AsyncTask is designed to be a helper class around Thread and Handler and does not constitute a generic threading framework. AsyncTasks should ideally be used for short operations (a few seconds at the most.) If you need to keep threads running for long periods of time, it is highly recommended you use the various APIs provided by the java.util.concurrent package such as Executor, ThreadPoolExecutor and FutureTask. An asynchronous task is defined by a computation that runs on a background thread and whose result is published on the UI thread. An asynchronous task is defined by 3 generic types, called Params, Progress and Result, and 4 steps, called onPreExecute, doInBackground, onProgressUpdate and onPostExecute. Developer Guides For more information about using tasks and threads, read the Processes and Threads developer guide. Usage AsyncTask must be subclassed to be used. The subclass will override at least one method (doInBackground), and most often will override a second one (onPostExecute.) Here is an example of subclassing:

   private class DownloadFilesTask extends AsyncTask<URL, Integer, Long> {
       protected Long doInBackground(URL... urls) {
           int count = urls.length;
           long totalSize = 0;
           for (int i = 0; i < count; i++) {
               totalSize += Downloader.downloadFile(urls[i]);
               publishProgress((int) ((i / (float) count) * 100));
               // Escape early if cancel() is called
               if (isCancelled()) break;
           }
           return totalSize;
       }

       protected void onProgressUpdate(Integer... progress) {
           setProgressPercent(progress[0]);
       }

       protected void onPostExecute(Long result) {
           showDialog("Downloaded " + result + " bytes");
       }
   }

Once created, a task is executed very simply:

   new DownloadFilesTask().execute(url1, url2, url3);

AsyncTask’s generic types The three types used by an asynchronous task are the following: Params, the type of the parameters sent to the task upon execution. Progress, the type of the progress units published during the background computation. Result, the type of the result of the background computation. Not all types are always used by an asynchronous task. To mark a type as unused, simply use the type Void:

   private class MyTask extends AsyncTask<Void, Void, Void> { ... }

The 4 steps When an asynchronous task is executed, the task goes through 4 steps: onPreExecute(), invoked on the UI thread before the task is executed. This step is normally used to setup the task, for instance by showing a progress bar in the user interface. doInBackground, invoked on the background thread immediately after onPreExecute() finishes executing. This step is used to perform background computation that can take a long time. The parameters of the asynchronous task are passed to this step. The result of the computation must be returned by this step and will be passed back to the last step. This step can also use publishProgress to publish one or more units of progress. These values are published on the UI thread, in the onProgressUpdate step. onProgressUpdate, invoked on the UI thread after a call to publishProgress. The timing of the execution is undefined. This method is used to display any form of progress in the user interface while the background computation is still executing. For instance, it can be used to animate a progress bar or show logs in a text field. onPostExecute, invoked on the UI thread after the background computation finishes. The result of the background computation is passed to this step as a parameter. Cancelling a task A task can be cancelled at any time by invoking cancel(boolean). Invoking this method will cause subsequent calls to isCancelled() to return true. After invoking this method, onCancelled(Object), instead of onPostExecute(Object) will be invoked after doInBackground(Object[]) returns. To ensure that a task is cancelled as quickly as possible, you should always check the return value of isCancelled() periodically from doInBackground(Object[]), if possible (inside a loop for instance.) Threading rules There are a few threading rules that must be followed for this class to work properly: The AsyncTask class must be loaded on the UI thread. This is done automatically as of Build.VERSION_CODES.JELLY_BEAN. The task instance must be created on the UI thread. execute must be invoked on the UI thread. Do not call onPreExecute(), onPostExecute, doInBackground, onProgressUpdate manually. The task can be executed only once (an exception will be thrown if a second execution is attempted.) Memory observability AsyncTask guarantees that all callback calls are synchronized to ensure the following without explicit synchronizations. The memory effects of onPreExecute, and anything else executed before the call to execute, including the construction of the AsyncTask object, are visible to doInBackground. The memory effects of doInBackground are visible to onPostExecute. Any memory effects of doInBackground preceding a call to publishProgress are visible to the corresponding onProgressUpdate call. (But doInBackground continues to run, and care needs to be taken that later updates in doInBackground do not interfere with an in-progress onProgressUpdate call.) Any memory effects preceding a call to cancel are visible after a call to isCancelled that returns true as a result, or during and after a resulting call to onCancelled. Order of execution When first introduced, AsyncTasks were executed serially on a single background thread. Starting with Build.VERSION_CODES.DONUT, this was changed to a pool of threads allowing multiple tasks to operate in parallel. Starting with Build.VERSION_CODES.HONEYCOMB, tasks are executed on a single thread to avoid common application errors caused by parallel execution. If you truly want parallel execution, you can invoke executeOnExecutor(Executor, Object[]) with THREAD_POOL_EXECUTOR. Deprecated Use the standard java.util.concurrent or Kotlin concurrency utilities instead.

源码分析

AsyncTask#execute:

@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
    return executeOnExecutor(sDefaultExecutor, params);
}

executeOnExecutor

sDefaultExecutor 是 AsyncTask 内部的 SerialExecutor 类的一个实例,是一个串行的线程池。

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

如果队列中还有 task 就,继续 execute。SerialExecutor 用于 task 的排队,THREAD_POOL_EXECUTOR 用于 task 的真正执行。

@Deprecated
public static final Executor THREAD_POOL_EXECUTOR;

static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(), sThreadFactory);
    threadPoolExecutor.setRejectedExecutionHandler(sRunOnSerialPolicy);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}

核心线程数 1,最大线程 20,超时时间 3s,阻塞队列是 SynchronousQueue。

还有一个内部类 InternalHandler 用于将线程池切换到主线程。

先调用 onPreEexcute,再调用 exec.execute,将一个 FutureTask 传进去。

AsyncTask 的构造方法中有个 mWorker,mWorker 的 call 最终将在线程池中执行:

mWorker = new WorkerRunnable<Params, Result>() {
    public Result call() throws Exception {
        mTaskInvoked.set(true);
        Result result = null;
        try {
            Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
            //noinspection unchecked
            result = doInBackground(mParams);
            Binder.flushPendingCommands();
        } catch (Throwable tr) {
            mCancelled.set(true);
            throw tr;
        } finally {
            postResult(result);
        }
        return result;
    }
};

这里调用了 doInBackground 方法,得到 result,再传递给 postResult 方法:

private Result postResult(Result result) {
    @SuppressWarnings("unchecked")
    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
            new AsyncTaskResult<Result>(this, result));
    message.sendToTarget();
    return result;
}

通过 mHandler,获得了一个 MESSAGE_POST_RESULT 的 msg:

mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
    ? getMainHandler()
    : new Handler(callbackLooper);

getMainHandler:

private static Handler getMainHandler() {
    synchronized (AsyncTask.class) {
        if (sHandler == null) {
            sHandler = new InternalHandler(Looper.getMainLooper());
        }
        return sHandler;
    }
}
private static class InternalHandler extends Handler {
    public InternalHandler(Looper looper) {
        super(looper);
    }

    @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
    @Override
    public void handleMessage(Message msg) {
        AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
        switch (msg.what) {
            case MESSAGE_POST_RESULT:
                // There is only one result
                result.mTask.finish(result.mData[0]);
                break;
            case MESSAGE_POST_PROGRESS:
                result.mTask.onProgressUpdate(result.mData);
                break;
        }
    }
}

sHandler 是一个静态对象,为了能将执行环境切换到主线程,就要在主线程创建。静态变量在类加载的时候进行初始化,因此要求 AsyncTask 要在主线程加载。

finish

private void finish(Result result) {
    if (isCancelled()) {
        onCancelled(result);
    } else {
        onPostExecute(result);
    }
    mStatus = Status.FINISHED;
}

没取消就调用 onPostExecute。

Message#sendToTarget

public void sendToTarget() {
    target.sendMessage(this);
}

HandlerThread

该类继承自 Thread,内部创建了一个使用使用了 Handler 机制,start() 之后开启了 Looper 的死循环处理消息,Looper 可以退出,Message 消息在该线程中处理。可以利用其执行有序的任务!

IntentService

继承自 Service 拥有 Serivce 的特性,同时内部使用率 HandlerThread,当 onStart() 函数触发时,向 HandlerThread 中的 MessageQueue 中添加一条 Message ,自建的 Handler 处理完毕之后立即调用 stopSelf() 函数停止 Service 的运行。可以利用其在后台执行任务,执行完成之后不需要管理,自动停止。当然,记得和一般 Service 一样清单文件注册。

IntentService 中有一个继承了 Handler 的 ServiceHandler 和一个 HandlerThread。在 IntentaService 的 onCreate 方法中会创建一个名称为 IntentService[mName] 的 HandlerThread,再以这个 HandlerThread 的 Looper 为参数创建 ServiceHandler。startService 将 Intent 传递进来,会将 intent 作为 msg 的 obj,调用 mServiceHandler.sendMessage(msg); 交给 ServiceHandler 处理。在 ServiceHandler 的 handleMessage 方法中,会调用 IntentService 的 onHandleIntent 方法,处理完 stopSelf。

引申:Service 的生命周期。

一些问题

Handler 使用哪个 Looper 有什么区别?

Thread Runnbale Callable

为什么推荐使用 ThreadPoolExecutor 来创建线程?

规约一 :线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。

使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

规约二 :强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

Executors 返回线程池对象的弊端如下: FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能会堆积大量请求,从而导致 OOM。 CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE,可能会创建大量线程,从而导致 OOM。

核心线程数设为 0,最大线程数设为 1000,提交 1000 个任务,会创建 1000 个线程吗?

LinkedBlockingQueue 不会。只会创建一个线程。

SynchronousQueue 会。

线程池参数根据什么设置?

简单的拟定判断

CPU 密集型任务(N+1):

这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

I/O 密集型任务(2N):

这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

线程超时后会怎样?

超时后将返回 null 的 Runnable,

ThreadPoolExecutor#getTask

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

LinkedBlockingQueue#poll

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

ThreadPoolExecutor#runWorkder

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。

线程回收的工作是在processWorkerExit方法完成的:

参考

Java线程池实现原理及其在美团业务中的实践 - 美团技术团队

关于 Handler 的一切

各类的源码

Java线程池原理与实战详解 - 知乎

Java线程池 - ThreadPoolExecutor示例 - 简书