Java 数据流之Broadcast State

 更新时间:2021年09月14日 10:44:56   作者:Vicky_Tang  
这篇文章主要介绍了Java 数据流之Broadcast State,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

一、BroadcastState 的介绍

广播状态(Broadcast State)是 Operator State 的一种特殊类型。如果我们需要将配置 、规则等低吞吐事件流广播到下游所有 Task 时,就可以使用 BroadcastState。下游的 Task 接收这些配置、规则并保存为 BroadcastState,所有Task 中的状态保持一致,作用于另一个数据流的计算中。
简单理解:一个低吞吐量流包含一组规则,我们想对来自另一个流的所有元素基于此规则进行评估。
场景:动态更新计算规则。

广播状态与其他操作符状态的区别在于:

  • 它有一个 map 格式,用于定义存储结构
  • 它仅对具有广播流和非广播流输入的特定操作符可用
  • 这样的操作符可以具有不同名称的多个广播状态

二、BroadcastState 操作流程

三、案例实现

  • 从端口读取Json数据作为事件流
  • 从Mysql读取数据作为广播流
  • 关联广播流和事件流
  • 匹配对应的用户信息
package cn.kgc.broadcast
 
import java.sql.{Connection, DriverManager, PreparedStatement}
 
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
 
// (001,'tom',18,'北京',15830010002)
// 定义样例类 接受 MySQL的用户数据
case class BaseUserInfo(id:Long,name:String,age:Int,city:String,phone:Long)
 
// user_id、user_name、user_addrss、behaviour、url
// 输出数据类型
case class UserVisitInfo(id:Long,name:String,city:String,behaviour:String,url:String)
 
// 实现广播ProcessFunction
class MyBroadcastFunc extends BroadcastProcessFunction[String,(Long, BaseUserInfo),UserVisitInfo]{
 
  lazy val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])
 
  // 处理的是日志流中的每条数据
  override def processElement(value: String, ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#ReadOnlyContext, out: Collector[UserVisitInfo]): Unit = {
    // {"user_id":"001","ts":"2021-07-10 11:10:05","behaviour":"browse","url":"https://www.tb1.com/1.html"}
    val user_id = JSON.parseObject(value).getLong("user_id")
    val behaviour = JSON.parseObject(value).getString("behaviour")
    val url = JSON.parseObject(value).getString("url")
 
    val mapState = ctx.getBroadcastState(mapStateDes)
    val userInfo = mapState.get(user_id)
 
    out.collect(UserVisitInfo(user_id,userInfo.name,userInfo.city,behaviour,url))
 
  }
 
  // 处理的是广播流的每个值
  override def processBroadcastElement(value: (Long, BaseUserInfo), ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#Context, out: Collector[UserVisitInfo]): Unit = {
    val mapState: BroadcastState[Long, BaseUserInfo] = ctx.getBroadcastState(mapStateDes)
    mapState.put(value._1,value._2)
  }
}
 
 
class UserSourceFunc extends RichParallelSourceFunction[BaseUserInfo]{
 
  var conn:Connection = _
  var statement: PreparedStatement = _
  var flag:Boolean = true
 
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC","root","liu911223")
    statement = conn.prepareStatement("select * from base_user")
  }
 
  override def run(ctx: SourceFunction.SourceContext[BaseUserInfo]): Unit = {
    while (flag){
      Thread.sleep(5000)
      val resultSet = statement.executeQuery()
      while (resultSet.next()){
        val id = resultSet.getLong(1)
        val name = resultSet.getString(2)
        val age = resultSet.getInt(3)
        val city = resultSet.getString(4)
        val phone = resultSet.getLong(5)
        ctx.collect(BaseUserInfo(id,name,age,city,phone))
      }
    }
  }
 
  override def cancel(): Unit = {
    flag = false
  }
 
  override def close(): Unit = {
    if (statement != null) statement.close()
    if (conn != null) conn.close()
  }
}
object BroadcastDemo01 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
 
    // 定义为KV,一方面是为了广播的时候定义为map,另一方面是为了做关联操作
    val userBaseDS: DataStream[(Long, BaseUserInfo)] = env.addSource(new UserSourceFunc)
      .map(user => (user.id, user))
    val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])
    val broadCastStream: BroadcastStream[(Long, BaseUserInfo)] = userBaseDS.broadcast(mapStateDes)
 
    // 日志JSON数据
    val dataInfoDS: DataStream[String] = env.socketTextStream("master",1314)
 
    dataInfoDS.connect(broadCastStream)
      .process(new MyBroadcastFunc)
      .print()
 
    env.execute()
  }
}

到此这篇关于Java 数据流之Broadcast State的文章就介绍到这了,更多相关Java Broadcast State内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 解决fcitx输入法在IDEA中输入法候选框无法跟随光标移动的问题

    解决fcitx输入法在IDEA中输入法候选框无法跟随光标移动的问题

    这篇文章主要介绍了解决fcitx输入法在Intellij IDEA开发工具中输入法候选框无法跟随光标移动的问题,代码简单易懂对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-10-10
  • 在本地用idea连接虚拟机上的hbase集群的实现代码

    在本地用idea连接虚拟机上的hbase集群的实现代码

    这篇文章主要介绍了在本地用idea连接虚拟机上的hbase集群的实现代码,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-10-10
  • Spring底层核心源码原理解析

    Spring底层核心源码原理解析

    这篇文章主要介绍了Spring底层核心源码原理解析,当在某个方法上加了@Transactional注解后,就表示该方法在调用时会开启Spring事务,而这个方法所在的类所对应的Bean对象会是该类的代理对象,需要的朋友可以参考下
    2023-09-09
  • 你知道Java的这些骚操作吗?

    你知道Java的这些骚操作吗?

    今天在看python相关的东西,看到各种骚操作,回头想了下Java有没有什么骚操作,整理下面几种,一起看一下吧,需要的朋友可以参考下
    2021-05-05
  • Java线程和操作系统线程的关系解读

    Java线程和操作系统线程的关系解读

    这篇文章主要介绍了Java线程和操作系统线程的关系解读,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-06-06
  • Springboot通过Scheduled实现定时任务代码

    Springboot通过Scheduled实现定时任务代码

    这篇文章主要介绍了Springboot通过Scheduled实现定时任务代码,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11
  • 深入理解springMVC中的Model和Session属性

    深入理解springMVC中的Model和Session属性

    这篇文章主要介绍了深入理解springMVC中的Model和Session属性,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • Springboot项目出现java.lang.ArrayStoreException的异常分析

    Springboot项目出现java.lang.ArrayStoreException的异常分析

    这篇文章介绍了Springboot项目出现java.lang.ArrayStoreException的异常分析,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-12-12
  • Java Eureka探究细枝末节

    Java Eureka探究细枝末节

    Eureka是Netflix开发的服务发现框架,本身是一个基于REST的服务,主要用于定位运行在AWS域中的中间层服务,以达到负载均衡和中间层服务故障转移的目的
    2022-09-09
  • intellij idea旗舰版解决学生无法注册问题详解

    intellij idea旗舰版解决学生无法注册问题详解

    这篇文章主要介绍了intellij idea旗舰版解决学生无法注册问题详解,文中通过图文示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07

最新评论