PowerJob的DispatchStrategy方法工作流程源码解读

 更新时间:2024年01月12日 09:34:39   作者:codecraft  
这篇文章主要为大家介绍了PowerJob的DispatchStrategy方法工作流程源码解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

本文主要研究一下PowerJob的DispatchStrategy

DispatchStrategy

tech/powerjob/common/enums/DispatchStrategy.java

@Getter
@AllArgsConstructor
public enum DispatchStrategy {
    HEALTH_FIRST(1),
    RANDOM(2);
    private final int v;
    public static DispatchStrategy of(Integer v) {
        if (v == null) {
            return HEALTH_FIRST;
        }
        for (DispatchStrategy ds : values()) {
            if (v.equals(ds.v)) {
                return ds;
            }
        }
        throw new IllegalArgumentException("unknown DispatchStrategy of " + v);
    }
}
DispatchStrategy定义了HEALTH_FIRST、RANDOM两个枚举值

getSuitableWorkers

tech/powerjob/server/remote/worker/WorkerClusterQueryService.java

public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) {

        List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());

        workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));

        DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
        switch (dispatchStrategy) {
            case RANDOM:
                Collections.shuffle(workers);
                break;
            case HEALTH_FIRST:
                workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
                break;
            default:
                // do nothing
        }

        // 限定集群大小(0代表不限制)
        if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
            workers = workers.subList(0, jobInfo.getMaxWorkerCount());
        }
        return workers;
    }
WorkerClusterQueryService的getSuitableWorkers方法先通过getWorkerInfosByAppId获取指定appId的WorkerInfo,然后通过filterWorker进行一次过滤,最后根据dispatchStrategy来对workers进行排序,如果是RANDOM则通过Collections.shuffle(workers)随机化,如果是HEALTH_FIRST则根据systemMetrics的calculateScore结果进行排序,如果有限定maxWorkerCount则对workers进行subList,没有则返回排序后的workers

getWorkerInfosByAppId

private Map<String, WorkerInfo> getWorkerInfosByAppId(Long appId) {
        ClusterStatusHolder clusterStatusHolder = getAppId2ClusterStatus().get(appId);
        if (clusterStatusHolder == null) {
            log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId);
            return Collections.emptyMap();
        }
        return clusterStatusHolder.getAllWorkers();
    }

    public Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
        return WorkerClusterManagerService.getAppId2ClusterStatus();
    }
getWorkerInfosByAppId通过WorkerClusterManagerService.getAppId2ClusterStatus()获取ClusterStatusHolder,在返回ClusterStatusHolder的getAllWorkers

filterWorker

private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) {
        for (WorkerFilter filter : workerFilters) {
            if (filter.filter(workerInfo, jobInfo)) {
                return true;
            }
        }
        return false;
    }
filterWorker方法则是遍历workerFilters直接filter

calculateScore

tech/powerjob/common/model/SystemMetrics.java

public int calculateScore() {
        if (score > 0) {
            return score;
        }
        // Memory is vital to TaskTracker, so we set the multiplier factor as 2.
        double memScore = (jvmMaxMemory - jvmUsedMemory) * 2;
        // Calculate the remaining load of CPU. Multiplier is set as 1.
        double cpuScore = cpuProcessors - cpuLoad;
        // Windows can not fetch CPU load, set cpuScore as 1.
        if (cpuScore > cpuProcessors) {
            cpuScore = 1;
        }
        score = (int) (memScore + cpuScore);
        return score;
    }
SystemMetrics的calculateScore方法则是基于memScore与cpuScore来计算

WorkerFilter

public interface WorkerFilter {

    /**
     *
     * @param workerInfo worker info, maybe you need to use your customized info in SystemMetrics#extra
     * @param jobInfoDO job info
     * @return true will remove the worker in process list
     */
    boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO);
}
WorkerFilter定义了filter接口用于过滤worker,它有3个实现类,分别是DesignatedWorkerFilter、DisconnectedWorkerFilter、SystemMetricsWorkerFilter

DesignatedWorkerFilter

tech/powerjob/server/extension/defaultimpl/workerfilter/DesignatedWorkerFilter.java

@Slf4j
@Component
public class DesignatedWorkerFilter implements WorkerFilter {
    @Override
    public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
        String designatedWorkers = jobInfo.getDesignatedWorkers();
        // no worker is specified, no filter of any
        if (StringUtils.isEmpty(designatedWorkers)) {
            return false;
        }
        Set<String> designatedWorkersSet = Sets.newHashSet(SJ.COMMA_SPLITTER.splitToList(designatedWorkers));
        for (String tagOrAddress : designatedWorkersSet) {
            if (tagOrAddress.equals(workerInfo.getTag()) || tagOrAddress.equals(workerInfo.getAddress())) {
                return false;
            }
        }
        return true;
    }
}
DesignatedWorkerFilter的filter方法遍历jobInfo的designatedWorkers信息,判断workerInfo的tag或者address是否匹配

DisconnectedWorkerFilter

tech/powerjob/server/extension/defaultimpl/workerfilter/DisconnectedWorkerFilter.java

@Slf4j
@Component
public class DisconnectedWorkerFilter implements WorkerFilter {

    @Override
    public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
        boolean timeout = workerInfo.timeout();
        if (timeout) {
            log.info("[Job-{}] filter worker[{}] due to timeout(lastActiveTime={})", jobInfo.getId(), workerInfo.getAddress(), workerInfo.getLastActiveTime());
        }
        return timeout;
    }
}
DisconnectedWorkerFilter的filter方法则通过WorkerInfo的timeout方法来判断,它主要是判断当前时间与lastActiveTime的时间差是否大于WORKER_TIMEOUT_MS(60s)

SystemMetricsWorkerFilter

tech/powerjob/server/extension/defaultimpl/workerfilter/SystemMetricsWorkerFilter.java

@Slf4j
@Component
public class SystemMetricsWorkerFilter implements WorkerFilter {

    @Override
    public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
        SystemMetrics metrics = workerInfo.getSystemMetrics();
        boolean filter = !metrics.available(jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace());
        if (filter) {
            log.info("[Job-{}] filter worker[{}] because the {} do not meet the requirements", jobInfo.getId(), workerInfo.getAddress(), workerInfo.getSystemMetrics());
        }
        return filter;
    }
}
SystemMetricsWorkerFilter的filter方法则根据workerInfo的SystemMetrics判断可用cpu核数、内存、磁盘空间是否大于阈值

小结

DispatchStrategy定义了HEALTH_FIRST、RANDOM两个枚举值;WorkerClusterQueryService的getSuitableWorkers方法先通过getWorkerInfosByAppId获取指定appId的WorkerInfo,然后通过filterWorker进行一次过滤,最后根据dispatchStrategy来对workers进行排序,如果是RANDOM则通过Collections.shuffle(workers)随机化,如果是HEALTH_FIRST则根据systemMetrics的calculateScore结果进行排序,如果有限定maxWorkerCount则对workers进行subList,没有则返回排序后的workers。

以上就是PowerJob的DispatchStrategy方法工作流程源码解读的详细内容,更多关于PowerJob DispatchStrategy的资料请关注脚本之家其它相关文章!

相关文章

  • 使用Spring Cache设置缓存条件操作

    使用Spring Cache设置缓存条件操作

    这篇文章主要介绍了使用Spring Cache设置缓存条件操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Java泛型 <T> T、 T、<T>的用法小结

    Java泛型 <T> T、 T、<T>的用法小结

    T在Java泛型中,被称作类型变量, 有的方法返回值是<T> T,有的是T,区别在哪里,本文主要介绍了Java泛型 <T> T、 T、<T>的用法小结,具有一定的参考价值,感兴趣的可以了解下
    2023-12-12
  • springboot项目编译提示无效的源发行版17解决办法

    springboot项目编译提示无效的源发行版17解决办法

    这篇文章主要给大家介绍了关于springboot项目编译提示无效的源发行版17解决办法,这个错误意味着你的Spring Boot项目正在使用Java 17这个版本,但是你的项目中未配置正确的Java版本,需要的朋友可以参考下
    2023-06-06
  • Java8 CompletableFuture详解

    Java8 CompletableFuture详解

    这篇文章主要介绍了Java8 CompletableFuture详解,CompletableFuture extends Future提供了方法,一元操作符和促进异步性以及事件驱动编程模型,需要的朋友可以参考下
    2014-06-06
  • Java实现List去重的方法详解

    Java实现List去重的方法详解

    本文用示例介绍Java的List(ArrayList、LinkedList等)的去重的方法。List去重的常用方法一般是:JDK8的stream的distinct、转为HashSet、转为TreeSet等,感兴趣的可以了解一下
    2022-05-05
  • JavaWeb ServletConfig作用及原理分析讲解

    JavaWeb ServletConfig作用及原理分析讲解

    ServletConfig对象,叫Servlet配置对象。主要用于加载配置文件的初始化参数。我们知道一个Web应用里面可以有多个servlet,如果现在有一份数据需要传给所有的servlet使用,那么我们就可以使用ServletContext对象了
    2022-10-10
  • 浅谈Java字符串比较的三种方法

    浅谈Java字符串比较的三种方法

    这篇文章主要介绍了浅谈Java字符串比较的三种方法,字符串比较是常见的操作,包括比较相等、比较大小、比较前缀和后缀串等,需要的朋友可以参考下
    2023-04-04
  • 快速理解Java设计模式中的组合模式

    快速理解Java设计模式中的组合模式

    这篇文章主要介绍了快速理解Java设计模式中的组合模式,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11
  • Spring Data MongoDB中实现自定义级联的方法详解

    Spring Data MongoDB中实现自定义级联的方法详解

    这篇文章主要给大家介绍了关于Spring Data MongoDB中实现自定义级联的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
    2017-11-11
  • JVM核心教程之JVM运行与类加载全过程详解

    JVM核心教程之JVM运行与类加载全过程详解

    我们都知道一个java程序运行要经过编译和执行,但是这太概括了,中间还有很多步骤,下面这篇文章主要给大家介绍了关于JVM核心教程之JVM运行与类加载全过程的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下。
    2018-04-04

最新评论