PowerJob分布式任务调度源码流程解读

 更新时间:2024年02月16日 09:46:25   作者:codecraft  
这篇文章主要为大家介绍了PowerJob分布式任务调度源码流程解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

架构图

官方提供:

本文主要研究一下PowerJob的任务调度

CoreScheduleTaskManager

tech/powerjob/server/core/scheduler/CoreScheduleTaskManager.java

@Service
@Slf4j
@RequiredArgsConstructor
public class CoreScheduleTaskManager implements InitializingBean, DisposableBean {
    private final PowerScheduleService powerScheduleService;
    private final InstanceStatusCheckService instanceStatusCheckService;
    private final List<Thread> coreThreadContainer = new ArrayList<>();
    @SuppressWarnings("AlibabaAvoidManuallyCreateThread")
    @Override
    public void afterPropertiesSet() {
        // 定时调度
        coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.CRON)), "Thread-ScheduleCronJob"));
        coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleDailyTimeIntervalJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.DAILY_TIME_INTERVAL)), "Thread-ScheduleDailyTimeIntervalJob"));
        coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronWorkflow", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronWorkflow), "Thread-ScheduleCronWorkflow"));
        coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleFrequentJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleFrequentJob), "Thread-ScheduleFrequentJob"));
        // 数据清理
        coreThreadContainer.add(new Thread(new LoopRunnable("CleanWorkerData", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::cleanData), "Thread-CleanWorkerData"));
        // 状态检查
        coreThreadContainer.add(new Thread(new LoopRunnable("CheckRunningInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkRunningInstance), "Thread-CheckRunningInstance"));
        coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingDispatchInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingDispatchInstance), "Thread-CheckWaitingDispatchInstance"));
        coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingWorkerReceiveInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingWorkerReceiveInstance), "Thread-CheckWaitingWorkerReceiveInstance"));
        coreThreadContainer.add(new Thread(new LoopRunnable("CheckWorkflowInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWorkflowInstance), "Thread-CheckWorkflowInstance"));
        coreThreadContainer.forEach(Thread::start);
    }
    //......
}
CoreScheduleTaskManager在afterPropertiesSet的时候会启动一系列的线程,它们都是LoopRunnable类型的,分别调度powerScheduleService.scheduleNormalJob(TimeExpressionType.CRON)、powerScheduleService.scheduleNormalJob(TimeExpressionType.DAILY_TIME_INTERVAL)、powerScheduleService::scheduleCronWorkflow、powerScheduleService::scheduleFrequentJob、powerScheduleService::cleanData、instanceStatusCheckService::checkRunningInstance、instanceStatusCheckService::checkWaitingDispatchInstance、instanceStatusCheckService::checkWaitingWorkerReceiveInstance、instanceStatusCheckService::checkWorkflowInstance

LoopRunnable

@RequiredArgsConstructor
    private static class LoopRunnable implements Runnable {

        private final String taskName;

        private final Long runningInterval;

        private final Runnable innerRunnable;

        @SuppressWarnings("BusyWait")
        @Override
        public void run() {
            log.info("start task : {}.", taskName);
            while (true) {
                try {
                    innerRunnable.run();
                    Thread.sleep(runningInterval);
                } catch (InterruptedException e) {
                    log.warn("[{}] task has been interrupted!", taskName, e);
                    break;
                } catch (Exception e) {
                    log.error("[{}] task failed!", taskName, e);
                }
            }
        }
    }
LoopRunnable的构造器接收taskName、runningInterval、innerRunnable三个参数,其run方法通过while true循环内部执行innerRunnable.run(),执行完sleep指定的runningInterval,若捕获到InterruptedException则break跳出循环,若其他异常则打印error日志

PowerScheduleService

PowerScheduleService主要提供了scheduleNormalJob、scheduleCronWorkflow、scheduleFrequentJob、cleanData方法

scheduleNormalJob

tech/powerjob/server/core/scheduler/PowerScheduleService.java

public void scheduleNormalJob(TimeExpressionType timeExpressionType) {
        long start = System.currentTimeMillis();
        // 调度 CRON 表达式 JOB
        try {
            final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
            if (CollectionUtils.isEmpty(allAppIds)) {
                log.info("[NormalScheduler] current server has no app's job to schedule.");
                return;
            }
            scheduleNormalJob0(timeExpressionType, allAppIds);
        } catch (Exception e) {
            log.error("[NormalScheduler] schedule cron job failed.", e);
        }
        long cost = System.currentTimeMillis() - start;
        log.info("[NormalScheduler] {} job schedule use {} ms.", timeExpressionType, cost);
        if (cost > SCHEDULE_RATE) {
            log.warn("[NormalScheduler] The database query is using too much time({}ms), please check if the database load is too high!", cost);
        }
    }
scheduleNormalJob方法主要是查询当前server负责的appId列表,然后内部委托改为scheduleNormalJob0

scheduleNormalJob0

private void scheduleNormalJob0(TimeExpressionType timeExpressionType, List<Long> appIds) {

        long nowTime = System.currentTimeMillis();
        long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
        Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {

            try {

                // 查询条件:任务开启 + 使用CRON表达调度时间 + 指定appId + 即将需要调度执行
                List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, SwitchableStatus.ENABLE.getV(), timeExpressionType.getV(), timeThreshold);

                if (CollectionUtils.isEmpty(jobInfos)) {
                    return;
                }

                // 1. 批量写日志表
                Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
                log.info("[NormalScheduler] These {} jobs will be scheduled: {}.", timeExpressionType.name(), jobInfos);

                jobInfos.forEach(jobInfo -> {
                    Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime()).getInstanceId();
                    jobId2InstanceId.put(jobInfo.getId(), instanceId);
                });
                instanceInfoRepository.flush();

                // 2. 推入时间轮中等待调度执行
                jobInfos.forEach(jobInfoDO -> {

                    Long instanceId = jobId2InstanceId.get(jobInfoDO.getId());

                    long targetTriggerTime = jobInfoDO.getNextTriggerTime();
                    long delay = 0;
                    if (targetTriggerTime < nowTime) {
                        log.warn("[Job-{}] schedule delay, expect: {}, current: {}", jobInfoDO.getId(), targetTriggerTime, System.currentTimeMillis());
                    } else {
                        delay = targetTriggerTime - nowTime;
                    }

                    InstanceTimeWheelService.schedule(instanceId, delay, () -> dispatchService.dispatch(jobInfoDO, instanceId, Optional.empty(), Optional.empty()));
                });

                // 3. 计算下一次调度时间(忽略5S内的重复执行,即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms)
                jobInfos.forEach(jobInfoDO -> {
                    try {
                        refreshJob(timeExpressionType, jobInfoDO);
                    } catch (Exception e) {
                        log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e);
                    }
                });
                jobInfoRepository.flush();


            } catch (Exception e) {
                log.error("[NormalScheduler] schedule {} job failed.", timeExpressionType.name(), e);
            }
        });
    }
scheduleNormalJob0主要是调度CRON、DAILY_TIME_INTERVAL类型的任务,它通过jobInfoRepository查找指定appId、状态启用、指定TimeExpressionType,以及NextTriggerTime小于等于nowTime + 2 * SCHEDULE_RATE的任务,然后挨个执行instanceService.create创建任务实例,然后放入到InstanceTimeWheelService.schedule进行调度,最后计算和更新一下每个job的nextTriggerTime

scheduleCronWorkflow

public void scheduleCronWorkflow() {
        long start = System.currentTimeMillis();
        // 调度 CRON 表达式 WORKFLOW
        try {
            final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
            if (CollectionUtils.isEmpty(allAppIds)) {
                log.info("[CronWorkflowSchedule] current server has no app's workflow to schedule.");
                return;
            }
            scheduleWorkflowCore(allAppIds);
        } catch (Exception e) {
            log.error("[CronWorkflowSchedule] schedule cron workflow failed.", e);
        }
        long cost = System.currentTimeMillis() - start;
        log.info("[CronWorkflowSchedule] cron workflow schedule use {} ms.", cost);
        if (cost > SCHEDULE_RATE) {
            log.warn("[CronWorkflowSchedule] The database query is using too much time({}ms), please check if the database load is too high!", cost);
        }
    }
scheduleCronWorkflow主要是调度CRON 表达式 WORKFLOW,内部委托给scheduleWorkflowCore

scheduleFrequentJob

public void scheduleFrequentJob() {
        long start = System.currentTimeMillis();
        // 调度 FIX_RATE/FIX_DELAY 表达式 JOB
        try {
            final List&lt;Long&gt; allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
            if (CollectionUtils.isEmpty(allAppIds)) {
                log.info("[FrequentJobSchedule] current server has no app's job to schedule.");
                return;
            }
            scheduleFrequentJobCore(allAppIds);
        } catch (Exception e) {
            log.error("[FrequentJobSchedule] schedule frequent job failed.", e);
        }
        long cost = System.currentTimeMillis() - start;
        log.info("[FrequentJobSchedule] frequent job schedule use {} ms.", cost);
        if (cost &gt; SCHEDULE_RATE) {
            log.warn("[FrequentJobSchedule] The database query is using too much time({}ms), please check if the database load is too high!", cost);
        }
    }
scheduleFrequentJob主要是调度FIX_RATE/FIX_DELAY 表达式 JOB,内部委托给了scheduleFrequentJobCore

scheduleFrequentJobCore

private void scheduleFrequentJobCore(List<Long> appIds) {

        Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {
            try {
                // 查询所有的秒级任务(只包含ID)
                List<Long> jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.FREQUENT_TYPES);
                if (CollectionUtils.isEmpty(jobIds)) {
                    return;
                }
                // 查询日志记录表中是否存在相关的任务
                List<Long> runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.GENERALIZED_RUNNING_STATUS);
                Set<Long> runningJobIdSet = Sets.newHashSet(runningJobIdList);

                List<Long> notRunningJobIds = Lists.newLinkedList();
                jobIds.forEach(jobId -> {
                    if (!runningJobIdSet.contains(jobId)) {
                        notRunningJobIds.add(jobId);
                    }
                });

                if (CollectionUtils.isEmpty(notRunningJobIds)) {
                    return;
                }

                notRunningJobIds.forEach(jobId -> {
                    Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
                    jobInfoOpt.ifPresent(jobInfoDO -> {
                        LifeCycle lifeCycle = LifeCycle.parse(jobInfoDO.getLifecycle());
                        // 生命周期已经结束
                        if (lifeCycle.getEnd() != null && lifeCycle.getEnd() < System.currentTimeMillis()) {
                            jobInfoDO.setStatus(SwitchableStatus.DISABLE.getV());
                            jobInfoDO.setGmtModified(new Date());
                            jobInfoRepository.saveAndFlush(jobInfoDO);
                            log.info("[FrequentScheduler] disable frequent job,id:{}.", jobInfoDO.getId());
                        } else if (lifeCycle.getStart() == null || lifeCycle.getStart() < System.currentTimeMillis() + SCHEDULE_RATE * 2) {
                            log.info("[FrequentScheduler] schedule frequent job,id:{}.", jobInfoDO.getId());
                            jobService.runJob(jobInfoDO.getAppId(), jobId, null, Optional.ofNullable(lifeCycle.getStart()).orElse(0L) - System.currentTimeMillis());
                        }
                    });
                });
            } catch (Exception e) {
                log.error("[FrequentScheduler] schedule frequent job failed.", e);
            }
        });
    }
scheduleFrequentJobCore主要是调度秒级任务,它先找出秒级任务的id,然后过滤掉正在运行的任务,剩下的未运行的任务挨个判断是否需要调度,需要则执行jobService.runJob

cleanData

public void cleanData() {
        try {
            final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
            if (allAppIds.isEmpty()) {
                return;
            }
            WorkerClusterManagerService.clean(allAppIds);
        } catch (Exception e) {
            log.error("[CleanData] clean data failed.", e);
        }
    }
cleanData主要是通过WorkerClusterManagerService.clean来维护当前server负责的appId缓存

InstanceStatusCheckService

InstanceStatusCheckService提供了checkRunningInstance、checkWaitingDispatchInstance、checkWaitingWorkerReceiveInstance、checkWorkflowInstance方法

小结

PowerJob的CoreScheduleTaskManager在afterPropertiesSet的时候会启动一系列的线程,它们都是LoopRunnable类型的,其中scheduleNormalJob主要是调度CRON、DAILY_TIME_INTERVAL类型的任务,scheduleCronWorkflow主要是调度CRON 表达式 WORKFLOW任务,scheduleFrequentJob主要是调度FIX_RATE/FIX_DELAY 表达式 JOB。

以上就是PowerJob分布式任务调度源码流程解读的详细内容,更多关于PowerJob分布式任务调度的资料请关注脚本之家其它相关文章!

相关文章

  • 如何用Eureka + Feign搭建分布式微服务

    如何用Eureka + Feign搭建分布式微服务

    Eureka是Spring Cloud Netflix的一部分,是一个服务注册中心。其服务生态中主要有三个角色:Eureka注册中心、服务提供者、服务消费者。服务提供者注册到Eureka后,服务消费者就能够直接向Eureka查询当前有哪些服务可用,再从中选取一个消费.本文讲解如何搭建分布式微服务
    2021-06-06
  • SpringSecurity+OAuth2.0 搭建认证中心和资源服务中心流程分析

    SpringSecurity+OAuth2.0 搭建认证中心和资源服务中心流程分析

    OAuth 2.0 主要用于在互联网上安全地委托授权,广泛应用于身份验证和授权场景,这篇文章介绍SpringSecurity+OAuth2.0 搭建认证中心和资源服务中心,感兴趣的朋友一起看看吧
    2024-01-01
  • 关于SpringBoot的异常回滚和事务的使用详解

    关于SpringBoot的异常回滚和事务的使用详解

    这篇文章主要介绍了关于SpringBoot的异常回滚和事务的使用详解,Spring中 @Transactional 注解,默认情况下,只对抛出的RuntimeException 异常,才会事务回滚,需要的朋友可以参考下
    2023-05-05
  • Java如何按16进制发送和接收TCP指令

    Java如何按16进制发送和接收TCP指令

    这篇文章主要介绍了Java如何按16进制发送和接收TCP指令问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-09-09
  • 深入浅析Java中普通代码块、构造代码块与静态代码块

    深入浅析Java中普通代码块、构造代码块与静态代码块

    这篇文章主要介绍了Java中普通代码块、构造代码块与静态代码块的相关资料,静态代码块>Main()>构造代码块 。非常不错,具有参考借鉴价值,需要的朋友可以参考下
    2016-08-08
  • Java设计模式之里氏替换原则精解

    Java设计模式之里氏替换原则精解

    设计模式(Design pattern)代表了最佳的实践,通常被有经验的面向对象的软件开发人员所采用。设计模式是软件开发人员在软件开发过程中面临的一般问题的解决方案。本篇介绍设计模式七大原则之一的里氏替换原则
    2022-02-02
  • ArrayList详解和使用示例_动力节点Java学院整理

    ArrayList详解和使用示例_动力节点Java学院整理

    ArrayList 是一个数组队列,相当于 动态数组。与Java中的数组相比,它的容量能动态增长。接下来通过本文给大家介绍arraylist详解和使用示例代码,需要的的朋友一起学习吧
    2017-05-05
  • Java中的Gradle与Groovy的区别及存在的关系

    Java中的Gradle与Groovy的区别及存在的关系

    这篇文章主要介绍了Java中的Gradle与Groovy的区别及存在的关系,Groovy是一种JVM语言,它可以编译为与Java相同的字节码,并且可以与Java类无缝地互操作,Gradle是Java项目中主要的构建系统之一,下文关于两者的详细内容,需要的小伙伴可以参考一下
    2022-02-02
  • java安全之CommonsCollections4详解

    java安全之CommonsCollections4详解

    这篇文章主要介绍了java安全之CommonsCollections4详解
    2022-08-08
  • java 图片与base64相互转化的示例

    java 图片与base64相互转化的示例

    这篇文章主要介绍了java 图片与base64相互转化的示例,帮助大家更好的理解和使用Java,感兴趣的朋友可以了解下
    2020-10-10

最新评论