elastic-job源码分析


先简单介绍下elastic-job,它是当当公司开源的一个分布式调度解决方案。大家都知道,当数据量比较小的时候,我们可以只用quartz只在一台服务器上处理所有的数据。随着业务发展,数据量越来越大,一台机器已经不足以支撑,就必须想办法将一个任务分成多个小任务,拆分到不同的服务器上并行的执行。例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。需要注意的是elastic-job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。

功能列表

  • 分布式调度协调
  • 弹性扩容缩容
  • 失效转移
  • 错过执行作业重触发
  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
  • 支持并行调度
  • 支持作业声明周期操作
  • 丰富的作业类型
  • Spring整合以及命名空间提供
  • 运维平台

这是我从当当网在github上开源的elastic-job上复制过来的,elastic-job的github地址

源码分析

我们会针对上面功能列表中提到的一些功能,来研究一下源码,看下elastic-job是怎么实现它说的一些功能的。

测试用demo

我们基于spring配置的一个SimpleJob。
Application.xml

<?xml version="1.0" encoding="UTF-8"?>
    <bean id="elasticJobLog" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="${event.rdb.driver}"/>
        <property name="url" value="${event.rdb.url}"/>
        <property name="username" value="${event.rdb.username}"/>
        <property name="password" value="${event.rdb.password}"/>
    </bean>

    <reg:zookeeper id="regCenter" server-lists="${serverLists}" namespace="${namespace}" base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}" max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}" max-retries="${maxRetries}" />

    <job:simple id="${simple.id}" class="${simple.class}" registry-center-ref="regCenter" sharding-total-count="${simple.shardingTotalCount}" cron="${simple.cron}" sharding-item-parameters="${simple.shardingItemParameters}" monitor-execution="${simple.monitorExecution}" monitor-port="${simple.monitorPort}" failover="${simple.failover}" description="${simple.description}" disabled="${simple.disabled}" overwrite="${simple.overwrite}" event-trace-rdb-data-source="elasticJobLog">
        <job:listener class="${listener.simple}" />
        <job:distributed-listener class="${listener.distributed}" started-timeout-milliseconds="${listener.distributed.startedTimeoutMilliseconds}" completed-timeout-milliseconds="${listener.distributed.completedTimeoutMilliseconds}" />
    </job:simple>
</beans>

*.properties

<!--zk相关配置-->
serverLists=localhost:2181
namespace=elastic-job-example-lite-spring
baseSleepTimeMilliseconds=1000
maxSleepTimeMilliseconds=3000
maxRetries=3
<!--job配置-->
listener.simple=com.dangdang.ddframe.job.example.listener.SpringSimpleListener
listener.distributed=com.dangdang.ddframe.job.example.listener.SpringSimpleDistributeListener
listener.distributed.startedTimeoutMilliseconds=10000
listener.distributed.completedTimeoutMilliseconds=30000

simple.id=springSimpleJob
simple.class=com.dangdang.ddframe.job.example.job.simple.SpringSimpleJob
simple.cron=0/10 * * * * ?
simple.shardingTotalCount=3
simple.shardingItemParameters=0=Beijing,1=Shanghai,2=Guangzhou
simple.monitorExecution=false
simple.failover=true
simple.description=\u53EA\u8FD0\u884C\u4E00\u6B21\u7684\u4F5C\u4E1A\u793A\u4F8B
simple.disabled=false
simple.overwrite=true
simple.monitorPort=9888

通过这个xml,我们可以得到以下几个信息:

  • Spring整合以及命名空间提供
  • 利用了zookeeper,我们不难猜出,这个zk应该是用住配置中心的,提供集群相关的一些配置

应用启动配置zk监听

JobScheduler

public void init() {
    jobExecutor.init();
    JobTypeConfiguration jobTypeConfig = jobExecutor.getSchedulerFacade().loadJobConfiguration().getTypeConfig();
    JobScheduleController jobScheduleController = new JobScheduleController(
            createScheduler(jobTypeConfig.getCoreConfig().isMisfire()), createJobDetail(jobTypeConfig.getJobClass()), jobExecutor.getSchedulerFacade(), jobName);
    jobScheduleController.scheduleJob(jobTypeConfig.getCoreConfig().getCron());
    jobRegistry.addJobScheduleController(jobName, jobScheduleController);
}

上述方法会调用到jobExecutor.init();
JobExecutor

public void init() {
    log.debug("Job '{}' controller init.", liteJobConfig.getJobName());
    schedulerFacade.clearPreviousServerStatus();
    regCenter.addCacheData("/" + liteJobConfig.getJobName());
    schedulerFacade.registerStartUpInfo(liteJobConfig);
}

public void registerStartUpInfo(final LiteJobConfiguration liteJobConfig) {
    listenerManager.startAllListeners();
    leaderElectionService.leaderForceElection();
    configService.persist(liteJobConfig);
    serverService.persistServerOnline(!liteJobConfig.isDisabled());
    serverService.clearJobPausedStatus();
    shardingService.setReshardingFlag();
    monitorService.listen();
    listenerManager.setCurrentShardingTotalCount(configService.load(false).getTypeConfig().getCoreConfig().getShardingTotalCount());
}
  • 每次作业启动前清理上次运行状态(删除zk上之前本服务器ip的节点: /{namespace}/{jobName}/servers/{ip});
  • 然后缓存/{namespace}/{jobName}/节点;
  • 最后注册一些TreeCacheListener(上面注册的listener都实现了这个接口)的实例来监听zk集群的节点变化,并发起主节点选举。

接着来看JobScheduler.init方法后面的内容,可以看到先获取需要运行的job的配置信息,然后创建Scheduler和jobDetail,通过quartz配合job,并把该job对应的jobScheduleController缓存到本地的jobRegistry中。(通过quartz会创建一个{jobName}_worker的线程,这个线程就是用来启动批处理作业的)

{jobName}_worker线程

public static final class LiteJob implements Job {

    @Setter
    private ElasticJob elasticJob;

    @Setter
    private JobFacade jobFacade;

    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}

这个LiteJob实现了quartz的Job接口,是job执行的入口。
AbstractElasticJobExecutor

public final void execute() {
    try {
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        jobExceptionHandler.handleException(jobName, cause);
    }
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
    if (jobFacade.misfireIfNecessary(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, shardingContexts.getShardingItemParameters().keySet()));
        return;
    }
    jobFacade.cleanPreviousExecutionInfo();
    try {
        jobFacade.beforeJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
    }
    jobFacade.failoverIfNecessary();
    try {
        jobFacade.afterJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
}

上述execute过程包含以下几个步骤:

  • 检查作业执行环境(本机与注册中心的时间误差秒数是否在允许范围)
  • 然后获取该job分片信息的上下文shardingContexts(通过从zk获取job分片的总数,当前可以用来执行job的server总数,然后默认通过基于平均分配算法的分片策略对总的job分片进行分配)
  • 然后执行jobFacade.beforeJobExecuted(shardingContexts);如果有定义ElasticJobListener监听,则执行beforeJobExecuted方法。
  • 执行execute方法(这个后面再详细分析)
  • 然后执行jobFacade.doAfterJobExecutedAtLastCompleted(shardingContexts);如果有定义ElasticJobListener监听,则执行doAfterJobExecutedAtLastCompleted方法。
/**
 * 基于平均分配算法的分片策略.
 *
 * <p>
 * 如果分片不能整除, 则不能整除的多余分片将依次追加到序号小的服务器.
 * 如:
 * 1. 如果有3台服务器, 分成9片, 则每台服务器分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].
 * 2. 如果有3台服务器, 分成8片, 则每台服务器分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5].
 * 3. 如果有3台服务器, 分成10片, 则每台服务器分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].
 * </p>
 *
 * @author zhangliang
 */

接着分析之前一笔带过的execute的方法,最终会调用到
AbstractElasticJobExecutor.process方法

private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
    if (1 == items.size()) {
        int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
        JobExecutionEvent jobExecutionEvent =  new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
        process(shardingContexts, item, jobExecutionEvent);
        return;
    }
    final CountDownLatch latch = new CountDownLatch(items.size());
    for (final int each : items) {
        final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
        if (executorService.isShutdown()) {
            return;
        }
        executorService.submit(new Runnable() {

            @Override
            public void run() {
                try {
                    process(shardingContexts, each, jobExecutionEvent);
                } finally {
                    latch.countDown();
                }
            }
        });
    }
    try {
        latch.await();
    } catch (final InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}

可以看到如果分配到本作业服务器上的分片数若大于1,则通过线程分别把本作业上获取的n个分片构建分片上下文信息丢进线程池;若分片数目为1,则直接在本线程中执行。我们看下执行的process方法
AbstractElasticJobExecutor.process方法

private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
    jobFacade.postJobExecutionEvent(startEvent);
    log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
    JobExecutionEvent completeEvent = null;
    try {
        process(new ShardingContext(shardingContexts, item));
        completeEvent = startEvent.executionSuccess();
        log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
        // CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        // CHECKSTYLE:ON
        completeEvent = startEvent.executionFailure(cause);
        itemErrorMessages.put(item, ExceptionUtil.transform(cause));
        jobExceptionHandler.handleException(jobName, cause);
    } finally {
        jobFacade.postJobExecutionEvent(completeEvent);
    }
}

这个方法会调用到process(new ShardingContext(shardingContexts, item));这个方法在AbstractElasticJobExecutor是个抽象方法。回头看下我们上面的一段代码

public static final class LiteJob implements Job {

    @Setter
    private ElasticJob elasticJob;

    @Setter
    private JobFacade jobFacade;

    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}

最早我们通过JobExecutorFactory获取JobExecutor,因为最开始我们创建的是SimpleJob,所以这里我们获取的实际上是一个SimpleJobExecutor,这个类继承AbstractElasticJobExecutor,所以最终会调用到
SimpleJobExecutor的process方法。然后SimpleJobExecutor持有一个SimpleJob的实例,最终会把分片上下文作为参数传递给我们自己实现的一个SimpleJob的实例。我们就可以根据传递给我们的分片上下文信息来编写业务批处理代码了。

public final class SimpleJobExecutor extends AbstractElasticJobExecutor {

    private final SimpleJob simpleJob;

    public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
        super(jobFacade);
        this.simpleJob = simpleJob;
    }

    @Override
    protected void process(final ShardingContext shardingContext) {
        simpleJob.execute(shardingContext);
    }
}

参考


文章作者: 叶明
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 叶明 !
评论
 上一篇
spring容器启动@Value属性无法注入 spring容器启动@Value属性无法注入
问题一个同事基于Annotation配置了一段代码,结果有一个Configuration类的两个@Value标注的属性值没有注入进来,代码如下: @Configuration @PropertySource("mysql.prop
2017-04-16
下一篇 
spring-kafka源码分析二(Consumer) spring-kafka源码分析二(Consumer)
上一篇文章我们分析了spring-kafka的producer,这篇文章我们就要来分析下consumer。 ContainerProperties containerProps = new ContainerProperties(
2016-12-30
  目录