(starters)springboot-starter整合阿里云datahub方式

 更新时间:2022年11月18日 10:48:22   作者:Cry丶  
这篇文章主要介绍了(starters)springboot-starter整合阿里云datahub方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

DataHub 类似于传统大数据解决方案中 Kafka 的角色,提供了一个数据队列功能。

DataHub 除了供了一个缓冲的队列作用。同时由于 DataHub 提供了各种与其他阿里云

上下游产品的对接功能,所以 DataHub 又扮演了一个数据的分发枢纽工作。

datahub提供了开发者生产和消费的sdk,在平时的开发中往往会写很多重复的代码,我们可以利用springboot为我们提供的自定义starter的方式,模仿springboot官方的starter组件实现方式,来封装一个更高效简单易用的starter组件,实现开箱即用。

本文仅提供核心思路实现供学习使用,应根据自己所在公司开发习惯做定制开发

1. 功能介绍

1.无需关心DataHub底层如何操作,安心编写业务代码即可进行数据的获取和上传,

2.类似RabbitMQ的starter,通过注解方式,Listener和Handler方式进行队列消费

3.支持游标的上次记忆功能

<dependency>
      <artifactId>cry-starters-projects</artifactId>
      <groupId>cn.com.cry.starters</groupId>
      <version>2022-1.0.0</version>
</dependency>

2.快速开始

2.1 启动客户端

配置阿里云DataHub的endpoint以及AK信息

aliyun:
  datahub:
  	# 开启功能
  	havingValue: true
    #是否为私有云
    isPrivate: false
    accessId: xxx
    accessKey: xxx
    endpoint: xxx
    #连接DataHub客户端超时时间
    conn-timeout: 10000

启动SpringBoot,你会发现datahub客户端已经启动完毕

2.2 获取DataHub客户端

DatahubClient datahubClient=DataHubTemplate.getDataHubClient();

2.3 写数据

public int write(@RequestParam("id") Integer shardId) {
    List<Student> datas = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Student s = new Student();
        s.setAge(i);
        s.setName("name-" + i);
        s.setAddress("address-" + i);
        datas.add(s);
    }
    int successNumbers = DataHubTemplate.write("my_test", "student", datas, shardId);
    return successNumbers;
}

以上示例代码表示往 projectName为my_test, topicName为student, shardId 为N的hub里写数据,并且返回插入成功的条数

2.4 读数据

读数据开发的逻辑类似RabbitMq的starter,使用@DataHubListener和@DataHubHandler处理器注解进行使用

@Component
@DataHubListener(projectName = "my_test")
public class ReadServiceImpl {    @DataHubHandler(topicName = "student", shardId = 0, cursorType = CursorTypeWrapper.LATEST)
    public void handler(Message message) {
        System.out.println("读取到shardId=0的消息");
        System.out.println(message.getData());
        System.out.println(message.getCreateTsime());
        System.out.println(message.getSize());
        System.out.println(message.getConfig());
        System.out.println(message.getMessageId());
    }
}

以上代码说明: 通过LATEST游标的方式,监听 project=my_test ,topicName=student,shardId=0 ,最终通过Message的包装类拿到dataHub实时写入的数据。

这边可以设置多种游标类型,例如根据最新的系统时间、最早录入的序号等

3. 核心代码

首先需要一个DataHubClient增强类,在SpringBoot启动时开启一个线程来监听对应的project-topic-shardingId,根据游标规则来读取当前的cursor进行数据的读取。

public class DataHubClientWrapper implements InitializingBean, DisposableBean {    @Autowired
    private AliyunAccountProperties properties;    @Autowired
    private ApplicationContext context;    private DatahubClient datahubClient;
    public DataHubClientWrapper() {    }    /**
     * 执行销毁方法
     *
     * @throws Exception
     */
    @Override
    public void destroy() throws Exception {
        WorkerResourceExecutor.shutdown();
    }    @Override
    public void afterPropertiesSet() throws Exception {        /**
         * 创建DataHubClient
         */
        this.datahubClient = DataHubClientFactory.create(properties);        /**
         * 打印Banner
         */
        BannerUtil.printBanner();        /**
         * 赋值Template的静态对象dataHubClient
         */
        DataHubTemplate.setDataHubClient(datahubClient);        /**
         * 初始化Worker线程
         */
        WorkerResourceExecutor.initWorkerResource(context);
        /**
         * 启动Worker线程
         */
        WorkerResourceExecutor.start();
    }
}

写数据,构建了一个类似RedisDataTemplate的模板类,封装了write的逻辑,调用时只需要用DataHubTemplate.write调用

public class DataHubTemplate {    private static DatahubClient dataHubClient;    private final static Logger logger = LoggerFactory.getLogger(DataHubTemplate.class);    /**
     * 默认不开启重试机制
     *
     * @param projectName
     * @param topicName
     * @param datas
     * @param shardId
     * @return
     */
    public static int write(String projectName, String topicName, List<?> datas, Integer shardId) {
        return write(projectName, topicName, datas, shardId, false);
    }    /**
     * 往指定的projectName以及topic和shard下面写数据
     *
     * @param projectName
     * @param topicName
     * @param datas
     * @param shardId
     * @param retry
     * @return
     */
    private static int write(String projectName, String topicName, List<?> datas, Integer shardId, boolean retry) {
        RecordSchema recordSchema = dataHubClient.getTopic(projectName, topicName).getRecordSchema();
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (Object o : datas) {
            RecordEntry entry = new RecordEntry();
            Map<String, Object> data = BeanUtil.beanToMap(o);
            TupleRecordData tupleRecordData = new TupleRecordData(recordSchema);
            for (String key : data.keySet()) {
                tupleRecordData.setField(key, data.get(key));
            }
            entry.setRecordData(tupleRecordData);
            entry.setShardId(String.valueOf(shardId));
            recordEntries.add(entry);
        }
        PutRecordsResult result = dataHubClient.putRecords(projectName, topicName, recordEntries);
        int failedRecordCount = result.getFailedRecordCount();
        if (failedRecordCount > 0 && retry) {
            retry(dataHubClient, result.getFailedRecords(), 1, projectName, topicName);
        }
        return datas.size() - failedRecordCount;
    }    /**
     * @param client
     * @param records
     * @param retryTimes
     * @param project
     * @param topic
     */
    private static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
        boolean suc = false;
        List<RecordEntry> failedRecords = records;
        while (retryTimes != 0) {
            logger.info("the time to send message has [{}] records failed, is starting retry", records.size());
            retryTimes = retryTimes - 1;
            PutRecordsResult result = client.putRecords(project, topic, failedRecords);
            int failedNum = result.getFailedRecordCount();
            if (failedNum > 0) {
                failedRecords = result.getFailedRecords();
                continue;
            }
            suc = true;
            break;
        }
        if (!suc) {
            logger.error("DataHub send message retry failure");
        }
    }    public static DatahubClient getDataHubClient() {
        return dataHubClient;
    }    public static void setDataHubClient(DatahubClient dataHubClient) {
        DataHubTemplate.dataHubClient = dataHubClient;
    }
}

读数据,需要在Spring启动时开启一个监听线程DataListenerWorkerThread,执行一个死循环不停轮询DataHub下的对应通道。

public class DataListenerWorkerThread extends Thread {
    private final static Logger logger = LoggerFactory.getLogger(DataListenerWorkerThread.class);
    private volatile boolean init = false;
    private DatahubConfig config;
    private String workerKey;
    private int recordLimits;
    private int sleep;
    private RecordSchema recordSchema;
    private RecordHandler recordHandler;
    private CursorHandler cursorHandler;    public DataListenerWorkerThread(String projectName, String topicName, int shardId, CursorTypeWrapper cursorType, int recordLimits, int sleep, int sequenceOffset, String startTime, StringRedisTemplate redisTemplate) {
        this.config = new DatahubConfig(projectName, topicName, shardId);
        this.workerKey = projectName + "-" + topicName + "-" + shardId;
        this.cursorHandler = new CursorHandler(cursorType, sequenceOffset, startTime, redisTemplate, workerKey);
        this.recordLimits = recordLimits;
        this.sleep = sleep;
        this.setName("DataHub-Worker");
        this.setDaemon(true);
    }    @Override
    public void run() {
        initRecordSchema();
        String cursor = cursorHandler.positioningCursor(config);
        for (; ; ) {
            try {
                GetRecordsResult result = DataHubTemplate.getDataHubClient().getRecords(config.getProjectName(), config.getTopicName(), String.valueOf(config.getShardId()), recordSchema, cursor, recordLimits);
                if (result.getRecordCount() <= 0) {
                    // 无数据,sleep后读取
                    Thread.sleep(sleep);
                    continue;
                }
                List<Map<String, Object>> dataMap = recordHandler.convert2List(result.getRecords());
                logger.info("receive [{}] records from project:[{}] topic:[{}] shard:[{}]", dataMap.size(), config.getProjectName(), config.getTopicName(), config.getShardId());
                // 拿到下一个游标
                cursor = cursorHandler.nextCursor(result);
                //执行方法
                WorkerResourceExecutor.invokeMethod(workerKey, JsonUtils.toJson(dataMap), dataMap.size(), config, cursor);
            } catch (InvalidParameterException ex) {
                //非法游标或游标已过期,建议重新定位后开始消费
                cursor = cursorHandler.resetCursor(config);
                logger.error("get Cursor error and reset cursor localtion ,errorMessage:{}", ex.getErrorMessage());
            } catch (DatahubClientException e) {
                logger.error("DataHubException:{}", e.getErrorMessage());
                this.interrupt();
            } catch (InterruptedException e) {
                logger.info("daemon thread {}-{} interrupted", this.getName(), this.getId());
            } catch (Exception e) {
                this.interrupt();
                logger.error("receive DataHub records cry.exception:{}", e, e);
            }
        }
    }    /**
     * 终止
     */
    public void shutdown() {
        if (!interrupted()) {
            interrupt();
        }
    }    /**
     * 初始化topic字段以及recordSchema
     */
    private void initRecordSchema() {
        try {
            if (!init) {
                recordSchema = DataHubTemplate.getDataHubClient().getTopic(config.getProjectName(), config.getTopicName()).getRecordSchema();
                List<Field> fields = recordSchema.getFields();
                this.recordHandler = new RecordHandler(fields);
                init = true;
            }
        } catch (Exception e) {
            logger.error("initRecordSchema error:{}", e, e);
        }
    }
}

read的时候结合了注解开发,通过定义类注解DataHubListener和方法注解DataHubHandler内置属性,来动态的控制需要在哪些方法中处理监听到的数据的逻辑:

DataHubHandler

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubHandler {
    /**
     * 话题名称
     *
     * @return
     */
    String topicName();    /**
     * shardId
     *
     * @return
     */
    int shardId();    /**
     * 最大数据量限制
     *
     * @return
     */
    int recordLimit() default 1000;    /**
     * 游标类型
     *
     * @return
     */
    CursorTypeWrapper cursorType() default CursorTypeWrapper.LATEST;    /**
     * 若未监听到数据添加,休眠时间 ms
     *
     * @return
     */
    int sleep() default 10000;    /**
     * 使用CursorType.SYSTEM_TIME的时候配置 时间偏移量
     *
     * @return
     */
    String startTime() default "";    /**
     * 使用使用CursorType.SEQUENCE的时候配置,偏移量,必须是正整数
     *
     * @return
     */
    int sequenceOffset() default 0;
}

DataHubListener

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubListener {
    String projectName();
}

最后我们需要启动SpringBootStarter的EnableConfigurationProperties 功能,通过配置文件来控制default-bean的开启或者关闭。

启动类:

@Configuration
@EnableConfigurationProperties(value = {AliyunAccountProperties.class})
public class DataHubClientAutoConfiguration {
    /**
     * 初始化dataHub装饰bean
     *
     * @return
     */
    @Bean
    public DataHubClientWrapper dataHubWrapper() {
        return new DataHubClientWrapper();
    }
}

属性配置类

@ConditionalOnProperty(prefix = "aliyun.datahub",havingValue = "true")
@Data
public class AliyunAccountProperties implements Properties{    /**
     * http://xxx.aliyuncs.com
     */
    private String endpoint;    /**
     * account
     */
    private String accessId;    /**
     * password
     */
    private String accessKey;    /**
     * private cloud || public cloud
     */
    private boolean isPrivate;    /**
     * unit: ms
     */
    private Integer connTimeout = 10000;
}

最后记得要做成一个starter,在resources下新建一个META-INF文件夹,新建一个spring.factories文件,

org.springframework.boot.autoconfigure.EnableAutoConfiguration= \
  cry.starter.datahub.DataHubClientAutoConfiguration

大体逻辑就是这样了,你学会了吗? hhhhhhhhh~

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • springboot整合redis修改分区的操作流程

    springboot整合redis修改分区的操作流程

    这篇文章主要介绍了springboot整合redis修改分区的操作流程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07
  • 详细分析Java中String、StringBuffer、StringBuilder类的性能

    详细分析Java中String、StringBuffer、StringBuilder类的性能

    在Java中,String类和StringBuffer类以及StringBuilder类都能用于创建字符串对象,而在分别操作这些对象时我们会发现JVM执行它们的性能并不相同,下面我们就来详细分析Java中String、StringBuffer、StringBuilder类的性能
    2016-05-05
  • 关于PreparedStatement的setObject作用及说明

    关于PreparedStatement的setObject作用及说明

    这篇文章主要介绍了关于PreparedStatement的setObject作用及说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03
  • Spring Security 实现短信验证码登录功能

    Spring Security 实现短信验证码登录功能

    这篇文章主要介绍了Spring Security 实现短信验证码登录功能,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-05-05
  • Java基础MAC系统下IDEA连接MYSQL数据库JDBC过程

    Java基础MAC系统下IDEA连接MYSQL数据库JDBC过程

    最近一直在学习web项目,当然也会涉及与数据库的连接这块,这里就总结一下在IDEA中如何进行MySQL数据库的连接,这里提一下我的电脑是MAC系统,使用的编码软件是IDEA,数据库是MySQL
    2021-09-09
  • springboot websocket简单入门示例

    springboot websocket简单入门示例

    这篇文章主要介绍了springboot websocket简单入门示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-08-08
  • springboot vue测试平台接口定义及发送请求功能实现

    springboot vue测试平台接口定义及发送请求功能实现

    这篇文章主要为大家介绍了springboot+vue测试平台接口定义及发送请求功能实现,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05
  • JAVA实现利用第三方平台发送短信验证码

    JAVA实现利用第三方平台发送短信验证码

    本文以注册为例,在SpringMVC+Spring+Mybatis框架的基础上完成该短信验证码功能。需要的朋友一起来看下吧
    2016-12-12
  • IDEA自定义pom依赖的步骤详解

    IDEA自定义pom依赖的步骤详解

    这篇文章主要介绍了IDEA自定义pom依赖的步骤详解,本文分步骤通过图文并茂的形式给大家介绍的非常详细对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-04-04
  • Java常用集合与原理解析

    Java常用集合与原理解析

    这篇文章主要介绍了Java常用集合与原理解析,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-03-03

最新评论