MLSQL Stack如何让流调试更加简单详解

 更新时间:2019年06月04日 10:05:33   作者:祝威廉  
这篇文章主要给大家介绍了关于MLSQL Stack如何让流调试更加简单的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用MLSQL具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧

前言

有一位同学正在调研MLSQL Stack对流的支持。然后说了流调试其实挺困难的。经过实践,希望实现如下三点:

  • 能随时查看最新固定条数的Kafka数据
  • 调试结果(sink)能打印在web控制台
  • 流程序能自动推测json schema(现在spark是不行的)

实现这三个点之后,我发现调试确实就变得简单很多了。

流程

首先我新建了一个kaf_write.mlsql,里面方便我往Kafka里写数据:

set abc='''
{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}
''';
load jsonStr.`abc` as table1;

select to_json(struct(*)) as value from table1 as table2;
save append table2 as kafka.`wow` where 
kafka.bootstrap.servers="127.0.0.1:9092";

这样我每次运行,数据就能写入到Kafka.

接着,我写完后,需要看看数据是不是真的都写进去了,写成了什么样子:

!kafkaTool sampleData 10 records from "127.0.0.1:9092" wow;

这句话表示,我要采样Kafka 10条Kafka数据,该Kafka的地址为127.0.0.1:9092,主题为wow.运行结果如下:

没有什么问题。接着我写一个非常简单的流式程序:

-- the stream name, should be uniq.
set streamName="streamExample";

-- use kafkaTool to infer schema from kafka
!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;


load kafka.`wow` options 
kafka.bootstrap.servers="127.0.0.1:9092"
as newkafkatable1;


select * from newkafkatable1
as table21;


-- print in webConsole instead of terminal console.
save append table21 
as webConsole.`` 
options mode="Append"
and duration="15"
and checkpointLocation="/tmp/s-cpl4";

运行结果如下:

在终端我们也可以看到实时效果了。

补充

当然,MLSQL Stack 还有对流还有两个特别好地方,第一个是你可以对流的事件设置http协议的callback,以及对流的处理结果再使用批SQL进行处理,最后入库。参看如下脚本:

-- the stream name, should be uniq.
set streamName="streamExample";


-- mock some data.
set data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';

-- load data as table
load jsonStr.`data` as datasource;

-- convert table as stream source
load mockStream.`datasource` options 
stepSizeRange="0-3"
as newkafkatable1;

-- aggregation 
select cast(value as string) as k from newkafkatable1
as table21;


!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- output the the result to console.


save append table21 
as custom.`` 
options mode="append"
and duration="15"
and sourceTable="jack"
and code='''
select count(*) as c from jack as newjack;
save append newjack as parquet.`/tmp/jack`; 
'''
and checkpointLocation="/tmp/cpl15";

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对脚本之家的支持。

相关文章

  • MySQL 触发器定义与用法简单实例

    MySQL 触发器定义与用法简单实例

    这篇文章主要介绍了MySQL 触发器定义与用法,结合简单实例形式总结分析了mysql触发器的语法、原理、定义及使用方法,需要的朋友可以参考下
    2019-09-09
  • Mysql连接无效(invalid connection)问题及解决

    Mysql连接无效(invalid connection)问题及解决

    这篇文章主要介绍了Mysql连接无效(invalid connection)问题及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-02-02
  • 详解如何校验MySQL及Oracle时间字段合规性

    详解如何校验MySQL及Oracle时间字段合规性

    这篇文章主要为大家介绍了如何校验MySQL及Oracle时间字段合规性详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-06-06
  • MySQL复制机制原理讲解

    MySQL复制机制原理讲解

    在本篇文章中小编通过诙谐幽默的语言图文给大家讲述了MySQL复制机制的原理及相关知识点,需要的朋友们参考下。
    2019-05-05
  • mysql如何获取数据列值(int和string)最大值

    mysql如何获取数据列值(int和string)最大值

    最近在开发项目的时候有个需求,我数据库里面存了很多升级包,升级包有列数据表示的是升级包的版本号,类型属于字符串,结构类似于V1.0.2.22这种,然后后台有个任务需要获取最新版本号的那条数据,本文给大家介绍mysql获取数据列值(int和string)最大值,感兴趣的朋友一起看看吧
    2024-01-01
  • mysql 8.0.11压缩包版本安装教程

    mysql 8.0.11压缩包版本安装教程

    这篇文章主要为大家详细介绍了mysql 8.0.11压缩包版本安装教程,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-05-05
  • 五分钟带你搞懂MySQL索引下推

    五分钟带你搞懂MySQL索引下推

    这篇文章主要介绍了Mysql的索引下推,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-09-09
  • mysql 5.5.x zip直接解压版安装方法

    mysql 5.5.x zip直接解压版安装方法

    这篇文章主要介绍了mysql 5.5.x zip直接解压版安装方法 ,需要的朋友可以参考下
    2016-04-04
  • mysql的计划任务与事件调度实例分析

    mysql的计划任务与事件调度实例分析

    这篇文章主要介绍了mysql的计划任务与事件调度,结合实例形式分析了mysql计划任务与事件调度相关概念、原理、用法与操作注意事项,需要的朋友可以参考下
    2019-12-12
  • 关于MySQL实现指定编码遇到的坑

    关于MySQL实现指定编码遇到的坑

    这篇文章主要介绍了一个关于MySQL指定编码实现的小坑,文中大家需要注意如果有需要保存emoji符号的字段,记得一定要指定编码为 utf8mb4,感兴趣的朋友一起看看吧
    2021-10-10

最新评论