文章目录
  1. 1. Java线程池架构原理及源码解析
    1. 1.1. 构建参数源码
    2. 1.2. 参数解释
    3. 1.3. 源码详细解析
      1. 1.3.1. excute源码
      2. 1.3.2. addIfUnderCorePoolSize源码
      3. 1.3.3. addThread源码
      4. 1.3.4. ThreadFactory接口默认实现DefaultThreadFactory
      5. 1.3.5. Worker的run方法
      6. 1.3.6. getTask源码
      7. 1.3.7. execute方法部分实现
      8. 1.3.8. ensureQueuedTaskHandled源码
      9. 1.3.9. reject源码
      10. 1.3.10. 再次回到execute方法
      11. 1.3.11. addIfUnderMaximumPoolSize源码
      12. 1.3.12. workerDone源码
      13. 1.3.13. runTask(task)源码
    4. 1.4. 添加任务处理流程
      1. 1.4.1. AbortPolicy()
      2. 1.4.2. CallerRunsPolicy()
      3. 1.4.3. DiscardOldestPolicy()
      4. 1.4.4. DiscardPolicy()
  2. 2. 违反Java高质量代码案例
    1. 2.1. 异步运算使用Callable接口
    2. 2.2. 优先选择线程池
    3. 2.3. 线程死锁
    4. 2.4. 忽略设置阻塞队列长度
    5. 2.5. 使用stop方法停止线程
    6. 2.6. 覆写start方法
    7. 2.7. 使用过多线程优先级
    8. 2.8. Lock与synchronized
    9. 2.9. 线程池异常处理
    10. 2.10. 使用SimpleThread类
    11. 2.11. 线程使用不当导致内存溢出
    12. 2.12. 工作队列

本文为Java高级编程中的一些知识总结,其中第一章对Jdk 1.7.0_25中的多线程架构中的线程池ThreadPoolExecutor源码进行架构原理介绍以及源码解析。第二章则分析了几个违反Java高质量代码案例以及相应解决办法。如有总结的不好的地方,欢迎大家提出宝贵的意见和建议。

Java线程池架构原理及源码解析

ThreadPoolExecutor是一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。

构建参数源码

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

{

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

参数解释

  • corePoolSize:核心线程数,会一直存活,即使没有任务,线程池也会维护线程的最少数量。

  • maximumPoolSize: 线程池维护线程的最大数量。

  • keepAliveTime: 线程池维护线程所允许的空闲时间,当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为
    true,则所有线程均会退出直到线程数量为0。
    unit: 线程池维护线程所允许的空闲时间的单位、可选参数值为:TimeUnit中的几个静态属性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

  • workQueue:线程池所使用的缓冲队列,常用的是:java.util.concurrent.ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。

  • handler: 线程池中的数量大于maximumPoolSize,对拒绝任务的处理策略,默认值ThreadPoolExecutor.AbortPolicy()。

源码详细解析

excute源码

public void execute(Runnable command)
{

if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
{
if (runState == RUNNING && workQueue.offer(command))
{
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}

一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个Runnable类型的对象,任务的执行方法就是run()方法,如果传入的为null,侧抛出NullPointerException。
首先第一个判定空操作就不用说了,下面判定的poolSize >= corePoolSize成立时候会进入if的区域,当然它不成立也有可能会进入,他会判定addIfUnderCorePoolSize是否返回false,如果返回false就会进去。
如果当前线程数小于corePoolSize,调用addIfUnderCorePoolSize方法,addIfUnderCorePoolSize方法首先调用mainLock加锁,再次判断当前线程数小于corePoolSize并且线程池处于RUNNING状态,则调用addThread增加线程。



图一:ThreadPoolExecutor运行状态图

addIfUnderCorePoolSize源码

private boolean addIfUnderCorePoolSize(Runnable firstTask)
{

Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
}
finally
{
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}

addThread方法首先创建Work对象,然后调用threadFactory创建新的线程,如果创建的线程不为null,将Work对象的 thread属性设置为此创建出来的线程,并将此Work对象放入workers中,然后在增加当前线程池的中线程数,增加后回到 addIfUnderCorePoolSize方法 ,释放mainLock,最后启动这个新创建的线程来执行新传入的任务。
可以发现,这段源码是如果发现小于corePoolSize就会创建一个新的线程,并且调用线程的start()方法将线程运行起来:这个addThread()方法,我们先不考虑细节,因为我们还要先看到前面是怎么进去的,这里可以发信啊,只有没有创建成功Thread才会返回false,也就是当当前的poolSize > corePoolSize的时候,或线程池已经不是在running状态的时候才会出现。
注意:这里在外部判定一次poolSize和corePoolSize只是初步判定,内部是加锁后判定的,以得到更为准确的结果,而外部初步判定如果是大于了,就没有必要进入这段有锁的代码了。

addThread源码

private Thread addThread(Runnable firstTask)
{
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
< span style = "color:#ff0000;" > < / span >
if (t != null)
{
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}

ThreadFactory接口默认实现DefaultThreadFactory

public Thread newThread(Runnable r)
{
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}

这里创建了一个Work,其余的操作,就是讲poolSize叠加,然后将将其放入workers的运行队列等操作;
我们主要关心Worker是干什么的,因为这个threadFactory对我们用途不大,只是做了Thread的命名处理;而Worker你会发现它的定义也是一个Runnable,外部开始在代码段中发现了调用哪个这个Worker的start()方法,也就是线程的启动方法,其实也就是调用了Worker的run()方法,那么我们重点要关心run方法是如何处理的。

Worker的run方法

public void run()
{
try
{
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null)
{
runTask(task);
task = null;
}
}
finally
{
workerDone(this);
}
}

从以上方法可以看出,Worker所在的线程启动后,首先执行创建其时传入的Runnable任务,执行完成后,循环调用getTask来获取新的任务,在没有任务的情况下,退出此线程。FirstTask其实就是开始在创建work的时候,由外部传入的Runnable对象,也就是你自己的Thread,你会发现它如果发现task为空,就会调用getTask()方法再判定,直到两者为空,并且是一个while循环体。

getTask源码

Runnable getTask()
{

for (;;)
{
try
{
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit())
{
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
}
catch (InterruptedException ie)
{
// On interruption, re-check runState
}
}
}

你会发现它是从workQueue队列中,也就是等待队列中获取一个元素出来并返回!当前线程运行完后,在到workQueue中去获取一个task出来,继续运行,这样就保证了线程池中有一定的线程一直在运行;此时若跳出了while循 环,只有workQueue队列为空才会出现或出现了类似于shutdown的操作,自然运行队列会减少1,当再有新的线程进来的时候,就又开始向 worker里面放数据了,这样以此类推,实现了线程池的功能。

execute方法部分实现

if (runState == RUNNING && workQueue.offer(command))
{
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
如果当前线程池数量大于corePoolSize或addIfUnderCorePoolSize方法执行失败,则执行后续操作;如果线程池处于运行状态 并且workQueue中成功加入任务,再次判断如果线程池的状态不为运行状态或当前线程池数为0,则调用 ensureQueuedTaskHandled方法。

ensureQueuedTaskHandled源码

private void ensureQueuedTaskHandled(Runnable command)
{

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean reject = false;
Thread t = null;
try
{
int state = runState;
if (state != RUNNING && workQueue.remove(command))
reject = true;
else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) &&
!workQueue.isEmpty())
t = addThread(null);
}
finally
{
mainLock.unlock();
}
if (reject)
reject(command);
else if (t != null)
t.start();
}

第一个if,也就是当当前状态为running的时候,就会去执行workQueue.offer(command),这个workQueue其实就是一 个BlockingQueue,offer()操作就是在队列的尾部写入一个对象,此时写入的对象为线程的对象而已;所以你可以认为只有线程池在 RUNNING状态,才会在队列尾部插入数据,否则就执行else if,其实else if可以看出是要做一个是否大于MaximumPoolSize的判定,如果大于这个值,就会做reject的操作。ensureQueuedTaskHandled方法判断线程池运行,如果状态不为运行状态,从workQueue中删除,并调用reject做拒绝处理。

reject源码

void reject(Runnable command)
{

handler.rejectedExecution(command, this);
}

再次回到execute方法

if (runState == RUNNING && workQueue.offer(command))
{
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated

如线程池workQueue offer失败或不处于运行状态,调用addIfUnderMaximumPoolSize, addIfUnderMaximumPoolSize方法基本和addIfUnderCorePoolSize实现类似,不同点在于根据最大线程数(maximumPoolSize)进行比较,如果超过最大线程数,返回false,调用reject方法。

addIfUnderMaximumPoolSize源码

private boolean addIfUnderMaximumPoolSize(Runnable firstTask)
{

Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
}
finally
{
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}

也就是如果线程池满了,而且线程池调用了shutdown后,还在调用execute方法时,就会抛出上面说明的异常:RejectedExecutionException。

workerDone源码

void workerDone(Worker w)
{

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
completedTaskCount += w.completedTasks;
workers.remove(w);
if (--poolSize == 0)
tryTerminate();
}
finally
{
mainLock.unlock();
}
}

注意这里将workers.remove(w)掉,并且调用了—poolSize来做操作。至于tryTerminate是做了更多关于回收方面的操作。

runTask(task)源码

private void runTask(Runnable task)
{
final ReentrantLock runLock = this.runLock;
runLock.lock();
try
{
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
boolean ran = false;
beforeExecute(thread, task);
try
{
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
}
catch (RuntimeException ex)
{
if (!ran)
afterExecute(task, ex);
throw ex;
}
}
finally
{
runLock.unlock();
}
}

你可以看到,这里面的task为传入的task信息,调用的不是start方法,而是run方法,因为run方法直接调用不会启动新的线程,也是因为这样,导致了你无法获取到你自己的线程的状态,因为线程池是直接调用的run方法,而不是start方法来运行。
这里有个beforeExecute和afterExecute方法,分别代表在执行前和执行后,你可以做一段操作,在这个类中,这两个方法都是空的,因为普通线程池无需做更多的操作。
如果你要实现类似暂停等待通知的或其他的操作,可以自己extends后进行重写构造。

添加任务处理流程

AbortPolicy()

public static class AbortPolicy implements RejectedExecutionHandler
{

/**
* Creates an {@code AbortPolicy}.
*/

public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/

public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{

throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/*当线程池中的数量等于最大线程数时,直接抛出抛出java.util.concurrent.RejectedExecutionException异常。*/

CallerRunsPolicy()

public static class CallerRunsPolicy implements RejectedExecutionHandler
{

/**
* Creates a {@code CallerRunsPolicy}.
*/

public CallerRunsPolicy() { }

/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/

public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{

if (!e.isShutdown())
{
r.run();
}
}
}

当线程池中的数量等于最大线程数时、重试执行当前的任务,交由调用者线程来执行任务。

DiscardOldestPolicy()

public static class DiscardOldestPolicy implements RejectedExecutionHandler
{

/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/

public DiscardOldestPolicy() { }

/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/

public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{

if (!e.isShutdown())
{
e.getQueue().poll();
e.execute(r);
}
}
}

当线程池中的数量等于最大线程数时、抛弃线程池中最后一个要执行的任务,并执行新传入的任务。

DiscardPolicy()

public static class DiscardPolicy implements RejectedExecutionHandler
{

/**
* Creates a {@code DiscardPolicy}.
*/

public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/

public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
{

}
}

当线程池中的数量等于最大线程数时,不做任何动作。
通常你得到线程池后,会调用其中的:submit方法或execute方法去操作;其实你会发现,submit方法最终会调用execute方法来进行操 作,只是他提供了一个Future来托管返回值的处理而已,当你调用需要有返回值的信息时,你用它来处理是比较好的;这个Future会包装对 Callable信息,并定义一个Sync对象,当你发生读取返回值的操作的时候,会通过Sync对象进入锁,直到有返回值的数据通知。

违反Java高质量代码案例

异步运算使用Callable接口

Callable接口代码如下:

public interface Callable<V>{
v call() throws Exception;
}

实现Callable接口,只是表明它是一个可调用的任务,并不表示它具有多线程运算的能力,还是要执行器来执行。代码如下:

class TaxCalculator implements Callable<Integer>{

private int seedMoney;
public TaxCalculator(int _seedMoney){
seedMoney=_seedMoney;
}
@Override
public Integer call() throws Exception {
TimeUnit.MILLISECONDS.sleep(10000);
return seedMoney/10;
}

}

这里模拟税款计算器运算,可能花费10秒钟时间。用户输入即有输出,若耗时较长,则显示运算进度。如果我们直接计算,就只有一个main线程,是不可能友好提示的,如果税金不计算完毕,也不会执行后续动作,所以最好的办法就是重启一个线程来运算,让main线程做进度提示

public static void main(String[] args) throws Exception{
ExecutorService es=Executors.newSingleThreadExecutor();
Future<Integer> future=es.submit(new TaxCalculator(100));
while(!future.isDone()){
TimeUnit.MILLISECONDS.sleep(200);
System.out.println("#");
}
System.out.println("\n 计算完成,税金是:"+future.get()+"元");
es.shutdown();

}

Executors是一个静态工具类,提供了异步执行器的创建能力,如单线程执行newSingleThreadExcutor、固定线程数量的执行器newFixedThreadPool等,一般是异步计算的入口类。

优先选择线程池

线程的状态只能由新建状态转变为运行态后才可能被阻塞或等待,最后终结,不可能产生本末倒置的情况,代码如下:

public static void main(String[] args) throws Exception{

Thread t=new Thread(new Runnable() {

@Override
public void run() {
System.out.println("线程在运行");

}
});
t.start();
while(!t.getState().equals(Thread.State.TERMINATED)){
TimeUnit.MILLISECONDS.sleep(10);
}
t.start();
}

此时程序运行会报IllegalThreadStateException异常,原因就是不能从结束状态直接转换为可运行状态。这时可以引入线程池,当系统需要时直接从线程池中获得线程,运算出结果,再把线程返回到线程池中,代码如下:

public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(2);
for (int i = 0; i < 4; i++) {
es.submit(new Runnable() {

@Override
public void run() {

System.out.println(Thread.currentThread().getName());

}

});
}
es.shutdown();
}

线程死锁

Java是单线程语言,一旦线程死锁,只能借助外部进程重启应用才能解决。

static class A {
public synchronized void a1(B b) {
String name = Thread.currentThread().getName();
System.out.println(name + "进入A.a1()");
try {
Thread.sleep(1000);
} catch (Exception e) {
// TODO: handle exception
}
System.out.println(name + "试图访问B.b2()");
b.b2();
}

public synchronized void a2() {
System.out.println("进入 a.a2()");
}
}

static class B {
public synchronized void b1(A a) {
String name = Thread.currentThread().getName();
System.out.println(name + "进入B.b1()");
try {
Thread.sleep(1000);
} catch (Exception e) {
// TODO: handle exception
}
System.out.println(name + "试图访问A.a2()");
a.a2();
}

public synchronized void b2() {
System.out.println("进入 B.b2()");
}
}

public static void main(String[] args) {
final A a = new A();
final B b = new B();
new Thread(new Runnable() {

@Override
public void run() {
a.a1(b);
}
}, "线程A").start();
;
new Thread(new Runnable() {

@Override
public void run() {
b.b1(a);
}
}, "线程B").start();
;
}

此段程序定义了两个资源A和B,然后在两个线程A、B中使用了该资源,由于两个资源之间有交互操作,并且都是同步方法,因此在线程A休眠1秒钟后,它会试图访问资源B的b2方法,但是线程B持有该类的锁,并同时在等待A线程释放其锁资源,所以此时就出现了两个线程在互相等待释放资源的情况,也就是死锁。可以使用自旋锁改进,代码如下:

public  void b2()
{

try
{
if(Lock.trylock(2, TimeUnit.SECONDS))
{
System.out.println("进入 B.b2()");
}
}
catch (InterruptedException e)
{
// TODO: handle exception
}
finally
{
Lock.unlock();
}


}

它原理和互斥锁一样,如果一个执行单元要想访问被自旋锁保护的共享资源,则必须先得到锁,在访问完共享资源后,也必须释放锁。

忽略设置阻塞队列长度

BlockingQueue是一种集合,实现了Collection接口,容量是不可以自行管理的,代码如下:

public static void main(String[] args) throws Exception {
BlockingDeque<String> bq = (BlockingDeque<String>) new ArrayBlockingQueue<String>(
5);
for (int i = 0; i < 10; i++) {
bq.add("");
}
}

阻塞队列容量是固定的,非阻塞队列则是变长的。阻塞队列可以在声明是指定队列的容量,若指定的容量,则元素的数量不可超过该容量,若不指定,队列的容量为Integer的最大值

public  class ArrayBlockingQueue<E> extends AbstractQueue<E> implements
BlockingDeque<E>, java.io.Serializable
{

public final E[] items;
private int count;

public boolean add(E e)
{
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

public boolean offer(E e)
{
final ReentrantLock lock = this.lock;
lock.lock();
try
{
if (count == items.length)
;
else
{
insert(e);
return true;
}
}
finally
{
lock.unlock();
}
}

}

上面在加入元素时,如果判断当前队列已满,则返回false,表示插入失败,之后再包装成队列满异常。

使用stop方法停止线程

stop方法会破坏原子逻辑,代码如下:

class MutiThread implements Runnable {
int a = 0;

@Override
public void run() {
// TODO Auto-generated method stub
synchronized ("") {
a++;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
a--;
String tn = Thread.currentThread().getName();
System.out.println(tn + ":a=" + a);

}
}

public static void main(String[] args) {
MutiThread t = new MutiThread();
Thread t1 = new Thread(t);
t1.start();
for (int i = 0; i < 5; i++) {
new Thread(t).start();
}
t1.stop();
}
}

所有线程共享了一个MutilThread的实例变量t,由于在run方法中加入了同步代码块,所以只能有一个线程进入到synchronized块中,可以自定义标志位来决定线程执行情况,代码如下:

class SafeStopThread extends Thread{
private volatile boolean stop=false;
@Override
public void run()
{//判断线程体是否运行
while(stop)
{}
}
//线程终止
public void terminate(){
stop=true;
}
}

在线程体中判断是否需要停止运行,即可保证线程体的逻辑完整性,而且也不会破坏原子逻辑。

覆写start方法

代码:

class MutiThread implements Thread
{



@Override
public void start()
{

//调用线程体
run();
}
}
@Override
public void run()
{


}
}
public static void main(String[] args)
{

MutiThread t = new MutiThread();
t.start();
}

}

main方法根本就没有启动一个子线程,整个应用程序中只有一个主线程在运行,并不会创建其他的线程。改进后代码如下:

class MutiThread implements Thread
{



@Override
public void start()
{

/*线程启动前的业务处理*/
super.start();
/*线程启动后的业务处理*/
}
}
@Override
public void run()
{


}
}

start方法调用父类的start方法,没有主动调用run方法,由JVM自行调用,不用我们的显式实现。

使用过多线程优先级

Java线程有10个基本,级别为0代表JVM
代码如下:

class MutiThread implements Runnable {
public void start(int _priority) {
Thread t = new Thread(this);
t.setPriority(_priority);
t.start();
}

@Override
public void run() {

for (int i = 0; i < 10000; i++) {
Math.hypot(Math.pow(924526789, i), Math.cos(i));
}
System.out.println("Priority:"+Thread.currentThread().getPriority());
}
public static void main(String[] args) {
for(int i=0;i<20;i++)
{
new MutiThread().start(i%10+1);
}
}
}

Java优先级只是代表抢占CPU机会大小,优先级越高,抢占CPU机会越大,被优先执行的可能性越高,优先级相差不大,则抢占CPU机会差别也不大。导致优先级为9的线程比优先级为10的线程先运行。于是在Thread类中设置三个优先级,建议使用优先级常量,而不是1到10的随机数字,代码如下:

  public final static int MIN_PRIORITY = 1;

/**
* The default priority that is assigned to a thread.
*/

public final static int NORM_PRIORITY = 5;

/**
* The maximum priority that a thread can have.
*/

public final static int MAX_PRIORITY = 10;

/**
* Returns a reference to the currently executing thread object.
*
* @return the currently executing thread.
*/

}

Lock与synchronized

Lock为显式锁,synchronized为内部锁,代码如下:

class  Task
{

public void dosomething(){
try {
Thread.sleep(2000);
} catch (Exception e) {
// TODO: handle exception
}
StringBuffer sb=new StringBuffer();
sb.append("线程名:"+Thread.currentThread().getName());
sb.append(",线程时间:"+Calendar.getInstance().get(13)+"s");
System.out.println(sb);
}
}
//显示锁任务
class TaskWithLock extends Task implements Runnable{
private final Lock lock=new ReentrantLock();
@Override
public void run() {
try {
lock.lock();
dosomething();
} finally
{
lock.unlock();
}

}};
//內部锁任务
class TaskWithSync extends Task implements Runnable{

@Override
public void run() {

synchronized ("A") {
dosomething();

}


}};

对于同步资源来说,显式锁时对象级别的锁,而内部锁时类级别的锁,也就是说lock锁时跟随对象的,synchronized锁时跟随类
改进方法:把Lock定义为所有线程的共享变量。

public static void main(String[] args) {
//多个线程共享锁
final Lock lock=new ReentrantLock();
……
}

线程池异常处理

Java中线程执行的任务接口java.lang.Runnable 要求不抛出Checked异常,

public interface Runnable {   

public abstract void run();
}

那么如果 run() 方法中抛出了RuntimeException,将会怎么处理了?
通常java.lang.Thread对象运行设置一个默认的异常处理方法:

java.lang.Thread.setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)

而这个默认的静态全局的异常捕获方法时输出堆栈。当然,我们可以覆盖此默认实现,只需要一个自定义的java.lang.Thread.UncaughtExceptionHandler接口实现即可。

public interface UncaughtExceptionHandler {   

void uncaughtException(Thread t, Throwable e);
}

而在线程池中却比较特殊。默认情况下,线程池 java.util.concurrent.ThreadPoolExecutor 会Catch住所有异常, 当任务执行完成(java.util.concurrent.ExecutorService.submit(Callable))获取其结果 时(java.util.concurrent.Future.get())会抛出此RuntimeException。

/**   
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread was interrupted while waiting
*/
V get() throws InterruptedException, ExecutionException;

其中 ExecutionException 异常即是java.lang.Runnable 或者 java.util.concurrent.Callable 抛出的异常。

也就是说,线程池在执行任务时捕获了所有异常,并将此异常加入结果中。这样一来线程池中的所有线程都将无法捕获到抛出的异常。 从而无法通过设置线程的默认捕获方法拦截的错误异常。也不同通过 自定义线程来完成异常的拦截。好在java.util.concurrent.ThreadPoolExecutor 预留了一个方法,运行在任务执行完毕进行扩展(当然也预留一个protected方法beforeExecute(Thread t, Runnable r)):

protected void afterExecute(Runnable r, Throwable t) { }

此方法的默认实现为空,这样我们就可以通过继承或者覆盖ThreadPoolExecutor 来达到自定义的错误处理。

解决办法如下:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(11, 100, 1, TimeUnit.MINUTES, //   
new ArrayBlockingQueue<Runnable>(10000),//
new DefaultThreadFactory()) {

protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
printException(r, t);
}
};

private static void printException(Runnable r, Throwable t) {
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
if (future.isDone())
future.get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
log.error(t.getMessage(), t);
}

使用SimpleThread类

TestThreadPool类是一个测试程序,用来模拟客户端的请求,当你运行它时,系统首先会显示线程池的初始化信息,然后提示你从键盘上输入字符串,并按下回车键,这时你会发现屏幕上显示信息,告诉你某个线程正在处理你的请求,如果你快速地输入一行行字符串,那么你会发现线程池中不断有线程被唤醒,来处理你的请求,在本例中,我创建了一个拥有10个线程的线程池,如果线程池中没有可用线程了,系统会提示你相应的警告信息,但如果你稍等片刻,那你会发现屏幕上会陆陆续续提示有线程进入了睡眠状态,这时你又可以发送新的请求了。
代码如下:

//TestThreadPool.java

import java.io.*;
public class TestThreadPool
{

public static void main(String[] args)
{
try
{
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
String s;
ThreadPoolManager manager = new ThreadPoolManager(10);
while((s = br.readLine()) != null)
{
manager.process(s);
}
}
catch(IOException e) {}
}
}

ThreadPoolManager类,顾名思义,它是一个用于管理线程池的类,它的主要职责是初始化线程池,并为客户端的请求分配不同的线程来进行处理,如果线程池满了,它会对你发出警告信息。
代码如下:

import java.util.*;
  
  class ThreadPoolManager
 
{
 
   private int maxThread;
   public Vector vector;
   public void setMaxThread(int threadCount)
  
{

   maxThread = threadCount;
  
}
  
   public ThreadPoolManager(int threadCount)
  
{

   setMaxThread(threadCount);
  System.out.println("Starting thread pool...");
   vector = new Vector();
   for(int i = 1; i <= 10; i++)
   {
   SimpleThread thread = new SimpleThread(i);
   vector.addElement(thread);
   thread.start();
  
}
  
}
  
   public void process(String argument)
  
{

   int i;
   for(i = 0; i < vector.size(); i++)
  {
  SimpleThread currentThread = (SimpleThread)vector.elementAt(i);
   if(!currentThread.isRunning())
   {
  System.out.println("Thread " + (i + 1) + " is processing:" +
  argument);
 currentThread.setArgument(argument);
   currentThread.setRunning(true);
   return;
 
}
  
}
  if(i == vector.size())
   {
   System.out.println("pool is full, try in another time.");
  
}
  
}
  
}//end of class ThreadPoolManager

我们先关注一下这个类的构造函数,然后再看它的process()方法。第16-24行是它的构造函数,首先它给ThreadPoolManager类的成员变量maxThread赋值,maxThread表示用于控制线程池中最大线程的数量。第18行初始化一个数组vector,它用来存放所有的SimpleThread类,这时候就充分体现了JAVA语言的优越性与艺术性:如果你用C语言的话,至少要写100行以上的代码来完成vector的功能,而且C语言数组只能容纳类型统一的基本数据类型,无法容纳对象。好了,闲话少说,第19-24行的循环完成这样一个功能:先创建一个新的SimpleThread类,然后将它放入vector中去,最后用thread.start()来启动这个线程,为什么要用start()方法来启动线程呢?因为这是JAVA语言中所规定的,如果你不用的话,那这些线程将永远得不到激活,从而导致本示例程序根本无法运行。

process()方法,第30-40行的循环依次从vector数组中选取SimpleThread线程,并检查它是否处于激活状态(所谓激活状态是指此线程是否正在处理客户端的请求),如果处于激活状态的话,那继续查找vector数组的下一项,如果vector数组中所有的线程都处于激活状态的话,那它会打印出一条信息,提示用户稍候再试。相反如果找到了一个睡眠线程的话,那第35-38行会对此进行处理,它先告诉客户端是哪一个线程来处理这个请求,然后将客户端的请求,即字符串argument转发给SimpleThread类的setArgument()方法进行处理,并调用SimpleThread类的setRunning()方法来唤醒当前线程,来对客户端请求进行处理。

解决办法是引入SimpleThread类,它是Thread类的一个子类,它才真正对客户端的请求进行处理,SimpleThread在示例程序初始化时都处于睡眠状态,但如果它接受到了ThreadPoolManager类发过来的调度信息,则会将自己唤醒,并对请求进行处理。
代码如下:

class SimpleThread extends Thread
 
{

   private boolean runningFlag;
   private String argument;
   public boolean isRunning()
  
{

   return runningFlag;
  
}
  public synchronized void setRunning(boolean flag)
  
{

   runningFlag = flag;
   if(flag)
   this.notify();
  
}
  
   public String getArgument()
  
{

   return this.argument;
  
}
   public void setArgument(String string)
  
{

   argument = string;
  
}
  
   public SimpleThread(int threadNumber)
  
{

   runningFlag = false;
   System.out.println("thread " + threadNumber + "started.");
  
}
  
   public synchronized void run()
  
{

   try{
   while(true)
   {
   if(!runningFlag)
   {
   this.wait();
  
}
   else
   {
   System.out.println("processing " + getArgument() + "... done.");
   sleep(5000);
   System.out.println("Thread is sleeping...");
   setRunning(false);
  
}
  
}
  
}
catch(InterruptedException e)
{
   System.out.println("Interrupt");
  
}
  
}//end of run()

  
}//end of class SimpleThread

线程使用不当导致内存溢出

代码如下:

class IndexCallable implements Callable
{

private List<?> t;
@override
public object call()
{

……
}
}

程序是这样的,有一个线程会往List中插入对象,线程池中的多个线程丛List中取数据,然后进行处理,处理完以后把对象从List中删除。outofmemory有几种可能:

1.线程池中的处理线程在处理完以后没有从List中删掉元素

2.向List中插入元素的速度高于从List中删除元素的速度,造成List中积累的元素数量不断攀升,可以随时打印一下List中的元素数量,看是否是一支攀升。

3.ArrayList和LinkedList都不是线程安全的,把List换成Vector或者保证List变量通过Synchronized同步访问。

4.在程序的其他地方还持有List中的对象句柄,虽然从List中删掉了,如果别的地方还保存着该对象的句柄,那么也不会被垃圾回收。

5.JVM的应用程序最大可用内存参数(-Xmx)配置过低

如:

JAVA_OPTS="-server -Xms800m -Xmx800m -XX:PermSize=64M -XX:MaxNewSize=256m -XX:MaxPermSize=128m -Djava.awt.headless=true "

工作队列

是同一组固定的工作线程相结合的工作队列,它使用 wait() 和 notify() 来通知等待线程新的工作已经到达了。该工作队列通常被实现成具有相关监视器对象的某种链表,下边的代码显示了简单的合用工作队列的示例。尽管 Thread API 没有对使用 Runnable 接口强加特殊要求,但使用 Runnable 对象队列的这种模式是调度程序和工作队列的公共约定。

public class WorkQueue
{

private final int nThreads;

private final PoolWorker[] threads;

private final LinkedList queue;

public WorkQueue(int nThreads)
{


this.nThreads = nThreads;

queue = new LinkedList();

threads = new PoolWorker[nThreads];

for (int i = 0; i

threads[i] = new PoolWorker();

threads[i].start();

}

}

public void execute(Runnable r)
{


synchronized(queue)
{

queue.addLast(r);

queue.notify();

}

}

private class PoolWorker extends Thread
{

public void run()
{


Runnable r;

while (true)
{

synchronized(queue)
{

while (queue.isEmpty())
{

try

{

queue.wait();

}

catch (InterruptedException ignored)

{

}

}

r = (Runnable) queue.removeFirst();

}

// If we don't catch RuntimeException,

// the pool could leak threads

try
{

r.run();

}

catch (RuntimeException e)
{

// You might want to log something here

}

}

}

}

}

实现使用的是 notify() 而不是 notifyAll() 。大多数专家建议使用 notifyAll() 而不是 notify() ,而且理由很充分:使用 notify() 具有难以捉摸的风险,只有在某些特定条件下使用该方法才是合适的。另一方面,如果使用得当, notify() 具有比 notifyAll() 更可取的性能特征;特别是,notify() 引起的环境切换要少得多,这一点在服务器应用程序中是很重要的。

文章目录
  1. 1. Java线程池架构原理及源码解析
    1. 1.1. 构建参数源码
    2. 1.2. 参数解释
    3. 1.3. 源码详细解析
      1. 1.3.1. excute源码
      2. 1.3.2. addIfUnderCorePoolSize源码
      3. 1.3.3. addThread源码
      4. 1.3.4. ThreadFactory接口默认实现DefaultThreadFactory
      5. 1.3.5. Worker的run方法
      6. 1.3.6. getTask源码
      7. 1.3.7. execute方法部分实现
      8. 1.3.8. ensureQueuedTaskHandled源码
      9. 1.3.9. reject源码
      10. 1.3.10. 再次回到execute方法
      11. 1.3.11. addIfUnderMaximumPoolSize源码
      12. 1.3.12. workerDone源码
      13. 1.3.13. runTask(task)源码
    4. 1.4. 添加任务处理流程
      1. 1.4.1. AbortPolicy()
      2. 1.4.2. CallerRunsPolicy()
      3. 1.4.3. DiscardOldestPolicy()
      4. 1.4.4. DiscardPolicy()
  2. 2. 违反Java高质量代码案例
    1. 2.1. 异步运算使用Callable接口
    2. 2.2. 优先选择线程池
    3. 2.3. 线程死锁
    4. 2.4. 忽略设置阻塞队列长度
    5. 2.5. 使用stop方法停止线程
    6. 2.6. 覆写start方法
    7. 2.7. 使用过多线程优先级
    8. 2.8. Lock与synchronized
    9. 2.9. 线程池异常处理
    10. 2.10. 使用SimpleThread类
    11. 2.11. 线程使用不当导致内存溢出
    12. 2.12. 工作队列