flink RichFunction之坑及解决
flink RichFunction之坑
flink的RichMapFunction,RichSinkFunction等,并不能百分百做到每次只open一个数据库连接。
在有些情况下他会一直创建然后销毁,创建销毁。
举例: 重点在第三行的注释
val value = env.socketTextStream("192.168.13.11", 9090) val value2 = value.filter(x => { try { var a = 1 / 0 //此处若没有异常处理,任务不会断,但是会重复打开数据库连接 } catch { case e: Exception => } isInter(x) }).map(fun = x => { x.toLong }) val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) { override def extractTimestamp(element: Long): Long = { println(element + "***************") element } }) try { var a = 1 / 0 } catch { case e: Exception => } value1.map(new mymap) env.execute("test") } def isInter(input: String): Boolean = { val matcher = Pattern.compile("^[0-9]+$").matcher(input) matcher.find() } } class myRichMapfun6() extends RichMapFunction[ListBuffer[String], Unit] { var conn: Connection = _ var pst: PreparedStatement = _ override def open(parameters: Configuration): Unit = { conn = DriverManager.getConnection("jdbc:mysql://xxxxxxx:3306/zzt?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true", "root", "bigdata@mysql") println(conn) pst = conn.prepareStatement("insert into testa (str) values (?)") } override def close(): Unit = { conn.close() pst.close() } override def map(in: ListBuffer[String]): Unit = { pst.setString(1, in.head) pst.execute() } }
所以你是不是觉得那就价格异常处理不就得了?
NO
再看:
这个时候,如果传进来line不是数字或者格式不对,就会触发异常,然而此时就不会像上面那样帮你解决问题,而是一遍遍创建对象销毁对象,一条消息创建一个连接,我就问你慌不慌,
原因
据观察是因为,输入的数据有问题,直接导致
val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) { override def extractTimestamp(element: Long): Long = { println(element + "***************") element } })
这个崩溃了,不走这行代码了,没有获得eventime,然后估计。。。 剩下的我也没详细测。。。
解决方案
先fiiter过滤任何可能导致异常的脏数据确保数据都没问题就可以了。
flink中RichFunction的一点小作用
①传递参数
所有需要用户定义的函数都可以转换成richfunction,例如实现map operator中你需要实现一个内部类,并实现它的map方法:
data.map (new MapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } });
然后我们可以将其转换为RichMapFunction:
data.map (new RichMapFunction<String, Integer>() { public Integer map(String value) { return Integer.parseInt(value); } });
当然,RichFuction除了提供原来MapFuction的方法之外,还提供open, close, getRuntimeContext 和setRuntimeContext方法,这些功能可用于参数化函数(传递参数),创建和完成本地状态,访问广播变量以及访问运行时信息以及有关迭代中的信息。
下面我们来看看RichFuction中传递参数的例子,以下代码是测试RichFilterFuction的例子,基于DataSet而非DataStream。
由代码可见,可以将Configuration中的limit参数的值传递进RichFuction里面,通过后面withParameters方法传递进去,最后的结果是
由此可见,我从configuration中获取了limit的值,并设定了fliter的阈值是2,从而过滤了1,2。
②传递广播变量
原理和上面差不多,下面我直接把代码贴出来:
这是目前我学习到的RichFunction的用法,和大家分享一下。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
Java简单使用EasyExcel操作读写excel的步骤与要点
相信现在很多搞后端的同学大部分做的都是后台管理系统,那么管理系统就肯定免不了Excel的导出导入功能,下面这篇文章主要给大家介绍了关于Java简单使用EasyExcel操作读写excel的步骤与要点,需要的朋友可以参考下2022-09-09Windows环境IDEA下Ranger1.2.0源码编译详细流程
本文给大家讲解Windows环境IDEA下Ranger1.2.0源码编译过程,通过配置Tomcat,发布 security-admin-web项目,编译启动tomcat即可完成,需要的朋友参考下2021-06-06IDEA中 Getter、Setter 注解不起作用的问题如何解决
这篇文章主要介绍了IDEA中 Getter、Setter 注解不起作用的问题如何解决,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下2020-08-08Java Web项目中使用Socket通信多线程、长连接的方法
很多时候在javaweb项目中我们需要用到Socket通信来实现功能,在web中使用Socket我们需要建立一个监听程序,在程序启动时,启动socket监听。接下来通过本文给大家介绍Java Web项目中使用Socket通信多线程、长连接的方法,感兴趣的朋友一起学习2016-04-04获取Java的MyBatis框架项目中的SqlSession的方法
SqlSession中包括已经映射好的SQL语句,这样对象实例就可以直接拿过来用了,那么这里就来讲解获取Java的MyBatis框架项目中的SqlSession的方法2016-06-06
最新评论