PowerJob Alarmable工作流程源码剖析

 更新时间:2024年01月15日 08:42:24   作者:codecraft  
这篇文章主要为大家介绍了PowerJob Alarmable工作流程源码剖析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

本文主要研究一下PowerJob的Alarmable

Alarmable

tech/powerjob/server/extension/Alarmable.java

public interface Alarmable {
    void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
}
Alarmable接口定义了onFailed方法,其入参为alarm及targetUserList

Alarm

public interface Alarm extends PowerSerializable {
    String fetchTitle();
    default String fetchContent() {
        StringBuilder sb = new StringBuilder();
        JSONObject content = JSONObject.parseObject(JSONObject.toJSONString(this));
        content.forEach((key, originWord) -> {
            sb.append(key).append(": ");
            String word = String.valueOf(originWord);
            if (StringUtils.endsWithIgnoreCase(key, "time") || StringUtils.endsWithIgnoreCase(key, "date")) {
                try {
                    if (originWord instanceof Long) {
                        word = CommonUtils.formatTime((Long) originWord);
                    }
                }catch (Exception ignore) {
                }
            }
            sb.append(word).append(OmsConstant.LINE_SEPARATOR);
        });
        return sb.toString();
    }
}
Alarm定义了fetchTitle方法,提供了fetchContent默认方法,它有两个实现类分别是JobInstanceAlarm、WorkflowInstanceAlarm

DingTalkAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkAlarmService.java

@Slf4j
@Service
@RequiredArgsConstructor
public class DingTalkAlarmService implements Alarmable {
    private final Environment environment;
    private Long agentId;
    private DingTalkUtils dingTalkUtils;
    private Cache<String, String> mobile2UserIdCache;
    private static final int CACHE_SIZE = 8192;
    /**
     * 防止缓存击穿
     */
    private static final String EMPTY_TAG = "EMPTY";
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (dingTalkUtils == null) {
            return;
        }
        Set<String> userIds = Sets.newHashSet();
        targetUserList.forEach(user -> {
            String phone = user.getPhone();
            if (StringUtils.isEmpty(phone)) {
                return;
            }
            try {
                String userId = mobile2UserIdCache.get(phone, () -> {
                    try {
                        return dingTalkUtils.fetchUserIdByMobile(phone);
                    } catch (PowerJobException ignore) {
                        return EMPTY_TAG;
                    } catch (Exception ignore) {
                        return null;
                    }
                });
                if (!EMPTY_TAG.equals(userId)) {
                    userIds .add(userId);
                }
            }catch (Exception ignore) {
            }
        });
        userIds.remove(null);
        if (!userIds.isEmpty()) {
            String userListStr = SJ.COMMA_JOINER.skipNulls().join(userIds);
            List<DingTalkUtils.MarkdownEntity> markdownEntities = Lists.newLinkedList();
            markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost()));
            String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA);
            markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content));
            try {
                dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId);
            }catch (Exception e) {
                log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage());
            }
        }
    }
    @PostConstruct
    public void init() {
        String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID);
        String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY);
        String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET);
        log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId);
        if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) {
            log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable");
            return;
        }
        if (!StringUtils.isNumeric(agentId)) {
            log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId);
            return;
        }
        this.agentId = Long.valueOf(agentId);
        dingTalkUtils = new DingTalkUtils(appKey, appSecret);
        mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).softValues().build();
        log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!");
    }
}
DingTalkAlarmService实现了Alarmable接口,其onFailed遍历targetUserList获取userId,最后通过dingTalkUtils.sendMarkdownAsync发送

MailAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/MailAlarmService.java

@Slf4j
@Service
public class MailAlarmService implements Alarmable {
    @Resource
    private Environment environment;
    private JavaMailSender javaMailSender;
    @Value("${spring.mail.username:''}")
    private String from;
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
            return;
        }
        SimpleMailMessage sm = new SimpleMailMessage();
        try {
            sm.setFrom(from);
            sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).filter(Objects::nonNull).toArray(String[]::new));
            sm.setSubject(alarm.fetchTitle());
            sm.setText(alarm.fetchContent());
            javaMailSender.send(sm);
        }catch (Exception e) {
            log.warn("[MailAlarmService] send mail failed, reason is {}", e.getMessage());
        }
    }
    @Autowired(required = false)
    public void setJavaMailSender(JavaMailSender javaMailSender) {
        this.javaMailSender = javaMailSender;
    }
}
MailAlarmService实现了Alarmable接口,其onFailed方法构建SimpleMailMessage,然后通过spring的javaMailSender.send发送

WebHookAlarmService

tech/powerjob/server/extension/defaultimpl/alarm/impl/WebHookAlarmService.java

@Slf4j
@Service
public class WebHookAlarmService implements Alarmable {
    private static final String HTTP_PROTOCOL_PREFIX = "http://";
    private static final String HTTPS_PROTOCOL_PREFIX = "https://";
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (CollectionUtils.isEmpty(targetUserList)) {
            return;
        }
        targetUserList.forEach(user -> {
            String webHook = user.getWebHook();
            if (StringUtils.isEmpty(webHook)) {
                return;
            }
            // 自动添加协议头
            if (!webHook.startsWith(HTTP_PROTOCOL_PREFIX) && !webHook.startsWith(HTTPS_PROTOCOL_PREFIX)) {
                webHook = HTTP_PROTOCOL_PREFIX + webHook;
            }
            MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
            RequestBody requestBody = RequestBody.create(jsonType, JSONObject.toJSONString(alarm));
            try {
                String response = HttpUtils.post(webHook, requestBody);
                log.info("[WebHookAlarmService] invoke webhook[url={}] successfully, response is {}", webHook, response);
            }catch (Exception e) {
                log.warn("[WebHookAlarmService] invoke webhook[url={}] failed!", webHook, e);
            }
        });
    }
}
WebHookAlarmService实现了Alarmable接口,其onFailed方法遍历targetUserList,挨个执行HttpUtils.post(webHook, requestBody),用的是okhttp3来实现http请求回调

小结

PowerJob的Alarmable接口定义了onFailed方法,其入参为alarm及targetUserList;它有三个实现类,分别是DingTalkAlarmService(用的是DingTalkClient)、MailAlarmService(用的是spring的JavaMailSender)、WebHookAlarmService(用的是okhttp3的OkHttpClient)。

以上就是PowerJob Alarmable工作流程源码剖析的详细内容,更多关于PowerJob Alarmable的资料请关注脚本之家其它相关文章!

相关文章

  • java实现多设备同时登录或强制下线

    java实现多设备同时登录或强制下线

    本文主要介绍了java实现多设备同时登录或强制下线,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-07-07
  • Java解压zip文件完整代码分享

    Java解压zip文件完整代码分享

    这篇文章主要介绍了Java解压zip文件完整代码分享,向大家分享了两部分代码示例,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11
  • springboot上传zip包并解压至服务器nginx目录方式

    springboot上传zip包并解压至服务器nginx目录方式

    这篇文章主要介绍了springboot上传zip包并解压至服务器nginx目录方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2025-04-04
  • java实现KFC点餐系统

    java实现KFC点餐系统

    这篇文章主要为大家详细介绍了java实现KFC点餐系统,模拟肯德基快餐店的收银系统,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-01-01
  • Java获取文件的hash值(SHA256)两种方式

    Java获取文件的hash值(SHA256)两种方式

    这篇文章主要给大家介绍了关于Java获取文件hash值(SHA256)的两种方式,SHA256是一种哈希算法,它是不可逆的,也就是说无法解密,需要的朋友可以参考下
    2023-09-09
  • Java中深拷贝和浅拷贝的区别解析

    Java中深拷贝和浅拷贝的区别解析

    这篇文章主要介绍了Java中深拷贝和浅拷贝的区别解析,浅拷贝是源对象和拷贝对象的存放地址不同,但被复制的源对象的引用类型属性存放的地址仍然和源对象的引用类型属性相同,修改引用类型属性的属性会影响相互影响,需要的朋友可以参考下
    2024-01-01
  • 解决创建springboot后启动报错:Failed to bind properties under‘spring.datasource‘

    解决创建springboot后启动报错:Failed to bind properties under‘spri

    在Spring Boot项目中,application.properties和application.yml是用于配置参数的两种文件格式,properties格式简洁但不支持层次结构,而yml格式支持层次性,可读性更好,在yml文件中,要注意细节,比如冒号后面需要空格
    2024-10-10
  • Java使用正则表达式判断独立字符的存在(代码示例)

    Java使用正则表达式判断独立字符的存在(代码示例)

    通过使用正则表达式,我们可以更加灵活地判断字符串中是否包含特定的字符,并且可以控制匹配的条件,如独立的字符,这为我们处理字符串提供了更多的选择和功能,这篇文章主要介绍了Java使用正则表达式判断独立字符的存在,需要的朋友可以参考下
    2023-10-10
  • 轻松掌握Java注解,让编程更智能、更优雅

    轻松掌握Java注解,让编程更智能、更优雅

    轻松掌握Java注解?没问题!想要让你的Java代码更具可读性、维护性,同时提升开发效率?本指南将带你快速入门Java注解的世界,只需短短几分钟,你就能揭秘这个强大的编程工具,让编写有声明性逻辑的代码变得轻而易举,赶快一起来探索吧!
    2024-01-01
  • JDBC连接MySql数据库步骤 以及查询、插入、删除、更新等

    JDBC连接MySql数据库步骤 以及查询、插入、删除、更新等

    这篇文章主要介绍了JDBC连接MySql数据库步骤,以及查询、插入、删除、更新等十一个处理数据库信息的功能,需要的朋友可以参考下
    2018-05-05

最新评论