Based on SDK 30 source code.
A thread pool caches a certain number of threads. Reusing pooled threads avoids the overhead of creating and destroying threads.
It effectively caps the maximum number of threads.
It allows simple thread management, such as scheduled execution or periodic execution at a fixed interval.
In Java, the thread pool interface is Executor and the main implementation is ThreadPoolExecutor.
ThreadPoolExecutor source-code Javadoc
An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.
Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks. Each ThreadPoolExecutor also maintains some basic statistics, such as the number of completed tasks.
To be useful across a wide range of contexts, this class provides many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient Executors factory methods Executors.newCachedThreadPool() (unbounded thread pool, with automatic thread reclamation), Executors.newFixedThreadPool(int) (fixed size thread pool) and Executors.newSingleThreadExecutor() (single background thread), that preconfigure settings for the most common usage scenarios. Otherwise, use the following guide when manually configuring and tuning this class:
- Core and maximum pool sizes: A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize()) according to the bounds set by corePoolSize (see getCorePoolSize()) and maximumPoolSize (see getMaximumPoolSize()). When a new task is submitted in method execute(Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).
- On-demand construction: By default, even core threads are initially created and started only when new tasks arrive, but this can be overridden dynamically using method prestartCoreThread() or prestartAllCoreThreads(). You probably want to prestart threads if you construct the pool with a non-empty queue.
- Creating new threads: New threads are created using a ThreadFactory. If not otherwise specified, a Executors.defaultThreadFactory() is used, that creates threads to all be in the same ThreadGroup and with the same NORM_PRIORITY priority and non-daemon status. By supplying a different ThreadFactory, you can alter the thread's name, thread group, priority, daemon status, etc. If a ThreadFactory fails to create a thread when asked by returning null from newThread, the executor will continue, but might not be able to execute any tasks. Threads should possess the "modifyThread" RuntimePermission. If worker threads or other threads using the pool do not possess this permission, service may be degraded: configuration changes may not take effect in a timely manner, and a shutdown pool may remain in a state in which termination is possible but not completed.
- Keep-alive times: If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime (see getKeepAliveTime(TimeUnit)). This provides a means of reducing resource consumption when the pool is not being actively used. If the pool becomes more active later, new threads will be constructed. This parameter can also be changed dynamically using method setKeepAliveTime(long, TimeUnit). Using a value of Long.MAX_VALUE TimeUnit.NANOSECONDS effectively disables idle threads from ever terminating prior to shut down. By default, the keep-alive policy applies only when there are more than corePoolSize threads. But method allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero.
- Queuing: Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing: If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing. If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread. If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected. There are three general strategies for queuing:
  - Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
  - Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each others execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.
  - Bounded queues. A bounded queue (for example, an ArrayBlockingQueue) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.
- Rejected tasks: New tasks submitted in method execute(Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) method of its RejectedExecutionHandler. Four predefined handler policies are provided: In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection. In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted. In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped. In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.) It is possible to define and use other kinds of RejectedExecutionHandler classes. Doing so requires some care especially when policies are designed to work only under particular capacity or queuing policies.
- Hook methods: This class provides protected overridable beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated() can be overridden to perform any special processing that needs to be done once the Executor has fully terminated. If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.
- Queue maintenance: Method getQueue() allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged. Two supplied methods, remove(Runnable) and purge() are available to assist in storage reclamation when large numbers of queued tasks become cancelled.
- Finalization: A pool that is no longer referenced in a program AND has no remaining threads will be shutdown automatically. If you would like to ensure that unreferenced pools are reclaimed even if users forget to call shutdown(), then you must arrange that unused threads eventually die, by setting appropriate keep-alive times, using a lower bound of zero core threads and/or setting allowCoreThreadTimeOut(boolean).
Extension example. Most extensions of this class override one or more of the protected hook methods. For example, here is a subclass that adds a simple pause/resume feature:
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
Constructor
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
The 7 parameters (a construction sketch follows the list):
corePoolSize: the number of core threads.
maximumPoolSize: the maximum number of threads the pool may create.
keepAliveTime: the idle timeout for non-core threads.
unit: the time unit of keepAliveTime.
workQueue: the task queue.
threadFactory: the thread factory.
handler: the saturation (rejection) policy.
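A minimal construction sketch wiring all seven parameters explicitly (the class name PoolConfigDemo and all numeric values are illustrative assumptions, not taken from the notes above):
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConfigDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                    // corePoolSize
                4,                                    // maximumPoolSize
                30L, TimeUnit.SECONDS,                // keepAliveTime + unit for idle non-core threads
                new ArrayBlockingQueue<>(100),        // workQueue: bounded task queue
                Executors.defaultThreadFactory(),     // threadFactory
                new ThreadPoolExecutor.AbortPolicy()  // handler: saturation (rejection) policy
        );
        pool.execute(() -> System.out.println("hello from " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}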
Thread pool processing flow
When a task is submitted: if the current number of threads is below corePoolSize, a new thread is created to run it. Otherwise, if workQueue is not yet full, the task is enqueued. Otherwise, if the number of threads is below maximumPoolSize, a new thread is created to run the task. Otherwise, the saturation policy is applied.
ThreadPoolExecutor#execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
Benefits of thread pools
Three points:
- Reduced resource consumption from creating and destroying threads.
- Faster response: when a task arrives, it can run without waiting for a thread to be created.
- Simple, unified management of threads.
Thread creation and destruction in the pool
Rejection policies
ThreadPoolExecutor provides four rejection policies (a sketch of installing one follows the list):
AbortPolicy
CallerRunsPolicy
DiscardPolicy
DiscardOldestPolicy
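A minimal sketch of choosing a rejection handler (RejectionDemo, the pool sizes, and the queue capacity are assumptions for illustration): CallerRunsPolicy is passed to the constructor so that, once the pool and queue are saturated, rejected tasks run on the submitting thread; a custom handler could alternatively be installed via setRejectedExecutionHandler.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) {
        // Tiny pool and queue so saturation is easy to trigger (illustrative values).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejected tasks run on the caller's thread

        // Alternatively, install a custom RejectedExecutionHandler, e.g. log-and-drop:
        // pool.setRejectedExecutionHandler((r, executor) -> System.out.println("rejected: " + r));

        for (int i = 0; i < 5; i++) {
            final int id = i;
            pool.execute(() -> System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}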
Kinds of thread pools
Different kinds of thread pools can be built by configuring ThreadPoolExecutor with different parameters; the Executors class provides factory methods for creating them. The four most commonly used:
FixedThreadPool
A thread pool that reuses a fixed number of threads.
Both the core pool size and the maximum pool size are nThreads; there are only core threads, and no non-core threads are ever created. The task queue is unbounded.
The queue is a LinkedBlockingQueue, so tasks run FIFO.
If a worker thread ends because of an unexpected Exception, a new thread is created to replace it.
It can respond to external requests quickly.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
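A short usage sketch (FixedPoolDemo and the task bodies are assumptions for illustration): because the LinkedBlockingQueue is unbounded, submitting more tasks than nThreads simply queues the extras, and exactly nThreads worker threads are reused.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedPoolDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3); // exactly 3 reusable threads
        for (int i = 0; i < 10; i++) {
            final int id = i;
            pool.execute(() -> System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown(); // previously submitted tasks still run to completion
    }
}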
CachedThreadPool
Creates threads on demand.
The core pool size is 0 and the number of non-core threads is unbounded, with a 60-second idle timeout. SynchronousQueue is a blocking queue that stores no elements: every insert must wait for a matching remove, and every remove must wait for a matching insert.
Suitable for large numbers of short tasks that need to be handled immediately.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
SingleThreadExecutor
Both the core pool size and the maximum pool size are 1.
Ensures that all tasks execute sequentially, one at a time.
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
ScheduledThreadPool
A thread pool for running tasks after a delay or periodically.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
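A brief usage sketch (ScheduleDemo and the delay values are arbitrary illustration choices): schedule runs a task once after a delay, and scheduleAtFixedRate runs it periodically.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        // Run once after a 1-second delay.
        scheduler.schedule(() -> System.out.println("delayed task"), 1, TimeUnit.SECONDS);
        // Run every 2 seconds, starting after an initial 1-second delay.
        scheduler.scheduleAtFixedRate(() -> System.out.println("periodic task"), 1, 2, TimeUnit.SECONDS);
        // Call scheduler.shutdown() when the periodic task is no longer needed.
    }
}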
Task queues
LinkedBlockingQueue
Linked-list based, FIFO. If no capacity is given, it defaults to Integer.MAX_VALUE. Linked queues typically have higher throughput than array-based queues, but less predictable performance in most concurrent applications. Nodes are created dynamically on insertion.
ArrayBlockingQueue
Array based, FIFO. A capacity must be specified at construction and cannot be changed afterwards. Inserting blocks when the array is full, and taking from an empty queue also blocks. The constructor accepts a boolean fair flag: when set to true, threads waiting on the queue are granted access in FIFO order; otherwise the order is unspecified.
SynchronousQueue
Stores no elements (it has no capacity). Every insert must wait for a matching remove, and every remove must wait for a matching insert. It also supports a fair parameter.
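A small sketch contrasting the offer behaviour of the three queues (QueueDemo is an assumed demo class; the capacities are arbitrary):
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueDemo {
    public static void main(String[] args) {
        LinkedBlockingQueue<String> linked = new LinkedBlockingQueue<>(); // capacity defaults to Integer.MAX_VALUE
        System.out.println(linked.offer("a")); // true: effectively always has room

        ArrayBlockingQueue<String> array = new ArrayBlockingQueue<>(1, true); // bounded, fair access
        array.offer("a");
        System.out.println(array.offer("b")); // false: queue is full; offer() does not block

        SynchronousQueue<String> sync = new SynchronousQueue<>();
        System.out.println(sync.offer("a")); // false: no consumer is waiting, nothing is stored
    }
}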
Thread pool exception handling
深度解析Java线程池的异常处理机制 · Issue #3 · aCoder2013/blog
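The linked article is not reproduced here; as a rough sketch of the two common cases (ExceptionDemo and the failing tasks are assumptions for illustration): an exception thrown by a task passed to execute escapes to the worker thread's uncaught-exception handling and kills that worker (the pool replaces it), while an exception from a task passed to submit is captured in the Future and only resurfaces from get().
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExceptionDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);

        // execute(): the exception is printed by the worker thread's default
        // UncaughtExceptionHandler; the call site never sees it.
        pool.execute(() -> { throw new IllegalStateException("boom in execute"); });

        // submit(): the exception is stored in the Future and rethrown, wrapped, by get().
        Future<Integer> f = pool.submit(() -> Integer.parseInt("not a number"));
        try {
            f.get();
        } catch (ExecutionException e) {
            System.out.println("caught from Future: " + e.getCause());
        }
        pool.shutdown();
    }
}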
Shutting down the executor
- To indicate to the executor that you want to finish it, use the shutdown() method of the ThreadPoolExecutor class. When the executor finishes executing all pending tasks, it completes its execution. After you call shutdown(), if you try to send another task to the executor, it will be rejected and the executor will throw a RejectedExecutionException.
- The ThreadPoolExecutor class provides many methods for obtaining information about its status. In the example, the getPoolSize(), getActiveCount(), and getCompletedTaskCount() methods are used to obtain the pool size, the number of threads, and the number of tasks completed by the executor. You can also use getLargestPoolSize(), which returns the maximum number of threads that has ever been in the pool at one time.
- The ThreadPoolExecutor class also provides other methods related to the finalization of the executor:
  - shutdownNow(): shuts down the executor immediately. It does not execute the pending tasks; it returns a list containing all of those pending tasks. Tasks that are running when this method is called continue running, but the method does not wait for them to finish.
  - isTerminated(): returns true if you have called shutdown() or shutdownNow() and the executor has completed the shutdown process.
  - isShutdown(): returns true if the shutdown() method of the executor has been called.
  - awaitTermination(long timeout, TimeUnit unit): blocks the calling thread until the executor's tasks have finished or the timeout elapses. The TimeUnit class is an enumeration with constants such as DAYS, HOURS, MICROSECONDS, and so on.
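A minimal shutdown sketch under these APIs (ShutdownDemo and the 5-second timeout are illustrative choices): shutdown stops new submissions, awaitTermination waits for queued work, and shutdownNow is used as a fallback.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> { /* some work */ });

        pool.shutdown();                    // stop accepting new tasks, finish queued ones
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            pool.shutdownNow();             // drop pending tasks, interrupt running ones
        }
        System.out.println("isShutdown=" + pool.isShutdown() + " isTerminated=" + pool.isTerminated());
    }
}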
this escape
AsyncTask
Source-code Javadoc
AsyncTask was intended to enable proper and easy use of the UI thread. However, the most common use case was for integrating into UI, and that would cause Context leaks, missed callbacks, or crashes on configuration changes. It also has inconsistent behavior on different versions of the platform, swallows exceptions from doInBackground, and does not provide much utility over using Executors directly.
AsyncTask is designed to be a helper class around Thread and Handler and does not constitute a generic threading framework. AsyncTasks should ideally be used for short operations (a few seconds at the most.) If you need to keep threads running for long periods of time, it is highly recommended you use the various APIs provided by the java.util.concurrent package such as Executor, ThreadPoolExecutor and FutureTask.
An asynchronous task is defined by a computation that runs on a background thread and whose result is published on the UI thread. An asynchronous task is defined by 3 generic types, called Params, Progress and Result, and 4 steps, called onPreExecute, doInBackground, onProgressUpdate and onPostExecute.
Developer Guides: For more information about using tasks and threads, read the Processes and Threads developer guide.
Usage: AsyncTask must be subclassed to be used. The subclass will override at least one method (doInBackground), and most often will override a second one (onPostExecute.) Here is an example of subclassing:
private class DownloadFilesTask extends AsyncTask<URL, Integer, Long> {
protected Long doInBackground(URL... urls) {
int count = urls.length;
long totalSize = 0;
for (int i = 0; i < count; i++) {
totalSize += Downloader.downloadFile(urls[i]);
publishProgress((int) ((i / (float) count) * 100));
// Escape early if cancel() is called
if (isCancelled()) break;
}
return totalSize;
}
protected void onProgressUpdate(Integer... progress) {
setProgressPercent(progress[0]);
}
protected void onPostExecute(Long result) {
showDialog("Downloaded " + result + " bytes");
}
}
Once created, a task is executed very simply:
new DownloadFilesTask().execute(url1, url2, url3);
AsyncTask's generic types
The three types used by an asynchronous task are the following:
- Params, the type of the parameters sent to the task upon execution.
- Progress, the type of the progress units published during the background computation.
- Result, the type of the result of the background computation.
Not all types are always used by an asynchronous task. To mark a type as unused, simply use the type Void:
private class MyTask extends AsyncTask<Void, Void, Void> { ... }
The 4 steps
When an asynchronous task is executed, the task goes through 4 steps:
- onPreExecute(), invoked on the UI thread before the task is executed. This step is normally used to setup the task, for instance by showing a progress bar in the user interface.
- doInBackground, invoked on the background thread immediately after onPreExecute() finishes executing. This step is used to perform background computation that can take a long time. The parameters of the asynchronous task are passed to this step. The result of the computation must be returned by this step and will be passed back to the last step. This step can also use publishProgress to publish one or more units of progress. These values are published on the UI thread, in the onProgressUpdate step.
- onProgressUpdate, invoked on the UI thread after a call to publishProgress. The timing of the execution is undefined. This method is used to display any form of progress in the user interface while the background computation is still executing. For instance, it can be used to animate a progress bar or show logs in a text field.
- onPostExecute, invoked on the UI thread after the background computation finishes. The result of the background computation is passed to this step as a parameter.
Cancelling a task
A task can be cancelled at any time by invoking cancel(boolean). Invoking this method will cause subsequent calls to isCancelled() to return true. After invoking this method, onCancelled(Object), instead of onPostExecute(Object) will be invoked after doInBackground(Object[]) returns. To ensure that a task is cancelled as quickly as possible, you should always check the return value of isCancelled() periodically from doInBackground(Object[]), if possible (inside a loop for instance.)
Threading rules
There are a few threading rules that must be followed for this class to work properly:
- The AsyncTask class must be loaded on the UI thread. This is done automatically as of Build.VERSION_CODES.JELLY_BEAN.
- The task instance must be created on the UI thread.
- execute must be invoked on the UI thread.
- Do not call onPreExecute(), onPostExecute, doInBackground, onProgressUpdate manually.
- The task can be executed only once (an exception will be thrown if a second execution is attempted.)
Memory observability
AsyncTask guarantees that all callback calls are synchronized to ensure the following without explicit synchronizations.
- The memory effects of onPreExecute, and anything else executed before the call to execute, including the construction of the AsyncTask object, are visible to doInBackground.
- The memory effects of doInBackground are visible to onPostExecute.
- Any memory effects of doInBackground preceding a call to publishProgress are visible to the corresponding onProgressUpdate call. (But doInBackground continues to run, and care needs to be taken that later updates in doInBackground do not interfere with an in-progress onProgressUpdate call.)
- Any memory effects preceding a call to cancel are visible after a call to isCancelled that returns true as a result, or during and after a resulting call to onCancelled.
Order of execution
When first introduced, AsyncTasks were executed serially on a single background thread. Starting with Build.VERSION_CODES.DONUT, this was changed to a pool of threads allowing multiple tasks to operate in parallel. Starting with Build.VERSION_CODES.HONEYCOMB, tasks are executed on a single thread to avoid common application errors caused by parallel execution. If you truly want parallel execution, you can invoke executeOnExecutor(Executor, Object[]) with THREAD_POOL_EXECUTOR.
Deprecated: Use the standard java.util.concurrent or Kotlin concurrency utilities instead.
Source analysis
AsyncTask#execute:
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
executeOnExecutor
sDefaultExecutor is an instance of AsyncTask's internal SerialExecutor class, a serial executor.
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
If there are more tasks in the queue, it keeps calling execute on them. SerialExecutor is responsible for queuing the tasks, while THREAD_POOL_EXECUTOR actually executes them.
@Deprecated
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), sThreadFactory);
threadPoolExecutor.setRejectedExecutionHandler(sRunOnSerialPolicy);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
The core pool size is 1, the maximum pool size is 20, the keep-alive time is 3 seconds, and the blocking queue is a SynchronousQueue.
There is also an inner class, InternalHandler, used to switch results from the thread pool back to the main thread.
executeOnExecutor first calls onPreExecute, then calls exec.execute, passing in a FutureTask.
The AsyncTask constructor creates mWorker, whose call() will eventually run on the thread pool:
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
Here doInBackground is called to obtain the result, which is then handed to postResult:
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
Via mHandler, a MESSAGE_POST_RESULT message is obtained:
mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
? getMainHandler()
: new Handler(callbackLooper);
getMainHandler:
private static Handler getMainHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler(Looper.getMainLooper());
}
return sHandler;
}
}
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
sHandler is a static object; to switch execution to the main thread, it must be created on the main thread. Static fields are initialized when the class is loaded, which is why AsyncTask must be loaded on the main thread.
finish
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
If the task has not been cancelled, onPostExecute is called.
Message#sendToTarget
public void sendToTarget() {
target.sendMessage(this);
}
HandlerThread
This class extends Thread and internally sets up the Handler mechanism: after start(), the thread runs a Looper loop that processes messages, so Messages are handled on this thread; the Looper can be quit when no longer needed. It can be used to execute tasks sequentially, in order.
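A minimal usage sketch (HandlerThreadDemo, the thread name "worker", and the start/stop methods are assumptions for illustration): a Handler bound to the HandlerThread's Looper delivers posted tasks to that thread, one after another.
import android.os.Handler;
import android.os.HandlerThread;

public class HandlerThreadDemo {
    private HandlerThread workerThread;
    private Handler workerHandler;

    void start() {
        workerThread = new HandlerThread("worker");
        workerThread.start();                                  // starts the thread and its Looper loop
        workerHandler = new Handler(workerThread.getLooper()); // handler bound to that Looper
        workerHandler.post(() -> {
            // runs on the HandlerThread; posted tasks execute sequentially, in order
        });
    }

    void stop() {
        workerThread.quitSafely(); // lets queued messages finish, then quits the Looper
    }
}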
IntentService
IntentService extends Service and therefore has all the characteristics of a Service, while internally it uses a HandlerThread. When onStart() is triggered, a Message is added to the HandlerThread's MessageQueue; after the internal Handler finishes processing it, stopSelf() is called to stop the Service. It can be used to run background work that stops itself automatically when finished. As with any Service, remember to register it in the manifest.
IntentService contains a ServiceHandler (a Handler subclass) and a HandlerThread. In IntentService's onCreate, a HandlerThread named IntentService[mName] is created, and a ServiceHandler is then created with that HandlerThread's Looper as its parameter. When startService delivers an Intent, the intent becomes the obj of a Message, and mServiceHandler.sendMessage(msg) hands it to the ServiceHandler. In ServiceHandler's handleMessage, IntentService's onHandleIntent is called, and stopSelf is invoked after processing.
Related: the Service lifecycle.
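A minimal IntentService sketch (DownloadService is a hypothetical subclass name): the work happens in onHandleIntent on the internal HandlerThread, and the service stops itself once the queue is empty.
import android.app.IntentService;
import android.content.Intent;

// Hypothetical subclass; remember to declare it in AndroidManifest.xml.
public class DownloadService extends IntentService {

    public DownloadService() {
        super("DownloadService"); // used in the HandlerThread's name: IntentService[DownloadService]
    }

    @Override
    protected void onHandleIntent(Intent intent) {
        // Runs on the internal HandlerThread; intents are processed one at a time.
        // When there is no more work, the service calls stopSelf() by itself.
    }
}
It would be started like any other service, e.g. with context.startService(new Intent(context, DownloadService.class)).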
Some questions
What difference does it make which Looper a Handler uses?
Thread, Runnable, Callable
Why is ThreadPoolExecutor recommended for creating threads?
Rule 1: thread resources must be provided through a thread pool; explicitly creating threads in application code is not allowed.
Using a thread pool reduces the time and system resources spent creating and destroying threads and helps avoid running out of resources. Without a thread pool, the system may create large numbers of similar threads, exhausting memory or causing excessive context switching.
Rule 2: thread pools must not be created with Executors; use the ThreadPoolExecutor constructor instead. This forces whoever writes the code to be explicit about how the pool behaves and avoids the risk of resource exhaustion.
Drawbacks of the pools returned by Executors: FixedThreadPool and SingleThreadExecutor allow a request queue of length Integer.MAX_VALUE, which can accumulate a huge number of requests and lead to OOM. CachedThreadPool and ScheduledThreadPool allow up to Integer.MAX_VALUE threads to be created, which can create a huge number of threads and lead to OOM.
With the core pool size set to 0 and the maximum pool size set to 1000, will submitting 1000 tasks create 1000 threads? (See the sketch below.)
With a LinkedBlockingQueue: no, only one thread is created.
With a SynchronousQueue: yes.
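A sketch that makes the difference observable (MaxThreadsDemo, the 1-second task duration, and the 60-second keep-alive are assumed demo values): with the unbounded LinkedBlockingQueue every task after the first is queued, so the pool stays at one thread; with the SynchronousQueue each offer fails while all workers are busy, so new threads are created up to the maximum.
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MaxThreadsDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor linkedPool = new ThreadPoolExecutor(
                0, 1000, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        ThreadPoolExecutor syncPool = new ThreadPoolExecutor(
                0, 1000, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

        Runnable sleepy = () -> {
            try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        };
        for (int i = 0; i < 1000; i++) {
            linkedPool.execute(sleepy);
            syncPool.execute(sleepy);
        }
        Thread.sleep(100); // let the pools spin up
        System.out.println("LinkedBlockingQueue pool size: " + linkedPool.getPoolSize()); // 1
        System.out.println("SynchronousQueue pool size:    " + syncPool.getPoolSize());   // typically 1000

        linkedPool.shutdownNow(); // discard the queued tasks so the demo exits promptly
        syncPool.shutdown();
    }
}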
How should the thread pool parameters be chosen?
A simple rule of thumb:
CPU-bound tasks (N + 1):
These tasks mainly consume CPU, so the thread count can be set to N (the number of CPU cores) + 1. The one extra thread beyond the core count covers occasional page faults or other pauses: when a task stalls, the CPU would otherwise sit idle, and the extra thread can use that idle time.
I/O-bound tasks (2N):
For these tasks the system spends most of its time on I/O, during which the thread does not occupy the CPU, so the CPU can be handed to other threads. Therefore, for I/O-bound workloads we can configure more threads; a common starting point is 2N.
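A sizing sketch based on these rules of thumb (PoolSizing, the bounded-queue capacity, and treating 2N as a fixed multiplier are assumptions; real values should come from measurement):
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizing {
    public static void main(String[] args) {
        int n = Runtime.getRuntime().availableProcessors(); // N = number of CPU cores

        int cpuBound = n + 1; // CPU-bound tasks: N + 1
        int ioBound  = 2 * n; // I/O-bound tasks: 2N (a starting point, tune by measurement)

        ThreadPoolExecutor cpuPool = new ThreadPoolExecutor(
                cpuBound, cpuBound, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(256)); // queue capacity is an arbitrary demo value

        System.out.println("cores=" + n + " cpuBound=" + cpuBound + " ioBound=" + ioBound);
        cpuPool.shutdown();
    }
}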
What happens when a thread times out?
After the keep-alive timeout, getTask() returns null, so the worker loop in runWorker exits and the worker is cleaned up.
ThreadPoolExecutor#getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
LinkedBlockingQueue#poll
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
ThreadPoolExecutor#runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
Destroying the threads in the pool relies on the JVM's normal reclamation of thread objects; the pool's job is to keep references to a certain number of threads, according to its current state, so that those threads are not reclaimed. When the pool decides that certain threads should be reclaimed, it simply drops its references to them. Once a Worker has been created, it keeps polling for tasks to run: core threads can wait for tasks indefinitely, while non-core threads wait with a timeout. When a Worker cannot get a task, i.e. getTask() returns null, its loop ends and the Worker removes its own reference from the pool.
The actual thread-recycling work is done in the processWorkerExit method.
References
Java线程池实现原理及其在美团业务中的实践 - 美团技术团队
Source code of the classes discussed above.