PowerJob的ServerDiscoveryService工作流程源码解读

 更新时间:2023年12月24日 15:26:55   作者:codecraft  
这篇文章主要为大家介绍了PowerJob的ServerDiscoveryService工作流程源码解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

本文主要研究一下PowerJob的ServerDiscoveryService

ServerDiscoveryService

tech/powerjob/worker/background/ServerDiscoveryService.java

@Slf4j
public class ServerDiscoveryService {
    private final Long appId;
    private final PowerJobWorkerConfig config;
    private String currentServerAddress;
    private final Map<String, String> ip2Address = Maps.newHashMap();
    /**
     *  服务发现地址
     */
    private static final String DISCOVERY_URL = "http://%s/server/acquire?%s";
    /**
     * 失败次数
     */
    private static int FAILED_COUNT = 0;
    /**
     * 最大失败次数
     */
    private static final int MAX_FAILED_COUNT = 3;
    public ServerDiscoveryService(Long appId, PowerJobWorkerConfig config) {
        this.appId = appId;
        this.config = config;
    }
    //......
}
ServerDiscoveryService定义了currentServerAddress、ip2Address、服务发现url模版,失败次数,最大失败次数

start

public void start(ScheduledExecutorService timingPool) {
        this.currentServerAddress = discovery();
        if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) {
            throw new PowerJobException("can't find any available server, this worker has been quarantined.");
        }
        // 这里必须保证成功
        timingPool.scheduleAtFixedRate(() -> {
                    try {
                        this.currentServerAddress = discovery();
                    } catch (Exception e) {
                        log.error("[PowerDiscovery] fail to discovery server!", e);
                    }
                }
                , 10, 10, TimeUnit.SECONDS);
    }
其start方法先通过discovery方法获取currentServerAddress,然后注册定时任务每隔10s重新刷新一下currentServerAddress

discovery

private String discovery() {
        if (ip2Address.isEmpty()) {
            config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));
        }
        String result = null;
        // 先对当前机器发起请求
        String currentServer = currentServerAddress;
        if (!StringUtils.isEmpty(currentServer)) {
            String ip = currentServer.split(":")[0];
            // 直接请求当前Server的HTTP服务,可以少一次网络开销,减轻Server负担
            String firstServerAddress = ip2Address.get(ip);
            if (firstServerAddress != null) {
                result = acquire(firstServerAddress);
            }
        }
        for (String httpServerAddress : config.getServerAddress()) {
            if (StringUtils.isEmpty(result)) {
                result = acquire(httpServerAddress);
            }else {
                break;
            }
        }
        if (StringUtils.isEmpty(result)) {
            log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined.");
            // 在 Server 高可用的前提下,连续失败多次,说明该节点与外界失联,Server已经将秒级任务转移到其他Worker,需要杀死本地的任务
            if (FAILED_COUNT++ > MAX_FAILED_COUNT) {
                log.warn("[PowerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker.");
                List<Long> frequentInstanceIds = HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys();
                if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
                    frequentInstanceIds.forEach(instanceId -> {
                        HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.removeTaskTracker(instanceId);
                        taskTracker.destroy();
                        log.warn("[PowerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId);
                    });
                }
                FAILED_COUNT = 0;
            }
            return null;
        } else {
            // 重置失败次数
            FAILED_COUNT = 0;
            log.debug("[PowerDiscovery] current server is {}.", result);
            return result;
        }
    }
discovery方法从config.getServerAddress()解析地址放到ip2Address,若currentServerAddress有值则acquire,否则遍历config.getServerAddress()执行acquire;若还没有获取到则判断FAILED_COUNT是否超出MAX_FAILED_COUNT,超出则遍历HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys()挨个执行remove及destory

acquire

private String acquire(String httpServerAddress) {
        String result = null;
        String url = buildServerDiscoveryUrl(httpServerAddress);
        try {
            result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
        }catch (Exception ignore) {
        }
        if (!StringUtils.isEmpty(result)) {
            try {
                ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
                if (resultDTO.isSuccess()) {
                    return resultDTO.getData().toString();
                }
            }catch (Exception ignore) {
            }
        }
        return null;
    }
    private String buildServerDiscoveryUrl(String address) {
        ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest()
                .setAppId(appId)
                .setCurrentServer(currentServerAddress)
                .setProtocol(config.getProtocol().name().toUpperCase());
        String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap());
        return String.format(DISCOVERY_URL, address, query);
    }
acquire方法通过buildServerDiscoveryUrl构建url,然后执行HttpUtils.get(url)获取地址

ServerController

tech/powerjob/server/web/controller/ServerController.java

@RestController
@RequestMapping("/server")
@RequiredArgsConstructor
public class ServerController implements ServerInfoAware {
    private ServerInfo serverInfo;
    private final TransportService transportService;
    private final ServerElectionService serverElectionService;
    private final AppInfoRepository appInfoRepository;
    private final WorkerClusterQueryService workerClusterQueryService;
    //......
    @GetMapping("/acquire")
    public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
        return ResultDTO.success(serverElectionService.elect(request));
    }
    //......    
}
acquireServer方法执行serverElectionService.elect(request)返回server地址

elect

tech/powerjob/server/remote/server/election/ServerElectionService.java

public String elect(ServerDiscoveryRequest request) {
        if (!accurate()) {
            final String currentServer = request.getCurrentServer();
            // 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功
            Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));
            if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
                log.debug("[ServerElectionService] this server[{}] is worker's current server, skip check", currentServer);
                return currentServer;
            }
        }
        return getServer0(request);
    }
elect方法判断如果是本机就直接返回,否则执行getServer0

getServer0

private String getServer0(ServerDiscoveryRequest discoveryRequest) {
        final Long appId = discoveryRequest.getAppId();
        final String protocol = discoveryRequest.getProtocol();
        Set<String> downServerCache = Sets.newHashSet();
        for (int i = 0; i < RETRY_TIMES; i++) {
            // 无锁获取当前数据库中的Server
            Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
            if (!appInfoOpt.isPresent()) {
                throw new PowerJobException(appId + " is not registered!");
            }
            String appName = appInfoOpt.get().getAppName();
            String originServer = appInfoOpt.get().getCurrentServer();
            String activeAddress = activeAddress(originServer, downServerCache, protocol);
            if (StringUtils.isNotEmpty(activeAddress)) {
                return activeAddress;
            }
            // 无可用Server,重新进行Server选举,需要加锁
            String lockName = String.format(SERVER_ELECT_LOCK, appId);
            boolean lockStatus = lockService.tryLock(lockName, 30000);
            if (!lockStatus) {
                try {
                    Thread.sleep(500);
                }catch (Exception ignore) {
                }
                continue;
            }
            try {
                // 可能上一台机器已经完成了Server选举,需要再次判断
                AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
                String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol);
                if (StringUtils.isNotEmpty(address)) {
                    return address;
                }
                // 篡位,如果本机存在协议,则作为Server调度该 worker
                final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);
                if (targetProtocolInfo != null) {
                    // 注意,写入 AppInfoDO#currentServer 的永远是 default 的地址,仅在返回的时候特殊处理为协议地址
                    appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());
                    appInfo.setGmtModified(new Date());
                    appInfoRepository.saveAndFlush(appInfo);
                    log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
                    return targetProtocolInfo.getAddress();
                }
            }catch (Exception e) {
                log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
            } finally {
                lockService.unlock(lockName);
            }
        }
        throw new PowerJobException("server elect failed for app " + appId);
    }
getServer0方法先判断appInfoRepository中当前appId的originServer是否存活,是则直接返回,否则加锁将transportService.defaultProtocol().getAddress()写入到appInfo的currentServer

小结

PowerJob的ServerDiscoveryService定义了start方法,它先通过discovery方法获取currentServerAddress,然后注册定时任务每隔10s重新刷新一下currentServerAddress;

discovery方法主要是遍历config.getServerAddress()执行acquire;

acquire方法通过buildServerDiscoveryUrl构建url,然后执行HttpUtils.get(url)获取该appId的server地址。

以上就是PowerJob的ServerDiscoveryService工作流程源码解读的详细内容,更多关于PowerJob ServerDiscoveryService的资料请关注脚本之家其它相关文章!

相关文章

  • 如何使用ThreadLocal上下文解决查询性能问题

    如何使用ThreadLocal上下文解决查询性能问题

    这篇文章主要介绍了利用ThreadLocal上下文解决查询性能问题,有两种解决方案,一种是使用ThreadLocal上下文,另一种是使用Redis缓存,需要的朋友可以参考下
    2023-07-07
  • 解决kafka消息堆积及分区不均匀的问题

    解决kafka消息堆积及分区不均匀的问题

    这篇文章主要介绍了解决kafka消息堆积及分区不均匀的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Java8新特性:函数式编程

    Java8新特性:函数式编程

    Java8最新引入函数式编程概念,该项技术可以大大提升编码效率,本文会对涉及的对象等进行两种方法的对比,对新技术更直白的看到变化,更方便学习
    2021-06-06
  • RestTemplate未使用线程池问题的解决方法

    RestTemplate未使用线程池问题的解决方法

    今天给大家带来的是关于Springboot的相关知识,文章围绕着RestTemplate未使用线程池展开,文中有非常详细的介绍及代码示例,需要的朋友可以参考下
    2021-06-06
  • spring中向一个单例bean中注入非单例bean的方法详解

    spring中向一个单例bean中注入非单例bean的方法详解

    Spring是先将Bean对象实例化之后,再设置对象属性,所以会先调用他的无参构造函数实例化,每个对象存在一个map中,当遇到依赖,就去map中调用对应的单例对象,这篇文章主要给大家介绍了关于spring中向一个单例bean中注入非单例bean的相关资料,需要的朋友可以参考下
    2021-07-07
  • Java点餐小程序之黑心商人

    Java点餐小程序之黑心商人

    这篇文章主要介绍了一个Java编程的小程序-点餐系统,算是对之前所学习的Java基础知识作了一个汇总,需要的朋友可以参考下
    2017-09-09
  • 关于动态参数使用@PathVariable的解析

    关于动态参数使用@PathVariable的解析

    这篇文章主要介绍了关于动态参数使用@PathVariable的解析,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-02-02
  • Feign 使用HttpClient和OkHttp方式

    Feign 使用HttpClient和OkHttp方式

    这篇文章主要介绍了Feign 使用HttpClient和OkHttp方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • Java设计模式编程中的责任链模式使用示例

    Java设计模式编程中的责任链模式使用示例

    这篇文章主要介绍了Java设计模式编程中的责任链模式使用示例,责任链模式可以避免很多请求的发送者和接收者之间的耦合关系,需要的朋友可以参考下
    2016-05-05
  • Java基于正则实现的日期校验功能示例

    Java基于正则实现的日期校验功能示例

    这篇文章主要介绍了Java基于正则实现的日期校验功能,涉及java文件读取、日期转换及字符串正则匹配相关操作技巧,需要的朋友可以参考下
    2017-03-03

最新评论