前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式调度中间件xxl-job(七):调度器Trigger

分布式调度中间件xxl-job(七):调度器Trigger

作者头像
闲宇非鱼
发布2022-02-08 11:30:30
5980
发布2022-02-08 11:30:30
举报

 上一篇中我们了解了调度中心除了调度器以外的基本功能,今天我们就来学习一下 xxl-job「调度器(Trigger)」 是如何工作的。

一、简介

  在前面的学习我们可以看到,xxl-job「时间调度」「具体任务调度」 两个逻辑进行了拆分。在 JobScheduleHelper 中进行时间调度逻辑处理,将具体任务调度逻辑放置在 JobTriggerPoolHelper 中。所以这章我们主要学习 JobTriggerPoolHelper 中是如何进行具体任务调度的。   先总体来看下 JobTriggerPoolHelper 提供的方法:

  从上面的方法中可以总结出 JobTriggerPoolHelper 大致提供了一下三个功能:

  • 启动处理线程
  • 终止处理线程
  • 进行任务触发

  下面我们就来看看 JobTriggerPoolHelper 具体的处理逻辑。

二、任务触发器 JobTriggerPoolHelper

1. 触发方式

  通过查看 JobTriggerPoolHelpertrigger() 方法的使用者,我们可以看到有一下五种触发任务的场景:

  • 在调度中心页面中触发一次任务;
  • 由调度中心根据时间调度进行任务触发;
  • 任务失败重新进行任务触发;
  • 父任务完成进行子任务触发;
  • 通过API调用进行任务触发;

  当然,我们也可以通过触发类型枚举类 TriggerTypeEnum 来查看:

代码语言:javascript
复制
/**
 * 人工触发
 */
MANUAL(I18nUtil.getString("jobconf_trigger_type_manual")),
/**
 * 根据cron表达式触发
 */
CRON(I18nUtil.getString("jobconf_trigger_type_cron")),
/**
 * 失败重试触发
 */
RETRY(I18nUtil.getString("jobconf_trigger_type_retry")),
/**
 * 父任务触发
 */
PARENT(I18nUtil.getString("jobconf_trigger_type_parent")),
/**
 * API调用触发
 */
API(I18nUtil.getString("jobconf_trigger_type_api"));

2. 快任务线程池和慢任务线程池

  由于在 xxl-job 使用了线程池来进行任务调度,一旦出现某个任务调度时间过长致使线程阻塞就会导致调度中心调度效率的下降。为了解决这一问题,xxl-job 创建了 「快任务线程池」「慢任务线程池」 。   一般情况,任务默认放置在快任务线程池中进行任务触发。这里 xxl-job 设置了一个任务触发时间窗口,长度为500ms。触发器在任务触发过程中每分钟检查当前任务已触发时间,如果超过时间窗口的次数超过10次,则会将该任务降级到慢任务线程池中。具体处理逻辑如下:

代码语言:javascript
复制
// 线程池选择逻辑
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {
    // job-timeout 10 times in 1 min
    triggerPool_ = slowTriggerPool;
}

...
// 时间窗口检查逻辑
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
    minTim = minTim_now;
    jobTimeoutCountMap.clear();
}

// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) {
    // ob-timeout threshold 500ms
    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
    if (timeoutCount != null) {
        timeoutCount.incrementAndGet();
    }
}

3. 任务触发

  任务触发是通过快任务线程池/慢任务线程池调用 XxlJobTriggertrigger() 方法实现。具体代码如下:

代码语言:javascript
复制
public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {

    // load data
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // cover addressList
    if (addressList!=null && addressList.trim().length()>0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

    // sharding param
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }

}

  在 trigger() 方法中进行了如下处理:

  • 根据 jobId 加载任务信息,并在任务信息中加载执行参数;
  • 获取任务重试次数;
  • 重新检查执行器列表并更新;
  • 获取分片执行信息 executorShardingParam ,将其转化为分片参数数组,数据大小为2,第一个元素存放的是 「当前执行的任务分片index」 ,第二个元素存放的是 「任务分片的总数」
  • 根据任务执行路由策略判断是否进行分片执行,如果进行分片执行,则按照当前执行器组的情况进行分片执行, 「分片总数等于执行器组中在线的机器数」 。如果不进行分片执行,则设置 「分片index为0」 ,分片 「总数为1」 ,然后进行任务执行。

三、总结

  至此,xxl-job 的主要处理逻辑已经全部学习完毕(终于水完了,完结撒花~~ )。此时回头看一看原先的文章,里面仍有不少地方分析的不够细致,但是作为基本认知入门应该还是可以的,吧?哈哈哈~~   疫情虽然稳定,但形式仍旧严峻,希望大家还是要时刻警惕。最后祝各位身体健康,工作顺心,加油~~

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-07-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Brucebat的伪技术鱼塘 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、简介
  • 二、任务触发器 JobTriggerPoolHelper
    • 1. 触发方式
      • 2. 快任务线程池和慢任务线程池
        • 3. 任务触发
        • 三、总结
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档