
A Multi-Threaded Job Scheduling Utility

Published Dec 24, 2014 2:22:19 PM

Category: Java / Class Libraries & Tools / MultiThreadJob

Tags: multi-threaded job scheduling

1. About the Utility:

First off, I am not sure whether mature open-source tools for this kind of multi-threaded job scheduling already exist; I only recently started practicing multi-threaded programming myself, and this utility class is a small takeaway from that learning. As for what it is for, take crawling images from a website as an example. Image-download code is plentiful and simple, a web search turns up lots of it, but it is all single-threaded. Would downloading with multiple threads be faster? Of course. We treat each image as a job: every job performs the same operation, downloading an image, and only the image URL and the file name to save under differ. Once the job is abstracted out, my original idea was simply this: I implement the job, create a pile of jobs and drop them into a job thread pool (let's call it that for now), and the jobs get scheduled and executed automatically. Sounds pretty good, right?

2. Implementation:

Let me introduce the design with a simple example: an arithmetic task that computes the sum of the squares of 1 through n. We will not use the closed-form formula (n*(n+1)*(2n+1)/6) here; the straightforward approach is a for loop that adds up the square of each natural number. Instead, we will do it with multiple threads.
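
For reference, a minimal single-threaded baseline looks like the sketch below. BigDecimal is used only to mirror the TestJob class shown later; the class and variable names are just illustrative.

import java.math.BigDecimal;

public class SingleThreadSquareSum {

        public static void main(String[] args) {
                long n = 1000000L;
                BigDecimal sum = BigDecimal.ZERO;
                //plain for loop: add i*i for every natural number from 1 to n
                for (long i = 1; i <= n; i++) {
                        BigDecimal v = new BigDecimal(i);
                        sum = sum.add(v.multiply(v));
                }
                System.out.println("single-thread result:" + sum);
        }

}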

The project is organized into the executor, thread, event and listener packages plus the top-level multithreadjob package, which together cover every concept in this utility.

2.1 Job

The multithreadjob package contains the abstract Job class, which defines the parameters of job execution. The important ones are maxRetry, the maximum number of execution attempts (default 1; if set to 2, a job that fails its first attempt is moved to the end of the queue and executed one more time), and resultMap, which stores the job's results.

package com.cangzhitao.multithreadjob;

import java.util.HashMap;
import java.util.Map;

/**
 * A job (unit of work).
 * @author Administrator
 *
 */
public abstract class Job {

        /**
         * Ready (waiting to run)
         */
        public final static int JOB_READY = 0;

        /**
         * Succeeded
         */
        public final static int JOB_SUCESS = 1;

        /**
         * Failed
         */
        public final static int JOB_FAIL = 2;

        /**
         * Waiting to be retried after a failure
         */
        public final static int JOB_RETRY = 3;

        /**
         * Maximum number of execution attempts, default 1. On failure the job is moved to the
         * tail of the queue and tried again, up to maxRetry-1 more times.
         */
        protected int maxRetry = 1;

        /**
         * Job name
         */
        protected String jobName;

        /**
         * Job status
         */
        protected int status = JOB_READY;

        /**
         * Number of times the job has been executed
         */
        protected int count = 0;

        /**
         * Map holding the job's results
         */
        protected Map<String, Object> resultMap = new HashMap<String, Object>();

        public Job() {

        }

        public Job(String jobName, int maxRetry) {
                super();
                this.maxRetry = maxRetry;
                this.jobName = jobName;
        }

        public Job(int maxRetry) {
                super();
                this.maxRetry = maxRetry;
        }

        public Job(String jobName) {
                super();
                this.jobName = jobName;
        }

        public int getStatus() {
                return status;
        }

        public void setStatus(int status) {
                this.status = status;
        }

        public int getCount() {
                return count;
        }

        public void setCount(int count) {
                this.count = count;
        }

        /**
         * The method that does the actual work.
         * @return whether the work succeeded
         */
        public abstract boolean doJob();

        /**
         * Execute the job and update its status.
         * @return the resulting job status
         */
        public int excute() {
                boolean sucess = doJob();
                count++;
                if(sucess) {
                        this.setStatus(JOB_SUCESS);
                } else {
                        if(this.getCount()<maxRetry) {
                                this.setStatus(JOB_RETRY);
                        } else {
                                this.setStatus(JOB_FAIL);
                        }
                }
                return status;
        }

        public String toString() {
                return "job name:"+getJobName()+", status:"+getStatusMark()+", executions:"+getCount();
        }

        public String getStatusMark() {
                if(this.status==JOB_FAIL) {
                        return "failed";
                } else if(this.status==JOB_SUCESS) {
                        return "succeeded";
                } else if(this.status==JOB_READY) {
                        return "ready";
                } else if(this.status==JOB_RETRY) {
                        return "retry";
                }
                return "unknown";
        }

        public String getJobName() {
                return jobName;
        }

        public void setJobName(String jobName) {
                this.jobName = jobName;
        }

        public int getMaxRetry() {
                return maxRetry;
        }

        public void setMaxRetry(int maxRetry) {
                this.maxRetry = maxRetry;
        }

        public Map<String, Object> getResultMap() {
                return resultMap;
        }

        public void mergeResult(Map<String, Object> mergeResultMap, Map<String, Object> needMergeResultMap) {

        }

}

To use the utility, you write a Java class that extends Job. The doJob method must be implemented; it is what the job actually does, for example computing a square. If the results need to be combined, you also override the mergeResult method, which merges the results produced by the jobs a thread has executed into a single resultMap. This borrows the Map-Reduce idea from Hadoop: the map phase corresponds to distributing jobs across threads, except that here each job is merged with its executing thread as soon as it finishes, and once a thread has finished all of its jobs its result is merged with the executor's resultMap, so in the end the results of every job are merged into the executor.

A TestJob example:

package staticjob;

import java.math.BigDecimal;
import java.util.Map;

import com.cangzhitao.multithreadjob.Job;

public class TestJob extends Job {

        private long i;

        public TestJob() {

        }

        public TestJob(long i) {
                this.i = i;
        }

        @Override
        public boolean doJob() {
                getResultMap().put("result", new BigDecimal(i).multiply(new BigDecimal(i)));
                return true;
        }

        @Override
        public void mergeResult(Map<String, Object> mergeResultMap,
                        Map<String, Object> needMergeResultMap) {
                if(mergeResultMap.get("result")!=null) {
                        BigDecimal n = (BigDecimal) mergeResultMap.get("result");
                        if(needMergeResultMap.get("result")!=null) {
                                BigDecimal m = (BigDecimal) needMergeResultMap.get("result");
                                mergeResultMap.put("result", m.add(n));
                        }
                } else {
                        mergeResultMap.putAll(needMergeResultMap);
                }

        }

}

2.2 Executors (job thread pools)

There are currently two executor job thread pools: StaticJobExecutor and DynamicJobExecutor, literally "static" and "dynamic". I could not come up with better names at the time, so these will do for now; here is the difference. Static means that all the jobs to execute are determined and built up front, so the whole batch can be handed to the executor at once. Dynamic means that the total number of jobs is not known in advance; whenever a job appears, it is submitted to the executor for execution.

The static pool suits cases where the jobs are clear from the start, the job collection is quick to build, and the number of jobs is not enormous. Job count matters because a large number of Job objects takes up memory; if memory is not a problem, neither is the count. The static pool keeps separate lists of the jobs that succeeded and the jobs that failed. Most of the uses in my earlier project were static: since all jobs are submitted up front, we know the total and can monitor execution progress, which can be an important metric in production. Progress reporting has not been added back in this refactor; it can be added later as needed.

StaticJobExecutor has two constructors: one takes the job list, the other takes the job list plus a thread count. I originally wanted to default the thread count to twice the number of CPU cores, but testing showed that different kinds of jobs have different optimal thread counts. That is easy to understand: we all know more threads is not always better, the usual explanation being that managing and switching between threads costs time, so the question is whether that overhead is outweighed by the time the jobs themselves take. If a job is CPU-heavy, extra threads do not help, and a thread count equal to the number of cores seems more appropriate; if a job barely uses the CPU, the thread count matters much less. So you need to test your actual workload several times to find the best value.

After instantiating a StaticJobExecutor you must call execute for the jobs to actually run. execute takes one parameter: if true, the call blocks until all jobs have finished; if false, it returns immediately.

StaticJobExecutor source:

package com.cangzhitao.multithreadjob.executor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.cangzhitao.multithreadjob.Job;
import com.cangzhitao.multithreadjob.listener.StaticJobExecutorThreadStopListener;
import com.cangzhitao.multithreadjob.thread.StaticJobExecutorThread;

public class StaticJobExecutor {

        /**
         * Queue of jobs to execute
         */
        private LinkedBlockingQueue<Job> jobQueue;

        /**
         * List of jobs to execute
         */
        private List<Job> jobList;

        /**
         * Jobs that completed successfully
         */
        private List<Job> finishedJobList;

        /**
         * Jobs that failed
         */
        private List<Job> failedJobList;

        /**
         * Number of worker threads to start
         */
        private int threadNum;

        private Map<String, Object> resultMap = new HashMap<String, Object>();

        public StaticJobExecutor(List<Job> jobList) {
                this.jobList = jobList;
                jobQueue = new LinkedBlockingQueue<Job>(jobList);
                finishedJobList = new ArrayList<Job>();
                failedJobList = new ArrayList<Job>();

                //cap the number of threads at twice the number of CPU cores
                threadNum = jobList.size();
                int cpuNum = Runtime.getRuntime().availableProcessors()*2;
                if(threadNum>cpuNum) {
                        threadNum = cpuNum;
                }
        }

        public StaticJobExecutor(List<Job> jobList, int threadNum) {
                this.jobList = jobList;
                jobQueue = new LinkedBlockingQueue<Job>(jobList);
                finishedJobList = new ArrayList<Job>();
                failedJobList = new ArrayList<Job>();
                this.threadNum = threadNum;
        }

        private ExecutorService pool = null;

        public void execute(boolean wait) {
                pool = Executors.newCachedThreadPool();

                StaticJobExecutorThreadStopListener stopListener = new StaticJobExecutorThreadStopListener(this);
                for(int i=0;i<threadNum;i++) {
                        StaticJobExecutorThread thread = new StaticJobExecutorThread(jobQueue, finishedJobList, failedJobList);
                        thread.addJobExecutorStopListener(stopListener);
                        pool.execute(thread);
                }
                pool.shutdown();
                if(wait) {
                        try {
                                pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        }
                }
        }

        public List<Job> getJobList() {
                return jobList;
        }

        public List<Job> getFinishedJobList() {
                return finishedJobList;
        }

        public List<Job> getFailedJobList() {
                return failedJobList;
        }

        public Map<String, Object> getResultMap() {
                return resultMap;
        }

}
An example test using StaticJobExecutor:

package staticjob;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import com.cangzhitao.multithreadjob.Job;
import com.cangzhitao.multithreadjob.executor.StaticJobExecutor;

public class TestStaticJobExecutor {

        /**
         * @param args
         */
        public static void main(String[] args) {
                testThread();
        }

        public static void testThread() {
                long d1 = new Date().getTime();
                List<Job> jobList = new ArrayList<Job>();
                for(long i=1;i<=1000000l;i++) {
                        Job job = new TestJob(i);
                        jobList.add(job);
                }
                StaticJobExecutor executor = new StaticJobExecutor(jobList,8);
                executor.execute(true);
                long d2 = new Date().getTime();
                System.out.println("executor result:"+executor.getResultMap().get("result"));
                System.out.println("executor time:"+(d2-d1));

        }

}

Compared with the static pool, the biggest feature of the dynamic pool is that, as long as no stop signal has been sent, you can keep submitting jobs after execution has started. Jobs can be produced and added dynamically, which is really the producer-consumer pattern: a "warehouse" (a bounded queue) in the middle stores the jobs, which also prevents running out of memory. To save memory, the dynamic pool does not keep lists of succeeded and failed jobs. Note that if the merged results themselves grow very large, you can still run out of memory; in that case you need to think about processing the data during job execution or during result merging.

DynamicJobExecutor also has two constructors: one takes the "warehouse" size, the other additionally takes a thread count. The thread count is chosen the same way as for StaticJobExecutor, and the warehouse size likewise needs a few rounds of testing to find the best value.

After instantiating a DynamicJobExecutor you again call execute, which takes one parameter, but a different one from StaticJobExecutor, and returns immediately. The parameter, Class<? extends Job> jobClass, is the concrete job class; it mainly tells the executor which Job's mergeResult method to call when merging each thread's results into the executor.

After execute is called, jobs keep running as long as the warehouse contains any; once the warehouse is drained, the threads block and wait for the next job. When you are sure that all jobs have been submitted, call stopExecutor to tell the pool to shut down. This is another difference from StaticJobExecutor.

DynamicJobExecutor source:

package com.cangzhitao.multithreadjob.executor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.cangzhitao.multithreadjob.Job;
import com.cangzhitao.multithreadjob.listener.DynamicJobExecutorThreadStopListener;
import com.cangzhitao.multithreadjob.thread.DynamicJobExecutorThread;

public class DynamicJobExecutor {

        /**
         * Queue of jobs to execute
         */
        private LinkedBlockingQueue<Job> jobQueue;

        /**
         * Number of worker threads to start
         */
        private int threadNum;

        private Map<String, Object> resultMap = new HashMap<String, Object>();

        /**
         *
         * @param threadNum number of worker threads
         * @param queueLength capacity of the job queue buffer
         */
        public DynamicJobExecutor(int threadNum, int queueLength) {
                this.threadNum = threadNum;
                this.jobQueue = new LinkedBlockingQueue<Job>(queueLength);
        }

        /**
         * Thread count defaults to twice the number of CPU cores.
         * @param queueLength capacity of the job queue buffer
         */
        public DynamicJobExecutor(int queueLength) {
                this.jobQueue = new LinkedBlockingQueue<Job>(queueLength);
                //default the thread count to twice the number of CPU cores
                threadNum = Runtime.getRuntime().availableProcessors()*2;
        }

        private ExecutorService pool = null;

        private List<DynamicJobExecutorThread> threadList = new ArrayList<DynamicJobExecutorThread>();

        /**
         * Start executing jobs.
         * @param jobClass the concrete job class, used to merge each thread's results
         */
        public void execute(Class<? extends Job> jobClass) {
                pool = Executors.newCachedThreadPool();
                DynamicJobExecutorThreadStopListener stopListener = new DynamicJobExecutorThreadStopListener(this, jobClass);
                for(int i=0;i<threadNum;i++) {
                        DynamicJobExecutorThread thread = new DynamicJobExecutorThread(jobQueue);
                        thread.addJobExecutorStopListener(stopListener);
                        threadList.add(thread);
                        pool.execute(thread);
                }
                pool.shutdown();
        }

        /**
         * Stop the executor.
         * @param wait whether to block until the worker threads have finished
         */
        public void stopExecutor(boolean wait) {
                for(int i=0;i<threadList.size();i++) {
                        DynamicJobExecutorThread thread = threadList.get(i);
                        thread.setStopWork(true);
                }
                if(wait) {
                        try {
                                pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        }
                }
        }

        public Map<String, Object> getResultMap() {
                return resultMap;
        }

        public void addJob(Job job) {
                if(job==null) {
                        return;
                }
                try {
                        jobQueue.put(job);
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
        }

        public void addJob(List<Job> jobList) {
                if(jobList==null||jobList.size()==0) {
                        return;
                }
                for(int i=0;i<jobList.size();i++) {
                        addJob(jobList.get(i));
                }
        }

}
An example test, TestDynamicJobExecutor:

package staticjob;

import java.util.Date;

import com.cangzhitao.multithreadjob.Job;
import com.cangzhitao.multithreadjob.executor.DynamicJobExecutor;

public class TestDynamicJobExecutor {

        /**
         * @param args
         */
        public static void main(String[] args) {
                testThread();
        }

        public static void testThread() {
                long d1 = new Date().getTime();
                DynamicJobExecutor executor = new DynamicJobExecutor(8, 800);
                executor.execute(TestJob.class);

                for(long i=1;i<=1000000000l;i++) {
                        Job job = new TestJob(i);
                        executor.addJob(job);
                }
                executor.stopExecutor(true);

                long d2 = new Date().getTime();
                System.out.println("executor result:"+executor.getResultMap().get("result"));
                System.out.println("executor time:"+(d2-d1));

        }

}

2.3 Threads

The thread package contains StaticJobExecutorThread and DynamicJobExecutorThread, corresponding to StaticJobExecutor and DynamicJobExecutor. The threads are fairly simple: they mainly take jobs from the warehouse queue and execute them, with various listeners hooked in so the behavior can be extended.

StaticJobExecutorThread source:

package com.cangzhitao.multithreadjob.thread;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.cangzhitao.multithreadjob.Job;
import com.cangzhitao.multithreadjob.event.JobExecutorThreadStopEvent;
import com.cangzhitao.multithreadjob.event.JobFailEvent;
import com.cangzhitao.multithreadjob.event.JobReadyEvent;
import com.cangzhitao.multithreadjob.event.JobRetryEvent;
import com.cangzhitao.multithreadjob.event.JobSucessEvent;
import com.cangzhitao.multithreadjob.listener.JobExecutorThreadStopListener;
import com.cangzhitao.multithreadjob.listener.JobFailListener;
import com.cangzhitao.multithreadjob.listener.JobReadyListener;
import com.cangzhitao.multithreadjob.listener.JobRetryListener;
import com.cangzhitao.multithreadjob.listener.JobSucessListener;

/**
 * Worker thread for statically submitted jobs: all jobs are submitted before the threads start running.
 * @author Administrator
 *
 */
public class StaticJobExecutorThread extends Thread {

        private static final Log log = LogFactory.getLog(StaticJobExecutorThread.class);

        private final LinkedBlockingQueue<Job> jobQueue;

        private Set<JobSucessListener> jobSucessListenerSet = null;

        private Set<JobFailListener> jobFailListenerSet = null;

        private Set<JobRetryListener> jobRetryListenerSet = null;

        private Set<JobReadyListener> jobReadyListenerSet = null;

        private Set<JobExecutorThreadStopListener> jobExecutorStopListenerSet = null;

        private List<Job> finishedJobList;

        private List<Job> failedJobList;

        private Map<String, Object> resultMap = new HashMap<String, Object>();

        public StaticJobExecutorThread(LinkedBlockingQueue<Job> jobQueue, List<Job> finishedJobList, List<Job> failedJobList) {
                this.jobQueue = jobQueue;
                this.finishedJobList = finishedJobList;
                this.failedJobList = failedJobList;
        }

        public void run() {
                Job job = null;
                while((job=jobQueue.poll())!=null) {
                        if(job.getStatus()==Job.JOB_READY||job.getStatus()==Job.JOB_RETRY) {
                                int result = job.excute();
                                log.debug(this.getName()+job);
                                //retry
                                if(result==Job.JOB_RETRY) {
                                        try {
                                                jobQueue.put(job);
                                                notifyJobRetry(job);
                                        } catch (Exception e) {
                                                e.printStackTrace();
                                        }
                                //success
                                } else if(result==Job.JOB_SUCESS) {
                                        try {
                                                finishedJobList.add(job);
                                                mergeResult(job);
                                                notifyJobSucess(job);
                                        } catch (Exception e) {
                                                e.printStackTrace();
                                        }
                                //failure
                                } else if(result==Job.JOB_FAIL) {
                                        try {
                                                failedJobList.add(job);
                                                notifyJobFail(job);
                                        } catch (Exception e) {
                                                e.printStackTrace();
                                        }
                                //back to ready
                                } else if(result==Job.JOB_READY) {
                                        try {
                                                jobQueue.put(job);
                                        } catch (Exception e) {
                                                e.printStackTrace();
                                        }
                                        notifyJobReady(job);
                                }
                        } else {
                                log.error(this.getName()+job);
                        }

                }
                log.info(this.getName()+": job executor thread exiting.");
                notifyJobStop();
        }

        private void mergeResult(Job job) {
                job.mergeResult(resultMap, job.getResultMap());
        }

        private void notifyJobStop() {
                if(jobExecutorStopListenerSet!=null&&jobExecutorStopListenerSet.size()>0) {
                        Iterator<JobExecutorThreadStopListener> it = jobExecutorStopListenerSet.iterator();
                        while(it.hasNext()) {
                                JobExecutorThreadStopListener listener = it.next();
                                listener.jobExecutorStopCallBack(new JobExecutorThreadStopEvent(this));
                        }
                }
        }

        private void notifyJobRetry(Job job) {
                if(jobRetryListenerSet!=null&&jobRetryListenerSet.size()>0) {
                        Iterator<JobRetryListener> it = jobRetryListenerSet.iterator();
                        while(it.hasNext()) {
                                JobRetryListener listener = it.next();
                                listener.jobRetryCallBack(new JobRetryEvent(job));
                        }
                }
        }

        private void notifyJobFail(Job job) {
                if(jobFailListenerSet!=null&&jobFailListenerSet.size()>0) {
                        Iterator<JobFailListener> it = jobFailListenerSet.iterator();
                        while(it.hasNext()) {
                                JobFailListener listener = it.next();
                                listener.jobFailCallBack(new JobFailEvent(job));
                        }
                }
        }

        private void notifyJobReady(Job job) {
                if(jobReadyListenerSet!=null&&jobReadyListenerSet.size()>0) {
                        Iterator<JobReadyListener> it = jobReadyListenerSet.iterator();
                        while(it.hasNext()) {
                                JobReadyListener listener = it.next();
                                listener.jobReadyCallBack(new JobReadyEvent(job));
                        }
                }
        }

        private void notifyJobSucess(Job job) {
                if(jobSucessListenerSet!=null&&jobSucessListenerSet.size()>0) {
                        Iterator<JobSucessListener> it = jobSucessListenerSet.iterator();
                        while(it.hasNext()) {
                                JobSucessListener listener = it.next();
                                listener.jobSucessCallBack(new JobSucessEvent(job));
                        }
                }
        }

        public void addJobExecutorStopListener(JobExecutorThreadStopListener listener) {
                if(jobExecutorStopListenerSet==null) {
                        jobExecutorStopListenerSet = new HashSet<JobExecutorThreadStopListener>();
                }
                jobExecutorStopListenerSet.add(listener);
        }

        public void addJobSucessListener(JobSucessListener listener) {
                if(jobSucessListenerSet==null) {
                        jobSucessListenerSet = new HashSet<JobSucessListener>();
                }
                jobSucessListenerSet.add(listener);
        }

        public void addJobRetryListener(JobRetryListener listener) {
                if(jobRetryListenerSet==null) {
                        jobRetryListenerSet = new HashSet<JobRetryListener>();
                }
                jobRetryListenerSet.add(listener);
        }

        public void addJobFailListener(JobFailListener listener) {
                if(jobFailListenerSet==null) {
                        jobFailListenerSet = new HashSet<JobFailListener>();
                }
                jobFailListenerSet.add(listener);
        }

        public void addJobReadyListener(JobReadyListener listener) {
                if(jobReadyListenerSet==null) {
                        jobReadyListenerSet = new HashSet<JobReadyListener>();
                }
                jobReadyListenerSet.add(listener);
        }

        public Map<String, Object> getResultMap() {
                return resultMap;
        }

}
DynamicJobExecutorThread source:

package com.cangzhitao.multithreadjob.thread;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.cangzhitao.multithreadjob.Job;
import com.cangzhitao.multithreadjob.event.JobExecutorThreadStopEvent;
import com.cangzhitao.multithreadjob.event.JobFailEvent;
import com.cangzhitao.multithreadjob.event.JobReadyEvent;
import com.cangzhitao.multithreadjob.event.JobRetryEvent;
import com.cangzhitao.multithreadjob.event.JobSucessEvent;
import com.cangzhitao.multithreadjob.listener.JobExecutorThreadStopListener;
import com.cangzhitao.multithreadjob.listener.JobFailListener;
import com.cangzhitao.multithreadjob.listener.JobReadyListener;
import com.cangzhitao.multithreadjob.listener.JobRetryListener;
import com.cangzhitao.multithreadjob.listener.JobSucessListener;

/**
 * Worker thread for dynamically submitted jobs: keeps polling the queue until told to stop.
 * @author Administrator
 *
 */
public class DynamicJobExecutorThread extends Thread {

        private static final Log log = LogFactory.getLog(DynamicJobExecutorThread.class);

        private final LinkedBlockingQueue<Job> jobQueue;

        private Set<JobSucessListener> jobSucessListenerSet = null;

        private Set<JobFailListener> jobFailListenerSet = null;

        private Set<JobRetryListener> jobRetryListenerSet = null;

        private Set<JobReadyListener> jobReadyListenerSet = null;

        private Set<JobExecutorThreadStopListener> jobExecutorStopListenerSet = null;

        private Map<String, Object> resultMap = new HashMap<String, Object>();

        private boolean stopWork = false;

        public DynamicJobExecutorThread(LinkedBlockingQueue<Job> jobQueue) {
                this.jobQueue = jobQueue;
        }

        public void run() {
                Job job = null;
                while(!stopWork) {
                        while((job=jobQueue.poll())!=null) {
                                if(job.getStatus()==Job.JOB_READY||job.getStatus()==Job.JOB_RETRY) {
                                        int result = job.excute();
                                        log.debug(this.getName()+job);
                                        //retry
                                        if(result==Job.JOB_RETRY) {
                                                try {
                                                        jobQueue.put(job);
                                                        notifyJobRetry(job);
                                                } catch (Exception e) {
                                                        e.printStackTrace();
                                                }
                                        //success
                                        } else if(result==Job.JOB_SUCESS) {
                                                try {
                                                        mergeResult(job);
                                                        notifyJobSucess(job);
                                                } catch (Exception e) {
                                                        e.printStackTrace();
                                                }
                                        //failure
                                        } else if(result==Job.JOB_FAIL) {
                                                try {
                                                        notifyJobFail(job);
                                                } catch (Exception e) {
                                                        e.printStackTrace();
                                                }
                                        //back to ready
                                        } else if(result==Job.JOB_READY) {
                                                try {
                                                        jobQueue.put(job);
                                                } catch (Exception e) {
                                                        e.printStackTrace();
                                                }
                                                notifyJobReady(job);
                                        }
                                } else {
                                        log.error(this.getName()+job);
                                }

                        }
                }
                log.info(this.getName()+": job executor thread exiting.");
                notifyJobStop();
        }

        private void mergeResult(Job job) {
                job.mergeResult(resultMap, job.getResultMap());
        }

        private void notifyJobStop() {
                if(jobExecutorStopListenerSet!=null&&jobExecutorStopListenerSet.size()>0) {
                        Iterator<JobExecutorThreadStopListener> it = jobExecutorStopListenerSet.iterator();
                        while(it.hasNext()) {
                                JobExecutorThreadStopListener listener = it.next();
                                listener.jobExecutorStopCallBack(new JobExecutorThreadStopEvent(this));
                        }
                }
        }

        private void notifyJobRetry(Job job) {
                if(jobRetryListenerSet!=null&&jobRetryListenerSet.size()>0) {
                        Iterator<JobRetryListener> it = jobRetryListenerSet.iterator();
                        while(it.hasNext()) {
                                JobRetryListener listener = it.next();
                                listener.jobRetryCallBack(new JobRetryEvent(job));
                        }
                }
        }

        private void notifyJobFail(Job job) {
                if(jobFailListenerSet!=null&&jobFailListenerSet.size()>0) {
                        Iterator<JobFailListener> it = jobFailListenerSet.iterator();
                        while(it.hasNext()) {
                                JobFailListener listener = it.next();
                                listener.jobFailCallBack(new JobFailEvent(job));
                        }
                }
        }

        private void notifyJobReady(Job job) {
                if(jobReadyListenerSet!=null&&jobReadyListenerSet.size()>0) {
                        Iterator<JobReadyListener> it = jobReadyListenerSet.iterator();
                        while(it.hasNext()) {
                                JobReadyListener listener = it.next();
                                listener.jobReadyCallBack(new JobReadyEvent(job));
                        }
                }
        }

        private void notifyJobSucess(Job job) {
                if(jobSucessListenerSet!=null&&jobSucessListenerSet.size()>0) {
                        Iterator<JobSucessListener> it = jobSucessListenerSet.iterator();
                        while(it.hasNext()) {
                                JobSucessListener listener = it.next();
                                listener.jobSucessCallBack(new JobSucessEvent(job));
                        }
                }
        }

        public void addJobExecutorStopListener(JobExecutorThreadStopListener listener) {
                if(jobExecutorStopListenerSet==null) {
                        jobExecutorStopListenerSet = new HashSet<JobExecutorThreadStopListener>();
                }
                jobExecutorStopListenerSet.add(listener);
        }

        public void addJobSucessListener(JobSucessListener listener) {
                if(jobSucessListenerSet==null) {
                        jobSucessListenerSet = new HashSet<JobSucessListener>();
                }
                jobSucessListenerSet.add(listener);
        }

        public void addJobRetryListener(JobRetryListener listener) {
                if(jobRetryListenerSet==null) {
                        jobRetryListenerSet = new HashSet<JobRetryListener>();
                }
                jobRetryListenerSet.add(listener);
        }

        public void addJobFailListener(JobFailListener listener) {
                if(jobFailListenerSet==null) {
                        jobFailListenerSet = new HashSet<JobFailListener>();
                }
                jobFailListenerSet.add(listener);
        }

        public void addJobReadyListener(JobReadyListener listener) {
                if(jobReadyListenerSet==null) {
                        jobReadyListenerSet = new HashSet<JobReadyListener>();
                }
                jobReadyListenerSet.add(listener);
        }

        public Map<String, Object> getResultMap() {
                return resultMap;
        }

        public boolean isStopWork() {
                return stopWork;
        }

        public void setStopWork(boolean stopWork) {
                if(stopWork) {
                        log.info(this.getName()+": received stop signal, about to stop executing jobs.");
                }
                this.stopWork = stopWork;
        }

}

2.4 Events

Five event types are defined: JobExecutorThreadStopEvent, JobFailEvent, JobReadyEvent, JobRetryEvent and JobSucessEvent. Only JobExecutorThreadStopEvent is used for now; it fires when a worker thread finishes its jobs. The other four correspond to the Job state transitions, although JobReadyEvent may never be needed, since in the current design every Job starts in the ready state and nothing transitions back to ready during execution. The event source code is not listed here; see the full project in the attachment.
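
Since the event sources are not reproduced in this post, here is a minimal sketch of what JobExecutorThreadStopEvent could look like, inferred only from how the thread and listener code in this article constructs it (new JobExecutorThreadStopEvent(this)) and consumes it (event.getSource()). Whether the real class extends java.util.EventObject is my assumption, not something the listings confirm.

package com.cangzhitao.multithreadjob.event;

import java.util.EventObject;

/**
 * Sketch only: fired when a worker thread stops; the source passed in is that thread.
 * The other four events presumably wrap the Job whose state changed in the same way.
 */
public class JobExecutorThreadStopEvent extends EventObject {

        private static final long serialVersionUID = 1L;

        public JobExecutorThreadStopEvent(Object source) {
                super(source); //getSource() is inherited from EventObject
        }

}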

2.5 Listeners

The five listener interfaces correspond to the five events above. So far only the thread-stop listener has concrete implementations; the one shown here, DynamicJobExecutorThreadStopListener, implements JobExecutorThreadStopListener (the StaticJobExecutor code above uses a StaticJobExecutorThreadStopListener in the same way). The interface defines the jobExecutorStopCallBack method, which every thread calls when it finishes; DynamicJobExecutorThreadStopListener uses it to merge the thread's resultMap into the executor's, and note that the implementation synchronizes this merge. More listener hooks can be added later, for example to call a designated method on the job whenever its state changes.
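
The listener interfaces are likewise only in the attached source; judging from the implementation below and the callback invocation in the thread code, JobExecutorThreadStopListener is presumably just a single-method interface along these lines (a sketch, not the verbatim original):

package com.cangzhitao.multithreadjob.listener;

import com.cangzhitao.multithreadjob.event.JobExecutorThreadStopEvent;

/**
 * Sketch only: callback invoked by a worker thread right before it exits,
 * e.g. to merge that thread's resultMap into the executor's.
 */
public interface JobExecutorThreadStopListener {

        void jobExecutorStopCallBack(JobExecutorThreadStopEvent event);

}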

DynamicJobExecutorThreadStopListener source:

package com.cangzhitao.multithreadjob.listener;

import com.cangzhitao.multithreadjob.Job;
import com.cangzhitao.multithreadjob.event.JobExecutorThreadStopEvent;
import com.cangzhitao.multithreadjob.executor.DynamicJobExecutor;
import com.cangzhitao.multithreadjob.thread.DynamicJobExecutorThread;

public class DynamicJobExecutorThreadStopListener implements
                JobExecutorThreadStopListener {

        private DynamicJobExecutor executor;

        private Class<? extends Job> jobClass;

        public DynamicJobExecutorThreadStopListener(DynamicJobExecutor executor, Class<? extends Job> jobClass) {
                this.executor = executor;
                this.jobClass = jobClass;
        }

        @Override
        public void jobExecutorStopCallBack(JobExecutorThreadStopEvent event) {
                DynamicJobExecutorThread thread = (DynamicJobExecutorThread) event.getSource();
                synchronized (executor) {
                        Job job = null;
                        try {
                                job = jobClass.newInstance();
                                job.mergeResult(executor.getResultMap(), thread.getResultMap());
                        } catch (InstantiationException e) {
                                e.printStackTrace();
                        } catch (IllegalAccessException e) {
                                e.printStackTrace();
                        }
                }
        }

}

3. Practical Use:

The code above is a refactoring of code I wrote for a big-data platform project at work. It is used in two areas, writing and reading, which seems to cover every scenario.

Writing means writing to HBase. Concurrent multi-threaded writes to HBase give a clear speedup, and since we also compute statistics over the data during the write, doing both at the same time is even more efficient.

Reading means the query service. For example, querying some data for one user takes 0.3s, which is acceptable, but querying several users, say 10, in a loop makes the time grow linearly to 3s, which is not. You may ask why we would loop over users at all: with SQL you would just use an IN clause. True, with a relational database querying one user or many may not differ much, at least not linearly; but in Hadoop's Hive, SQL using IN or OR runs very slowly, essentially growing linearly. (I have only used one commercial Hadoop platform, so I cannot promise every platform behaves this way.) When it does, we can run the queries on multiple threads and merge the results at the end. To query 10 users we start 10 threads, each executing once, and the total time stays close to the original 0.3s single-user query. Of course this does not scale without limit: for 10,000 users we cannot keep adding threads; we have to test for the largest reasonable value.
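
As a purely hypothetical sketch (the UserStore interface, its queryStats method and the class name below are illustrative stand-ins, not part of the actual project), such a per-user query could be wrapped as a Job like this:

import java.util.Map;

import com.cangzhitao.multithreadjob.Job;

/**
 * Hypothetical example: UserStore is an illustrative stand-in for the real query service (Hive, HBase, ...).
 */
public class UserQueryJob extends Job {

        public interface UserStore {
                Object queryStats(String userId) throws Exception;
        }

        private final UserStore store;

        private final String userId;

        public UserQueryJob(UserStore store, String userId) {
                super("query-" + userId, 2); //allow one retry on failure
                this.store = store;
                this.userId = userId;
        }

        @Override
        public boolean doJob() {
                try {
                        //one slow query per job; 10 users means 10 jobs that can run in parallel
                        getResultMap().put(userId, store.queryStats(userId));
                        return true;
                } catch (Exception e) {
                        return false; //lets excute() mark the job JOB_RETRY or JOB_FAIL
                }
        }

        @Override
        public void mergeResult(Map<String, Object> mergeResultMap, Map<String, Object> needMergeResultMap) {
                //keys are distinct user ids, so merging is a simple putAll
                mergeResultMap.putAll(needMergeResultMap);
        }

}

Submitting ten such jobs to a StaticJobExecutor with ten threads and calling execute(true) would then give roughly the single-query latency described above.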

4. Conclusion:

One last point: when I chose the sum-of-squares example, I planned to pick a very large n and show that the multi-threaded computation beats a single-threaded for loop. The facts proved me wrong: no matter what n is, the multi-threaded version never beat the single-threaded one.

Having written a lot of multi-threaded code recently, I had drifted into the mindset of using multiple threads for anything that could be parallelized, assuming it must be faster than a single thread. Surely multi-threading cannot be slower, so why is my multi-threaded sum of squares slower than the single-threaded loop? The answer was already given in the discussion of thread counts: squaring a number is extremely cheap, so most of the time is wasted on thread management and context switching.

The example fails because the jobs are too fine-grained. Instead of turning the squaring of each individual number into a job, split 1 through n into i chunks and build one job per chunk that computes the sum of squares of its n/i numbers, then run those jobs on j threads; I believe there are values of i and j for which the multi-threaded version beats the single-threaded one. A sketch of that coarser split follows.
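
The class name and chunking below are illustrative (they are not taken from the attached project); each job sums the squares of a whole range instead of a single number:

import java.math.BigDecimal;
import java.util.Map;

import com.cangzhitao.multithreadjob.Job;

/**
 * Illustrative coarser-grained job: sums i*i for every i in [from, to] instead of one i per job.
 */
public class RangeSquareSumJob extends Job {

        private final long from;

        private final long to;

        public RangeSquareSumJob(long from, long to) {
                super("square-sum-" + from + "-" + to);
                this.from = from;
                this.to = to;
        }

        @Override
        public boolean doJob() {
                BigDecimal sum = BigDecimal.ZERO;
                for (long i = from; i <= to; i++) {
                        BigDecimal v = new BigDecimal(i);
                        sum = sum.add(v.multiply(v));
                }
                getResultMap().put("result", sum);
                return true;
        }

        @Override
        public void mergeResult(Map<String, Object> mergeResultMap, Map<String, Object> needMergeResultMap) {
                //same merge as TestJob: add the partial sums stored under the "result" key
                BigDecimal merged = (BigDecimal) mergeResultMap.get("result");
                BigDecimal partial = (BigDecimal) needMergeResultMap.get("result");
                if (merged == null) {
                        mergeResultMap.putAll(needMergeResultMap);
                } else if (partial != null) {
                        mergeResultMap.put("result", merged.add(partial));
                }
        }

}

Splitting 1..n into, say, a few chunks per thread and submitting those jobs to a StaticJobExecutor keeps each job's work large enough to amortize the scheduling overhead.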

5. Project Source:

The project depends on the log4j and commons-logging jars; please add them yourself.

/upload/2014/11/24/MultiThreadJob.zip

2014.11.19 ps:

Yesterday I found a bug: the dynamic worker threads could exit before the job queue had been drained. My intention was that after receiving the stop signal a thread should finish everything left in the queue before actually exiting. So DynamicJobExecutorThread has been fixed and two constructors were added to DynamicJobExecutor; the source linked above is already the fixed version.

...
public class DynamicJobExecutorThread extends Thread {
        ....
        public void run() {
                while((job=jobQueue.poll())!=null||!stopWork) {
                        if(job==null) {
                                continue;
                        }
                        ....
                }
        }
        ...
}
