PowerJob的AbstractScriptProcessor实现类工作流程源码解读

 更新时间:2024年01月04日 09:39:51   作者:codecraft  
这篇文章主要为大家介绍了PowerJob的AbstractScriptProcessor源码流程解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

本文主要研究一下PowerJob的AbstractScriptProcessor

AbstractScriptProcessor

tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java

@Slf4j
public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
    private static final ForkJoinPool POOL = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors());
    private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
    protected static final String SH_SHELL = "/bin/sh";
    protected static final String CMD_SHELL = "cmd.exe";
    private static final String WORKER_DIR = PowerFileUtils.workspace() + "/official_script_processor/";
    @Override
    protected ProcessResult process0(TaskContext context) throws Exception {
        OmsLogger omsLogger = context.getOmsLogger();
        String scriptParams = CommonUtils.parseParams(context);
        omsLogger.info("[SYSTEM] ScriptProcessor start to process, params: {}", scriptParams);
        if (scriptParams == null) {
            String message = "[SYSTEM] ScriptParams is null, please check jobParam configuration.";
            omsLogger.warn(message);
            return new ProcessResult(false, message);
        }
        String scriptPath = prepareScriptFile(context.getInstanceId(), scriptParams);
        omsLogger.info("[SYSTEM] Generate executable file successfully, path: {}", scriptPath);
        if (SystemUtils.IS_OS_WINDOWS) {
            if (StringUtils.equals(getRunCommand(), SH_SHELL)) {
                String message = String.format("[SYSTEM] Current OS is %s where shell scripts cannot run.", SystemUtils.OS_NAME);
                omsLogger.warn(message);
                return new ProcessResult(false, message);
            }
        }
        // 授权
        if  ( !SystemUtils.IS_OS_WINDOWS) {
            ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
            // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁)
            chmodPb.start().waitFor();
            omsLogger.info("[SYSTEM] chmod 755 authorization complete, ready to start execution~");
        }
        // 2. 执行目标脚本
        ProcessBuilder pb = StringUtils.equals(getRunCommand(), CMD_SHELL) ?
                new ProcessBuilder(getRunCommand(), "/c", scriptPath)
                : new ProcessBuilder(getRunCommand(), scriptPath);
        Process process = pb.start();
        StringBuilder inputBuilder = new StringBuilder();
        StringBuilder errorBuilder = new StringBuilder();
        boolean success = true;
        String result;
        final Charset charset = getCharset();
        try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) {
            POOL.execute(() -> copyStream(is, inputBuilder, omsLogger, charset));
            POOL.execute(() -> copyStream(es, errorBuilder, omsLogger, charset));
            success = process.waitFor() == 0;
        } catch (InterruptedException ie) {
            omsLogger.info("[SYSTEM] ScriptProcessor has been interrupted");
        } finally {
            result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder.toString(), errorBuilder.toString());
        }
        return new ProcessResult(success, result);
    }
    /**
     * 生成脚本名称
     * @param instanceId id of instance
     * @return 文件名称
     */
    protected abstract String getScriptName(Long instanceId);
    /**
     * 获取运行命令(eg,shell返回 /bin/sh)
     * @return 执行脚本的命令
     */
    protected abstract String getRunCommand();
    //......
}
AbstractScriptProcessor继承了CommonBasicProcessor,它定义了一个parallelism为4*Runtime.getRuntime().availableProcessors()的ForkJoinPool;其process0方法先读取scriptParams,然后执行prepareScriptFile获取scriptPath,接着使用chmod变更script权限为755,然后通过getRunCommand获取命令,接着往pool提交copyStream,等待process返回

prepareScriptFile

private String prepareScriptFile(Long instanceId, String processorInfo) throws IOException {
        String scriptPath = WORKER_DIR + getScriptName(instanceId);
        File script = new File(scriptPath);
        if (script.exists()) {
            return scriptPath;
        }
        File dir = new File(script.getParent());
        boolean success = dir.mkdirs();
        success = script.createNewFile();
        if (!success) {
            throw new RuntimeException("create script file failed");
        }

        // 如果是下载链接,则从网络获取
        for (String protocol : DOWNLOAD_PROTOCOL) {
            if (processorInfo.startsWith(protocol)) {
                FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000);
                return scriptPath;
            }
        }

        final Charset charset = getCharset();

        if(charset != null)
        {
            try (Writer fstream = new OutputStreamWriter(Files.newOutputStream(script.toPath()), charset); BufferedWriter out = new BufferedWriter(fstream)) {
                out.write(processorInfo);
                out.flush();
            }
        }
        else {
            try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) {
                bw.write(processorInfo);
                bw.flush();
            }
        }
        return scriptPath;
    }
prepareScriptFile先通过getScriptName获取scriptPath,如果是http、https、ftp链接则通过FileUtils.copyURLToFile下载,否则把scriptParams写入到scriptPath

copyStream

private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) {
        String line;
        try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) {
            while ((line = br.readLine()) != null) {
                sb.append(line);
                // 同步到在线日志
                omsLogger.info(line);
            }
        } catch (Exception e) {
            log.warn("[ScriptProcessor] copyStream failed.", e);
            omsLogger.warn("[SYSTEM] copyStream failed.", e);

            sb.append("Exception: ").append(e);
        }
    }
copyStream会读取InputStream到StringBuilder,同时打印到omsLogger

ShellProcessor

tech/powerjob/official/processors/impl/script/ShellProcessor.java

public class ShellProcessor extends AbstractScriptProcessor {
    @Override
    protected String getScriptName(Long instanceId) {
        return String.format("shell_%d.sh", instanceId);
    }
    @Override
    protected String getRunCommand() {
        return SH_SHELL;
    }
}
ShellProcessor的getScriptName是基于shell_%d.sh和instanceId生成的;其getRunCommand为/bin/sh

CMDProcessor

tech/powerjob/official/processors/impl/script/CMDProcessor.java

public class CMDProcessor extends AbstractScriptProcessor {
    @Override
    protected String getScriptName(Long instanceId) {
        return String.format("cmd_%d.bat", instanceId);
    }
    @Override
    protected String getRunCommand() {
        return "cmd.exe";
    }
    @Override
    protected Charset getCharset() {
        return Charset.defaultCharset();
    }
}
CMDProcessor的getScriptName是基于cmd_%d.bat和instanceId生成,其getRunCommand为`cmd.exe

PowerShellProcessor

tech/powerjob/official/processors/impl/script/PowerShellProcessor.java

public class PowerShellProcessor extends AbstractScriptProcessor {

    @Override
    protected String getScriptName(Long instanceId) {
        return String.format("powershell_%d.ps1", instanceId);
    }

    @Override
    protected String getRunCommand() {
        return "powershell.exe";
    }

    @Override
    protected Charset getCharset() {
        return Charset.defaultCharset();
    }
}
PowerShellProcessor的getScriptName是基于powershell_%d.ps1"和instanceId生成,其getRunCommand为powershell.exe

PythonProcessor

tech/powerjob/official/processors/impl/script/PythonProcessor.java

public class PythonProcessor extends AbstractScriptProcessor {

    @Override
    protected String getScriptName(Long instanceId) {
        return String.format("python_%d.py", instanceId);
    }

    @Override
    protected String getRunCommand() {
        return "python";
    }
}
PythonProcessor的getScriptName是基于python_%d.py和instanceId生成,其getRunCommand为python

小结

AbstractScriptProcessor继承了CommonBasicProcessor,它有四个实现类分别是ShellProcessor、CMDProcessor、PowerShellProcessor、PythonProcessor;它定义了getScriptName、getRunCommand抽象方法;其process0方法主要是把scriptParams写入到本地文件(scriptParams是http、https、ftp的则根据url进行下载),然后修改权限为755,然后执行pb.start(),再将input及errorStream收集到StringBuilder并打印到omsLogger,最后process.waitFor()等待处理完成。

以上就是PowerJob的AbstractScriptProcessor方法工作流程源码解读的详细内容,更多关于PowerJob AbstractScriptProcessor的资料请关注脚本之家其它相关文章!

相关文章

  • SpringBoot集成Caffeine缓存的实现步骤

    SpringBoot集成Caffeine缓存的实现步骤

    Caffeine cache是一个针对Java的高性能缓存库。在本文中,我们将介绍它与Spring Boot如何一起使用。
    2021-05-05
  • java开发中常遇到的各种难点以及解决思路方案

    java开发中常遇到的各种难点以及解决思路方案

    Java项目是一个复杂的软件开发过程,其中会涉及到很多技术难点,这篇文章主要给大家介绍了关于java开发中常遇到的各种难点以及解决思路方案的相关资料,需要的朋友可以参考下
    2023-07-07
  • 基于Spring实现自定义错误信息返回详解

    基于Spring实现自定义错误信息返回详解

    这篇文章主要为大家详细介绍了如何基于Spring实现自定义错误信息返回效果,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2025-03-03
  • Spring中@Autowired注解在不同方法的写法示例

    Spring中@Autowired注解在不同方法的写法示例

    这篇文章主要为大家介绍了Spring中@Autowired注解在不同方法的写法示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • Mac安装Maven的几种方法小结

    Mac安装Maven的几种方法小结

    本文主要介绍了Mac安装Maven的几种方法小结,主要包括通过Homebrew安装Maven,通过SDKMAN安装Maven和通过官方网站下载安装包安装Maven,感兴趣的可以了解一下
    2024-01-01
  • Springboot2.3.x整合Canal的示例代码

    Springboot2.3.x整合Canal的示例代码

    canal是阿里开源mysql binlog 数据组件,canal-server 才是canal的核心我们前边所讲的canal的功能,实际上讲述的就是canal-server的功能,本文给大家介绍Springboot2.3.x整合Canal的示例代码,需要的朋友可以参考下
    2022-02-02
  • Java--SSH,SSM和Spring Boot框架区别及优缺点说明

    Java--SSH,SSM和Spring Boot框架区别及优缺点说明

    这篇文章主要介绍了Java--SSH,SSM和Spring Boot框架区别及优缺点说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-12-12
  • Spring Boot 中的 @PutMapping 注解原理及使用小结

    Spring Boot 中的 @PutMapping 注解原理及使用小结

    在本文中,我们介绍了 Spring Boot 中的 @PutMapping 注解,它可以将 HTTP PUT 请求映射到指定的处理方法上,我们还介绍了 @PutMapping 注解的原理以及如何在 Spring Boot 中使用它,感兴趣的朋友跟随小编一起看看吧
    2023-12-12
  • springboot中PostMapping正常接收json参数后返回404问题

    springboot中PostMapping正常接收json参数后返回404问题

    这篇文章主要介绍了springboot中PostMapping正常接收json参数后返回404问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-05-05
  • Java异常处理中同时有finally和return语句的执行问题

    Java异常处理中同时有finally和return语句的执行问题

    这篇文章主要介绍了Java异常处理中同时有finally和return语句的执行问题,首先确定的是一般finally语句都会被执行...然后,需要的朋友可以参考下
    2015-11-11

最新评论