• 线程池框架

  • 采用线程池好处

    • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
    • 提高线程的可管理性。
  • Excutor:执行者,java线程池框架最上层父接口。接口中只有一个execute方法,该方法的作用是向线程池提交任务并执行。

  • ExecutorService:该接口继承自Executor接口,添加了shutdown,shutdownAll,submit,invokeAll等一系列对线程的操作方法。

  • AbstractExecutorService:抽象类,实现ExecutorService接口。

  • ThreadPoolExecutor:java线程池最核心的一个类,继承自AbstractExecutorService,主要功能是创建线程池,给任务分配线程资源,执行任务。

  • ScheduledExecutorService和ScheduledThreadPoolExecutor:延迟执行和周期性执行的线程池。

  • Executors:静态工厂类,该类定义了一系列静态工厂方法,通过这些工厂方法可以返回各种不同的线程池。

  • SingleThreadExecutor:仅有一个线程,以顺序方式执行任务。如果此线程在执行任务时因异常挂掉,则会创建一个新线程来替代此线程,后续任务将在新线程中执行。

1
ExecutorService executorService = Executors.newSingleThreadExecutor();
  • FixedThreadPool(n):拥有固定数量线程的线程池。提交给Executor的任务由固定的n个线程执行,如果有更多的任务,它们会存储在LinkedBlockingQueue里。n通常与底层处理器支持的线程总数有关。
1
ExecutorService executorService = Executors.newFixedThreadPool(4);
  • CachedThreadPool:主要用于执行大量短期并行任务的场景。与固定线程池不同,此线程池线程数量不受限制。如果所有的线程都忙于执行任务且又有新的任务到来,这个线程池将创建一个新的线程并将其提交到Executor。只要其中一个线程变为空闲,他就会执行新的任务。如果一个线程有60s的时间都是空闲的,它们将被结束生命周期并从缓存中被删除。
1
ExecutorService executorService = Executors.newCachedThreadPool();
  • ScheduledExecutor:当我们有一个需要定期运行的任务,或者我们希望延迟某个任务时,就会使用此类型。

    • scheduleAtFixedRate:以固定的频率来执行某项任务。以上一个任务开始的时间计时,period时间过去后,检查上一个任务是否执行完毕,如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后再立即执行。
    • scheduleWithFixedDelay:以上一个任务结束时开始计时,period时间过去后,立即执行。
    • 参数command为具体的执行任务,initialDelay为初始延迟时间(多久时间之后,第一次跑任务),unit为时间单位。

    摘自/参考链接

1
2
3
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
scheduledExecutorService.scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit);
  • Future对象

    交给Executor的任务是异步的,处理结果由Future对象来接收。

1
Future<String> result = executorService.submit(callableTask);

调用者可继续执行主程序,当需要提交任务结果时,调用者主程序可调用Future对象的.get()方法来获取。如果任务完成,结果将立即返回给调用者,否则调用者将被阻塞,直到Executor完成此操作的执行并计算出结果。

Future.get(long timeout, TimeUnit unit):如果在规定时间内没有返回结果,则抛出TimeoutException,调用者可做相应处理。

如果在执行任务时出现异常,则对get方法的调用将抛出ExecutionException异常。

另外,只有提交的任务实现了java.util.concurrent.Callable接口时才会返回Future。如果任务实现了Runnable接口,那么一旦任务成功,对.get()方法的调用将返回null.

Future.cancel(boolean mayInterruptIfRunning)方法用于取消提交任务的执行。如果任务已在执行,则Executor将尝试在mayInterruptIfRunning标志为true时,中断任务执行。

  • 简单实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.Callable;

public class Task implements Callable<String> {
private String message;

public Task(String message) {
this.message = message;
}

@Override
public String call() throws Exception {
return "Hello " + message + "!";
}
}


import java.util.concurrent.*;

public class ApplicationDemo {
public static void main(String[] args) {
Task task = new Task("World");

ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<String> result = executorService.submit(task);

try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
System.out.println("Error occured while executing the submitted task");
e.printStackTrace();
}
executorService.shutdown();
}
}

//打印Hello World
  • 合理分配线程池大小:

    对于CPU密集型任务,线程数可为:NCPU + 1;

    对于IO密集型任务,线程数可为:2 * NCPU.

  • 除了使用Executors工厂类来生成线程池,我们也可以自己配置参数生成个性化的线程池。

1
ThreadPoolExecutor tpe = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, runnableTaskQueue, handler);

corePoolSize:核心线程池大小。如果调用了prestarAllCoreThread()方法,那么线程池会提前创建,并启动所有基本线程。

maximumPoolSize:线程池大小。

keepAliveTime:线程空闲后,线程存活时间。

timeUnit:存活时间单位。

runnableTaskQueue:阻塞队列,可使用ArrayBlockingQueue,LinkBlockingQueue,SynchronousQueue,PriorityBlockingQueue。其中,newFixedThreadPool使用LinkedBlockingQueue,newCachedThreadPool使用SynchronousQueue.

handler:拒绝策略,当线程池满了时使用。AbortPolicy:默认策略,直接抛出异常;CallerRunsPolicy:用调用者所在线程运行任务;DiscardOldestPolicy:丢弃队列中最老任务,执行当前任务;DiscardPolicy:不处理,直接把当前任务丢弃。

  • submit()用于提交需要返回值的对象;execute()用于提交无返回值的对象。

  • shutdown(),shutdownNow()方法关闭线程池。原理:遍历线程池中线程,逐个调用线程的interrupt()f方法来终端线程,所以不响应中断的线程可能无法终止。shutdown中断未执行任务的线程,已执行任务的线程继续执行至结束。shutdownNow尝试中断所有线程,并返回等待执行的任务列表。在调用了shutdown或者shutdownNow后,调用isShutDown()返回true;当所有任务都关闭后,调用isTerminaed()方法返回true.

  • FixedThreadPool、SingleThreadExecutor、CachedThreadPool是ThreadPoolExecutor的子类。

1
2
3
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

注:使用LinkedBlockingQueue无限双向队列,线程不拒绝任务。且keepAliveTime无效。

1
2
3
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

注:仅一个线程,其他同上。

1
2
3
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

注:使用SynchronousQueue<Runnable>(),不存储任务元素,实现一对一交付,即每次向线程池put一个任务,必须有线程来take这个任务,否则会一直阻塞该任务。属于无界线程池,可以一直不断创建线程。执行完任务后,空闲线程要么直接接下一个任务,要么保持keepAliveTime的时间之后销毁。

线程池类图

1
2
3
public static ExecutorService newSingleThreadExecuor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

摘自/参考链接1

摘自/参考链接2