PowerJob的AliOssService工作流程源码解读
序
本文主要研究一下PowerJob的AliOssService
DFsService
tech/powerjob/server/extension/dfs/DFsService.java
public interface DFsService { /** * 存储文件 * @param storeRequest 存储请求 * @throws IOException 异常 */ void store(StoreRequest storeRequest) throws IOException; /** * 下载文件 * @param downloadRequest 文件下载请求 * @throws IOException 异常 */ void download(DownloadRequest downloadRequest) throws IOException; /** * 获取文件元信息 * @param fileLocation 文件位置 * @return 存在则返回文件元信息 * @throws IOException 异常 */ Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException; /** * 清理 powerjob 认为“过期”的文件 * 部分存储系统自带生命周期管理(如阿里云OSS,则不需要单独实现该方法) * @param bucket bucket * @param days 天数,需要清理超过 X 天的文件 */ default void cleanExpiredFiles(String bucket, int days) { } }
DFsService接口定义了store、download、fetchFileMeta、cleanExpiredFiles方法
AbstractDFsService
tech/powerjob/server/persistence/storage/AbstractDFsService.java
@Slf4j public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean { protected ApplicationContext applicationContext; public AbstractDFsService() { log.info("[DFsService] invoke [{}]'s constructor", this.getClass().getName()); } abstract protected void init(ApplicationContext applicationContext); protected static final String PROPERTY_KEY = "oms.storage.dfs"; protected static String fetchProperty(Environment environment, String dfsType, String key) { String pKey = String.format("%s.%s.%s", PROPERTY_KEY, dfsType, key); return environment.getProperty(pKey); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName()); init(applicationContext); } }
AbstractDFsService声明实现DFsService、ApplicationContextAware、DisposableBean接口,它在setApplicationContext方法执行了init
AliOssService
tech/powerjob/server/persistence/storage/impl/AliOssService.java
@Slf4j @Priority(value = Integer.MAX_VALUE - 1) @Conditional(AliOssService.AliOssCondition.class) public class AliOssService extends AbstractDFsService { private static final String TYPE_ALI_OSS = "alioss"; private static final String KEY_ENDPOINT = "endpoint"; private static final String KEY_BUCKET = "bucket"; private static final String KEY_CREDENTIAL_TYPE = "credential_type"; private static final String KEY_AK = "ak"; private static final String KEY_SK = "sk"; private static final String KEY_TOKEN = "token"; private OSS oss; private String bucket; private static final int DOWNLOAD_PART_SIZE = 10240; private static final String NO_SUCH_KEY = "NoSuchKey"; //...... void initOssClient(String endpoint, String bucket, String mode, String ak, String sk, String token) throws Exception { log.info("[AliOssService] init OSS by config: endpoint={},bucket={},credentialType={},ak={},sk={},token={}", endpoint, bucket, mode, ak, sk, token); if (StringUtils.isEmpty(bucket)) { throw new IllegalArgumentException("'oms.storage.dfs.alioss.bucket' can't be empty, please creat a bucket in aliyun oss console then config it to powerjob"); } this.bucket = bucket; CredentialsProvider credentialsProvider; CredentialType credentialType = CredentialType.parse(mode); switch (credentialType) { case PWD: credentialsProvider = new DefaultCredentialProvider(ak, sk, token); break; case SYSTEM_PROPERTY: credentialsProvider = CredentialsProviderFactory.newSystemPropertiesCredentialsProvider(); break; default: credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider(); } this.oss = new OSSClientBuilder().build(endpoint, credentialsProvider); log.info("[AliOssService] initialize successfully, THIS_WILL_BE_THE_STORAGE_LAYER."); } //...... }
AliOssService继承了AbstractDFsService
store
@Override public void store(StoreRequest storeRequest) throws IOException { ObjectMetadata objectMetadata = new ObjectMetadata(); PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, parseFileName(storeRequest.getFileLocation()), storeRequest.getLocalFile(), objectMetadata); oss.putObject(putObjectRequest); }
store方法创建PutObjectRequest,使用oss.putObject进行上传
download
@Override public void download(DownloadRequest downloadRequest) throws IOException { FileLocation dfl = downloadRequest.getFileLocation(); DownloadFileRequest downloadFileRequest = new DownloadFileRequest(bucket, parseFileName(dfl), downloadRequest.getTarget().getAbsolutePath(), DOWNLOAD_PART_SIZE); try { FileUtils.forceMkdirParent(downloadRequest.getTarget()); oss.downloadFile(downloadFileRequest); } catch (Throwable t) { ExceptionUtils.rethrow(t); } }
download方法则根据DownloadRequest指定的FileLocation创建DownloadFileRequest,然后通过oss.downloadFile(downloadFileRequest)进行下载
fetchFileMeta
@Override public Optional<FileMeta> fetchFileMeta(FileLocation fileLocation) throws IOException { try { ObjectMetadata objectMetadata = oss.getObjectMetadata(bucket, parseFileName(fileLocation)); return Optional.ofNullable(objectMetadata).map(ossM -> { Map<String, Object> metaInfo = Maps.newHashMap(); metaInfo.putAll(ossM.getRawMetadata()); if (ossM.getUserMetadata() != null) { metaInfo.putAll(ossM.getUserMetadata()); } return new FileMeta() .setLastModifiedTime(ossM.getLastModified()) .setLength(ossM.getContentLength()) .setMetaInfo(metaInfo); }); } catch (OSSException oe) { String errorCode = oe.getErrorCode(); if (NO_SUCH_KEY.equalsIgnoreCase(errorCode)) { return Optional.empty(); } ExceptionUtils.rethrow(oe); } return Optional.empty(); }
fetchFileMeta方法通过oss.getObjectMetadata获取ObjectMetadata
cleanExpiredFiles
@Override public void cleanExpiredFiles(String bucket, int days) { /* 阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54 阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54 阿里云 OSS 自带生命周期管理,请参考文档进行配置,代码层面不进行实现(浪费服务器资源)https://help.aliyun.com/zh/oss/user-guide/overview-54 */ }
cleanExpiredFiles则是空操作
init
protected void init(ApplicationContext applicationContext) { Environment environment = applicationContext.getEnvironment(); String endpoint = fetchProperty(environment, TYPE_ALI_OSS, KEY_ENDPOINT); String bkt = fetchProperty(environment, TYPE_ALI_OSS, KEY_BUCKET); String ct = fetchProperty(environment, TYPE_ALI_OSS, KEY_CREDENTIAL_TYPE); String ak = fetchProperty(environment, TYPE_ALI_OSS, KEY_AK); String sk = fetchProperty(environment, TYPE_ALI_OSS, KEY_SK); String token = fetchProperty(environment, TYPE_ALI_OSS, KEY_TOKEN); try { initOssClient(endpoint, bkt, ct, ak, sk, token); } catch (Exception e) { ExceptionUtils.rethrow(e); } }
init则是通过environment获取相关属性,然后执行initOssClient
小结
DFsService接口定义了store、download、fetchFileMeta、cleanExpiredFiles方法;AbstractDFsService声明实现DFsService、ApplicationContextAware、DisposableBean接口,它在setApplicationContext方法执行了init;AliOssService继承了AbstractDFsService,通过ossClient实现了store、download、fetchFileMeta方法。
以上就是PowerJob的AliOssServiceg工作流程源码解读的详细内容,更多关于PowerJob AliOssServiceg的资料请关注脚本之家其它相关文章!
相关文章
关于SpringBoot整合redis使用Lettuce客户端超时问题
使用到Lettuce连接redis,一段时间后不操作,再去操作redis,会报连接超时错误,在其重连后又可使用,纠结是什么原因导致的呢,下面小编给大家带来了SpringBoot整合redis使用Lettuce客户端超时问题及解决方案,一起看看吧2021-08-08解决springmvc关于前台日期作为实体类对象参数类型转换错误的问题
下面小编就为大家带来一篇解决springmvc关于前台日期作为实体类对象参数类型转换错误的问题。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧2017-06-06Java生成二维码的两种实现方式(基于Spring Boot)
这篇文章主要给大家介绍了关于Java生成二维码的两种实现方式,文中的代码基于Spring Boot,本文基于JAVA环境,以SpringBoot框架为基础开发,文中通过实例代码介绍的非常详细,需要的朋友可以参考下2023-07-07
最新评论