Flink自定义Sink端实现过程讲解

 更新时间:2023年01月20日 09:13:57   作者:Bonyin  
这篇文章主要介绍了Flink自定义Sink端实现过程,在Fink官网中sink端只是给出了常规的write api.在我们实际开发场景中需要将flink处理的数据写入kafka,hbase kudu等外部系统

Sink介绍

在Fink官网中sink端只是给出了常规的write api.在我们实际开发场景中需要将flink处理的数据写入kafka,hbase kudu等外部系统。

UML关系

自定义Sink需要实现父类的接口和继承抽象类。

上面是Sink的继承关系

Flink addSink

// 方法需要SinkFunction的对象
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
		// read the output type of the input Transform to coax out errors about MissingTypeInfo
		transformation.getOutputType();
		// configure the type if needed
		if (sinkFunction instanceof InputTypeConfigurable) {
			((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
		}
		StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
		DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
		getExecutionEnvironment().addOperator(sink.getTransformation());
		return sink;
	}

SinkFunction

// SinkFunction是一个接口
public interface SinkFunction<IN> extends Function, Serializable {
   //公共方法
	default void invoke(IN value, Context context) throws Exception {
		invoke(value);
	}
}

RichSinkFunction

@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
	private static final long serialVersionUID = 1L;
}

其他继承接口SinkFunction的类:

案例

自定义HbaseSink

public class HbaseSink extends RichSinkFunction<Tuple2<Integer, String>> {
    Logger logger = LoggerFactory.getLogger(HbaseSink.class);
    org.apache.hadoop.conf.Configuration configuration;
    Connection connection;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //获取hbase 的链接信息
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103");
        //创建conn
        connection = ConnectionFactory.createConnection(configuration);
        logger.info("创建链接成功");
    }
    @Override
    public void invoke(Tuple2<Integer, String> value, Context context) throws Exception {
        //往habse 里面插入数据
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Table table = connection.getTable(TableName.valueOf("torder_count"));
        Put put = new Put(value.f1.getBytes(StandardCharsets.UTF_8));
        put.addColumn("info".getBytes(), // 列族
                "order_total".getBytes(StandardCharsets.UTF_8), //特征字段
                value.f0.toString().getBytes()); //属性值
        put.addColumn("info".getBytes(), "insert_time".getBytes(), format.format(new Date(System.currentTimeMillis())).getBytes());
        table.put(put);
        table.close();
        logger.info("=====一条数据写入成功======,时间:"+value.f1+", 值:"+value.f0);
    }
    @Override
    public void close() throws Exception {
        super.close();
        connection.close();
    }

通过以上案例我们熟悉了addSink函数的操作。

到此这篇关于Flink自定义Sink端实现过程讲解的文章就介绍到这了,更多相关Flink自定义Sink内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java实现HDFS文件上传下载

    Java实现HDFS文件上传下载

    这篇文章主要为大家详细介绍了Java实现HDFS文件上传下载,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-06-06
  • 10张图总结出并发编程最佳学习路线

    10张图总结出并发编程最佳学习路线

    这篇文章主要介绍了并发编程的最佳学习路线,文中通过图片介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-08-08
  • Java 从网上下载文件的几种方式实例代码详解

    Java 从网上下载文件的几种方式实例代码详解

    本文通过实例代码给大家介绍了java从网上下载文件的几种方式,非常不错,具有参考借鉴价值,需要的的朋友参考下吧
    2017-08-08
  • 在springboot中使用AOP进行全局日志记录

    在springboot中使用AOP进行全局日志记录

    这篇文章主要介绍就在springboot中使用AOP进行全局日志记录,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • jmeter压力测试工具简介_动力节点Java学院整理

    jmeter压力测试工具简介_动力节点Java学院整理

    这篇文章主要为大家详细介绍了jmeter压力测试工具相关介绍资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-08-08
  • Java集合之Map接口与实现类详解

    Java集合之Map接口与实现类详解

    这篇文章主要为大家详细介绍了Java集合中的Map接口与实现类,文中的示例代码讲解详细,对我们学习Java有一定的帮助,感兴趣的可以了解一下
    2022-12-12
  • 基于Java实现的一层简单人工神经网络算法示例

    基于Java实现的一层简单人工神经网络算法示例

    这篇文章主要介绍了基于Java实现的一层简单人工神经网络算法,结合实例形式分析了java实现人工神经网络的具体实现技巧,需要的朋友可以参考下
    2017-12-12
  • idea使用jclasslib插件查看字节码

    idea使用jclasslib插件查看字节码

    这篇文章主要为大家介绍了idea使用jclasslib插件查看字节码,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05
  • Java定义队列结构,并实现入队、出队操作完整示例

    Java定义队列结构,并实现入队、出队操作完整示例

    这篇文章主要介绍了Java定义队列结构,并实现入队、出队操作,结合完整实例形式分析了java数据结构中队列的定义、入队、出队、判断队列是否为空、打印队列元素等相关操作技巧,需要的朋友可以参考下
    2020-02-02
  • 判断List和Map是否相等并合并List中相同的Map

    判断List和Map是否相等并合并List中相同的Map

    今天小编就为大家分享一篇关于判断List和Map是否相等并合并List中相同的Map,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2018-12-12

最新评论