1 2 3 4
/**
 * A unit of work that can be handed to a thread pool.
 *
 * <p>Implementations put the actual task logic in {@link #concreteJob()}.
 */
public interface JobRun {

    /** Performs the work represented by this job. */
    void concreteJob();
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| import java.util.concurrent.TimeUnit;
/**
 * Sample job that prints a message and then sleeps for one second to simulate work.
 */
public class Job implements JobRun {

    /**
     * Prints a fixed message and blocks the calling thread for one second.
     *
     * <p>If the thread is interrupted while sleeping, the job ends early and the
     * interrupt status is restored so the code running this job can observe it.
     */
    @Override
    public void concreteJob() {
        System.out.println("执行某一个工作,耗时1s");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // sleep() clears the interrupt flag when it throws; set it again instead of
            // swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
|
1 2 3 4 5 6 7 8 9 10 11 12
| public interface ThreadPool<Job extends JobRun> { void execute(Job job); void shutdown(); void addWorkers(int num); void removeWorker(int num); int getJobSize(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
| import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicLong;
public class DefaultThreadPool<Job extends JobRun> implements ThreadPool<Job> { private static final int MAX_WORKER_NUMBERS = 10; private static final int DEFAULT_WORKER_NUMBERS = 5; private static final int MIN_WORKER_NUMBERS = 2; private final LinkedList<Job> jobs = new LinkedList<Job>(); private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>()); private int workerNum = DEFAULT_WORKER_NUMBERS; private AtomicLong threadNum = new AtomicLong();
public DefaultThreadPool(){ initializeWorkers(DEFAULT_WORKER_NUMBERS); }
public DefaultThreadPool(int num){ if(num > MAX_WORKER_NUMBERS){ workerNum = MAX_WORKER_NUMBERS; } if(num<MIN_WORKER_NUMBERS){ workerNum = MIN_WORKER_NUMBERS; } initializeWorkers(workerNum); }
public void execute(Job job){ if(job != null){ synchronized (jobs){ jobs.addLast(job); jobs.notify(); } } }
public void shutdown(){ for(Worker worker : workers){ worker.shutdown(); } }
public void addWorkers(int num){ synchronized (jobs){ if(num + this.workerNum > MAX_WORKER_NUMBERS){ num = MAX_WORKER_NUMBERS - this.workerNum; } initializeWorkers(num); this.workerNum += num; } }
public void removeWorker(int num){ synchronized (jobs){ if(num >= this.workerNum){ throw new IllegalArgumentException("beyond workNum"); } int count = 0; while(count < num){ Worker worker = workers.get(0); if(workers.remove(worker)){ worker.shutdown(); ++count; } } this.workerNum -= count; } }
public int getJobSize(){ return jobs.size(); }
private void initializeWorkers(int num){ for(int i =0;i<num;++i){ Worker worker = new Worker(); workers.add(worker); Thread thread = new Thread(worker,"ThreadPool-worker-" + threadNum.incrementAndGet()); thread.start(); } }
class Worker implements Runnable{ private volatile boolean running = true; @Override public void run(){ while (running){ Job job = null; synchronized (jobs){ while(jobs.isEmpty()){ try{ jobs.wait(); }catch(InterruptedException ex){ Thread.currentThread().interrupt(); return; } } job = jobs.removeFirst(); } if(job != null){ try{ job.concreteJob(); }catch(Exception ex){ System.out.println(ex.getMessage()); } } } }
public void shutdown(){ running = false; } } }
|
1 2 3 4 5 6 7 8 9 10
| public class ApplicationDemo { public static void main(String[] args){ DefaultThreadPool<Job> defaultThreadPool = new DefaultThreadPool<Job>(); defaultThreadPool.removeWorker(3); for(int i = 0;i<20;++i){ Job job = new Job(); defaultThreadPool.execute(job); } } }
|
1 2 3 4 5 6 7 8 9 10 11
| List<String> list = Collections.synchronizedList(new ArrayList<String>()); list.add("1"); list.add("2"); list.add("3");
synchronized(list) { Iterator iter = list.iterator(); while(iter.hasNext()) { System.out.println(iter.next()); } }
|
Collections.synchronizedList 返回的包装列表会对 add、get、remove 等每一次单独的方法调用加锁(锁对象就是返回的列表本身)。
但迭代器本身并不加锁:遍历由多次调用组成,为避免遍历期间数据被其他线程修改,必须在遍历时手动对该列表对象加 synchronized 保护。
参考:《Java并发编程的艺术》
摘自/参考链接