PowerJob Alarmable工作流程源码剖析

 更新时间:2024年01月15日 08:42:24   作者:codecraft  
这篇文章主要为大家介绍了PowerJob Alarmable工作流程源码剖析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

本文主要研究一下PowerJob的Alarmable

Alarmable

tech/powerjob/server/extension/Alarmable.java

public interface Alarmable {
    void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
}
Alarmable接口定义了onFailed方法,其入参为alarm及targetUserList

Alarm

public interface Alarm extends PowerSerializable {
    String fetchTitle();
    default String fetchContent() {
        StringBuilder sb = new StringBuilder();
        JSONObject content = JSONObject.parseObject(JSONObject.toJSONString(this));
        content.forEach((key, originWord) -> {
            sb.append(key).append(": ");
            String word = String.valueOf(originWord);
            if (StringUtils.endsWithIgnoreCase(key, "time") || StringUtils.endsWithIgnoreCase(key, "date")) {
                try {
                    if (originWord instanceof Long) {
                        word = CommonUtils.formatTime((Long) originWord);
                    }
                }catch (Exception ignore) {
                }
            }
            sb.append(word).append(OmsConstant.LINE_SEPARATOR);
        });
        return sb.toString();
    }
}
Alarm定义了fetchTitle方法,提供了fetchContent默认方法,它有两个实现类分别是JobInstanceAlarm、WorkflowInstanceAlarm

DingTalkAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkAlarmService.java

@Slf4j
@Service
@RequiredArgsConstructor
public class DingTalkAlarmService implements Alarmable {
    private final Environment environment;
    private Long agentId;
    private DingTalkUtils dingTalkUtils;
    private Cache<String, String> mobile2UserIdCache;
    private static final int CACHE_SIZE = 8192;
    /**
     * 防止缓存击穿
     */
    private static final String EMPTY_TAG = "EMPTY";
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (dingTalkUtils == null) {
            return;
        }
        Set<String> userIds = Sets.newHashSet();
        targetUserList.forEach(user -> {
            String phone = user.getPhone();
            if (StringUtils.isEmpty(phone)) {
                return;
            }
            try {
                String userId = mobile2UserIdCache.get(phone, () -> {
                    try {
                        return dingTalkUtils.fetchUserIdByMobile(phone);
                    } catch (PowerJobException ignore) {
                        return EMPTY_TAG;
                    } catch (Exception ignore) {
                        return null;
                    }
                });
                if (!EMPTY_TAG.equals(userId)) {
                    userIds .add(userId);
                }
            }catch (Exception ignore) {
            }
        });
        userIds.remove(null);
        if (!userIds.isEmpty()) {
            String userListStr = SJ.COMMA_JOINER.skipNulls().join(userIds);
            List<DingTalkUtils.MarkdownEntity> markdownEntities = Lists.newLinkedList();
            markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost()));
            String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA);
            markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content));
            try {
                dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId);
            }catch (Exception e) {
                log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage());
            }
        }
    }
    @PostConstruct
    public void init() {
        String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID);
        String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY);
        String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET);
        log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId);
        if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) {
            log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable");
            return;
        }
        if (!StringUtils.isNumeric(agentId)) {
            log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId);
            return;
        }
        this.agentId = Long.valueOf(agentId);
        dingTalkUtils = new DingTalkUtils(appKey, appSecret);
        mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).softValues().build();
        log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!");
    }
}
DingTalkAlarmService实现了Alarmable接口,其onFailed遍历targetUserList获取userId,最后通过dingTalkUtils.sendMarkdownAsync发送

MailAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/MailAlarmService.java

@Slf4j
@Service
public class MailAlarmService implements Alarmable {
    @Resource
    private Environment environment;
    private JavaMailSender javaMailSender;
    @Value("${spring.mail.username:''}")
    private String from;
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
            return;
        }
        SimpleMailMessage sm = new SimpleMailMessage();
        try {
            sm.setFrom(from);
            sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).filter(Objects::nonNull).toArray(String[]::new));
            sm.setSubject(alarm.fetchTitle());
            sm.setText(alarm.fetchContent());
            javaMailSender.send(sm);
        }catch (Exception e) {
            log.warn("[MailAlarmService] send mail failed, reason is {}", e.getMessage());
        }
    }
    @Autowired(required = false)
    public void setJavaMailSender(JavaMailSender javaMailSender) {
        this.javaMailSender = javaMailSender;
    }
}
MailAlarmService实现了Alarmable接口,其onFailed方法构建SimpleMailMessage,然后通过spring的javaMailSender.send发送

WebHookAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/WebHookAlarmService.java

@Slf4j
@Service
public class WebHookAlarmService implements Alarmable {
    private static final String HTTP_PROTOCOL_PREFIX = "http://";
    private static final String HTTPS_PROTOCOL_PREFIX = "https://";
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (CollectionUtils.isEmpty(targetUserList)) {
            return;
        }
        targetUserList.forEach(user -> {
            String webHook = user.getWebHook();
            if (StringUtils.isEmpty(webHook)) {
                return;
            }
            // 自动添加协议头
            if (!webHook.startsWith(HTTP_PROTOCOL_PREFIX) && !webHook.startsWith(HTTPS_PROTOCOL_PREFIX)) {
                webHook = HTTP_PROTOCOL_PREFIX + webHook;
            }
            MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
            RequestBody requestBody = RequestBody.create(jsonType, JSONObject.toJSONString(alarm));
            try {
                String response = HttpUtils.post(webHook, requestBody);
                log.info("[WebHookAlarmService] invoke webhook[url={}] successfully, response is {}", webHook, response);
            }catch (Exception e) {
                log.warn("[WebHookAlarmService] invoke webhook[url={}] failed!", webHook, e);
            }
        });
    }
}
WebHookAlarmService实现了Alarmable接口,其onFailed方法遍历targetUserList,挨个执行HttpUtils.post(webHook, requestBody),用的是okhttp3来实现http请求回调

小结

PowerJob的Alarmable接口定义了onFailed方法,其入参为alarm及targetUserList;它有三个实现类,分别是DingTalkAlarmService(用的是DingTalkClient)、MailAlarmService(用的是spring的JavaMailSender)、WebHookAlarmService(用的是okhttp3的OkHttpClient)。

以上就是PowerJob Alarmable工作流程源码剖析的详细内容,更多关于PowerJob Alarmable的资料请关注脚本之家其它相关文章!

相关文章

  • Java Volatile关键字实现原理过程解析

    Java Volatile关键字实现原理过程解析

    这篇文章主要介绍了Java Volatile关键字实现原理过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • JavaWeb学习笔记之Filter和Listener

    JavaWeb学习笔记之Filter和Listener

    这篇文章主要给大家介绍了关于JavaWeb学习笔记之Filter和Listener的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-03-03
  • Java深入讲解异常处理try catch的使用

    Java深入讲解异常处理try catch的使用

    这篇文章主要介绍了Java异常处理机制try catch流程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2022-06-06
  • 基于Java实现根据周次生成数据的工具类

    基于Java实现根据周次生成数据的工具类

    这篇文章主要为大家详细介绍了一个基于Java的周次生成工具,用于根据指定的年份和周次类型(自然周或财务周)生成连续的周次列表数据,希望对大家有所帮助
    2025-08-08
  • Springboot如何使用OSHI获取和操作系统和硬件信息

    Springboot如何使用OSHI获取和操作系统和硬件信息

    这篇文章主要介绍了Springboot如何使用OSHI获取和操作系统和硬件信息问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-10-10
  • java获取登录者IP和登录时间的两种实现代码详解

    java获取登录者IP和登录时间的两种实现代码详解

    这篇文章主要介绍了java获取登录者IP和登录时间的实现代码,本文通过两种结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-07-07
  • SpringBoot中的@Configuration、@MapperScan注解

    SpringBoot中的@Configuration、@MapperScan注解

    SpringBoot中的@Configuration和@MapperScan注解分别用于配置类和Mapper接口的自动扫描,感兴趣的朋友跟随小编一起看看吧
    2024-11-11
  • SpringBoot入门之集成JSP的示例代码

    SpringBoot入门之集成JSP的示例代码

    这篇文章主要介绍了SpringBoot入门之集成JSP的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-07-07
  • JAVA异常处理机制之throws/throw使用情况

    JAVA异常处理机制之throws/throw使用情况

    这篇文章主要介绍了JAVA异常处理机制之throws/throw使用情况的区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07
  • SpringBoot整合Redis使用RedisTemplate和StringRedisTemplate

    SpringBoot整合Redis使用RedisTemplate和StringRedisTemplate

    Spring Boot Data(数据) Redis 中提供了RedisTemplate和StringRedisTemplate,其中StringRedisTemplate是RedisTemplate的子类,两个方法基本一致。本文介绍了SpringBoot整合Redis使用RedisTemplate和StringRedisTemplate的方法,需要的可以参考一下
    2022-12-12

最新评论