flink RichFunction之坑及解决

 更新时间:2022年12月17日 08:51:57   投稿:jingxian  
这篇文章主要介绍了flink RichFunction之坑及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

flink RichFunction之坑

flink的RichMapFunction,RichSinkFunction等,并不能百分百做到每次只open一个数据库连接。

在有些情况下他会一直创建然后销毁,创建销毁。

举例: 重点在第三行的注释

  val value = env.socketTextStream("192.168.13.11", 9090)
    val value2 = value.filter(x => {
      try {
        var a = 1 / 0   //此处若没有异常处理,任务不会断,但是会重复打开数据库连接
      } catch {
        case e: Exception =>
      }
      isInter(x)
    }).map(fun = x => {
      x.toLong
    })
    val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) {
      override def extractTimestamp(element: Long): Long = {
        println(element + "***************")
        element
      }
    })

    try {
      var a = 1 / 0
    } catch {
      case e: Exception =>
    }
    value1.map(new mymap)
    env.execute("test")

  }

  def isInter(input: String): Boolean = {
    val matcher = Pattern.compile("^[0-9]+$").matcher(input)
    matcher.find()
  }
}


class myRichMapfun6() extends RichMapFunction[ListBuffer[String], Unit] {
  var conn: Connection = _
  var pst: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://xxxxxxx:3306/zzt?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true", "root", "bigdata@mysql")
    println(conn)
    pst = conn.prepareStatement("insert into testa (str) values (?)")
  }

  override def close(): Unit = {
    conn.close()
    pst.close()
  }

  override def map(in: ListBuffer[String]): Unit = {
    pst.setString(1, in.head)
    pst.execute()
  }
}

所以你是不是觉得那就价格异常处理不就得了?

NO

再看:

在这里插入图片描述

这个时候,如果传进来line不是数字或者格式不对,就会触发异常,然而此时就不会像上面那样帮你解决问题,而是一遍遍创建对象销毁对象,一条消息创建一个连接,我就问你慌不慌,

原因

据观察是因为,输入的数据有问题,直接导致

 val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) {
      override def extractTimestamp(element: Long): Long = {
        println(element + "***************")
        element
      }
    })

这个崩溃了,不走这行代码了,没有获得eventime,然后估计。。。 剩下的我也没详细测。。。

解决方案

先fiiter过滤任何可能导致异常的脏数据确保数据都没问题就可以了。 

flink中RichFunction的一点小作用

①传递参数

所有需要用户定义的函数都可以转换成richfunction,例如实现map operator中你需要实现一个内部类,并实现它的map方法:

data.map (new MapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});

然后我们可以将其转换为RichMapFunction:

data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});

当然,RichFuction除了提供原来MapFuction的方法之外,还提供open, close, getRuntimeContext 和setRuntimeContext方法,这些功能可用于参数化函数(传递参数),创建和完成本地状态,访问广播变量以及访问运行时信息以及有关迭代中的信息。

下面我们来看看RichFuction中传递参数的例子,以下代码是测试RichFilterFuction的例子,基于DataSet而非DataStream。

由代码可见,可以将Configuration中的limit参数的值传递进RichFuction里面,通过后面withParameters方法传递进去,最后的结果是

由此可见,我从configuration中获取了limit的值,并设定了fliter的阈值是2,从而过滤了1,2。

②传递广播变量

原理和上面差不多,下面我直接把代码贴出来:

这是目前我学习到的RichFunction的用法,和大家分享一下。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • SpringBoot使用@Scheduled实现定时任务的并行执行

    SpringBoot使用@Scheduled实现定时任务的并行执行

    在SpringBoot中,如果使用@Scheduled注解来定义多个定时任务,默认情况下这些任务将会被安排在一个单线程的调度器中执行,这意味着,这些任务将会串行执行,而不是并行执行,本文介绍了SpringBoot使用@Scheduled实现定时任务的并行执行,需要的朋友可以参考下
    2024-06-06
  • JDBC基础教程

    JDBC基础教程

    这篇文章主要介绍了JDBC基础知识与操作技巧,讲述原理与基本技巧的基础上分析了安全问题与操作注意事项,非常具有实用价值,需要的朋友可以参考下
    2014-12-12
  • Java彻底消灭if-else的8种方案

    Java彻底消灭if-else的8种方案

    这篇文章主要给大家介绍了关于Java彻底消灭if-else的8种方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-11-11
  • Java Map 通过 key 或者 value 过滤的实例代码

    Java Map 通过 key 或者 value 过滤的实例代码

    这篇文章主要介绍了Java Map 通过 key 或者 value 过滤的实例代码,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2018-06-06
  • Eclipse转Itellij IDEA导入Git/svn本地项目的详细步骤

    Eclipse转Itellij IDEA导入Git/svn本地项目的详细步骤

    这篇文章主要介绍了Eclipse转Itellij IDEA导入Git/svn本地项目,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-10-10
  • Springboot 项目读取Resources目录下的文件(推荐)

    Springboot 项目读取Resources目录下的文件(推荐)

    这篇文章主要介绍了Springboot 项目读取Resources目录下的文件,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-11-11
  • JavaGUI使用标签与按钮方法详解

    JavaGUI使用标签与按钮方法详解

    这篇文章主要介绍了JavaGUI使用标签与按钮方法,前段时间学了GUI,总体上概念还是有点模糊,于是决定花点时间简单整理下。先简单介绍一下GUI,GUI就是图形用户界面
    2023-03-03
  • 详解java中float与double的区别

    详解java中float与double的区别

    这篇文章主要介绍了JAVA中float与double的区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-04-04
  • Java基础之关键字final详解

    Java基础之关键字final详解

    这篇文章主要介绍了Java基础之关键字final详解,文中有非常详细的代码示例,对正在学习java基础的小伙伴们有非常好的帮助,需要的朋友可以参考下
    2021-05-05
  • JavaWeb基础教程之Java基础加强版

    JavaWeb基础教程之Java基础加强版

    这篇文章主要介绍了JavaWeb基础教程之Java基础加强版的相关资料,需要的朋友可以参考下
    2016-07-07

最新评论