详解Flink同步Kafka数据到ClickHouse分布式表

 更新时间:2022年12月01日 10:03:42   作者:大数据技术派  
这篇文章主要为大家介绍了Flink同步Kafka数据到ClickHouse分布式表实现详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

引言

业务需要一种OLAP引擎,可以做到实时写入存储和查询计算功能,提供高效、稳健的实时数据服务,最终决定ClickHouse

什么是ClickHouse?

ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS)。

列式数据库更适合于OLAP场景(对于大多数查询而言,处理速度至少提高了100倍),下面详细解释了原因(通过图片更有利于直观理解),图片来源于ClickHouse中文官方文档。

行式

列式

我们使用Flink编写程序,消费kafka里面的主题数据,清洗、归一,写入到clickhouse里面去。

这里的关键点,由于第一次使用,无法分清应该建立什么格式的clickhouse表,出现了一些问题,最大的问题就是程序将数据写入了,查询发现数据不完整,只有一部分。我也在网上查了一些原因,总结下来。

为什么有时看不到已经创建好的表并且查询结果一直抖动时多时少?

常见原因1:

建表流程存在问题。ClickHouse的分布式集群搭建并没有原生的分布式DDL语义。如果您在自建ClickHouse集群时使用create table创建表,查询虽然返回了成功,但实际这个表只在当前连接的Server上创建了。下次连接重置换一个Server,您就看不到这个表了。

解决方案:

建表时,请使用create table <table_name> on cluster default语句,on cluster default声明会把这条语句广播给default集群的所有节点进行执行。示例代码如下。 Create table test on cluster default (a UInt64) Engine = MergeTree() order by tuple(); 在test表上再创建一个分布式表引擎,建表语句如下。 Create table test_dis on cluster default as test Engine = Distributed(default, default, test, cityHash64(a));

常见原因2:

ReplicatedMergeTree存储表配置有问题。ReplicatedMergeTree表引擎是对应MergeTree表引擎的主备同步增强版,在单副本实例上限定只能创建MergeTree表引擎,在双副本实例上只能创建ReplicatedMergeTree表引擎。

解决方案:

在双副本实例上建表时,请使用ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)或ReplicatedMergeTree()配置ReplicatedMergeTree表引擎。其中,ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)为固定配置,无需修改。

这里引出了复制表的概念,这里介绍一下,只有 MergeTree 系列里的表可支持副本:

ReplicatedMergeTree

ReplicatedSummingMergeTree

ReplicatedReplacingMergeTree

ReplicatedAggregatingMergeTree ReplicatedCollapsingMergeTree

ReplicatedVersionedCollapsingMergeTree

ReplicatedGraphiteMergeTree

副本是表级别的,不是整个服务器级的。所以,服务器里可以同时有复制表和非复制表。副本不依赖分片。每个分片有它自己的独立副本。

创建复制表

先做好准备工作,该建表的建表,然后编写程序。在表引擎名称上加上 Replicated 前缀。例如:ReplicatedMergeTree。

  • 首先创建一个分布式数据库
create database test on cluster default_cluster;
  • 创建本地表

由于clickhouse是分布式的,创建本地表本来应该在每个节点上创建的,但是指定on cluster关键字可以直接完成,建表语句如下:

CREATE TABLE test.test_data_shade on cluster default_cluster
(
    `data` Map(String, String),
    `uid` String,
    `remote_addr` String,
    `time` Datetime64,
    `status` Int32,
    ...其它字段省略
    `dt` String
)
ENGINE = ReplicatedMergeTree()
partition by dt
order by (dt, sipHash64(uid));

这里表引擎为ReplicatedMergeTree,即有副本的表,根据dt按天分区,提升查询效率,sipHash64是一个hash函数,根据uid散列使得相同uid数据在同一个分片上面,如果有去重需求,速度更快,因为可以计算每个分片去重,再汇总一下即可。

  • 创建分布式表
CREATE TABLE test.test_data_all on cluster default_cluster as test.test_data_shade ENGINE = Distributed('default_cluster', 'test', 'test_data_shade', sipHash64(uid));

在多副本分布式 ClickHouse 集群中,通常需要使用 Distributed 表写入或读取数据,Distributed 表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。

通过jdbc写入

这个我是看的官方文档,里面有2种选择,感兴趣的同学可以都去尝试一下。

这里贴一下我的Pom依赖

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.1-patch</version>
    <classifier>shaded</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Flink主程序,消费kafka,做清洗,然后写入clickhouse,这都是常规操作,这里贴一下关键代码吧。

连接clickhouse有2种方式,8123端口的http方式,和基于9000端口的tcp方式。

这里官方推荐的是连接驱动是0.3.2:

<dependency>
    <!-- please stop using ru.yandex.clickhouse as it's been deprecated -->
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2-patch11</version>
    <classifier>all</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Note: ru.yandex.clickhouse.ClickHouseDriver has been deprecated and everything under ru.yandex.clickhouse will be removed in 0.3.3.

官方推荐升级到0.3.2,上面表格给出了升级方法,文档地址:

github.com/ClickHouse/…

以上就是详解Flink同步Kafka数据到ClickHouse分布式表的详细内容,更多关于Flink数据同步Kafka ClickHouse的资料请关注脚本之家其它相关文章!

相关文章

  • sql Union和Union All的使用方法

    sql Union和Union All的使用方法

    UNION指令的目的是将两个SQL语句的结果合并起来。从这个角度来看, 我们会产生这样的感觉,UNION跟JOIN似乎有些许类似,因为这两个指令都可以由多个表格中撷取资料。
    2009-07-07
  • 浅谈一下数据库系统的发展与组成

    浅谈一下数据库系统的发展与组成

    这篇文章主要介绍了浅谈一下数据库系统的发展与组成,数据库系统,指在计算机系统中引入数据库后的系统,一般由数据库、数据库管理系统、应用系统、数据库管理员(DBA)构成,本文就数据库的发展展开详细讲解
    2023-07-07
  • 14张图看懂什么是区块链技术

    14张图看懂什么是区块链技术

    这篇文章主要为大家分享了14张图,帮助大家看懂什么是区块链技术,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-01-01
  • navicat导入excel文件的步骤以及可能碰到的问题

    navicat导入excel文件的步骤以及可能碰到的问题

    本文介绍将excel导入到mysql数据库的方法,相对来说比较简单,但也可能会碰到一些小问题,在这里做一个小的总结,这里使用到的工具包括navicat,mysql数据库以及excel,需要的朋友可以参考下
    2024-07-07
  • Dbeaver连接ClickHouse全过程

    Dbeaver连接ClickHouse全过程

    文章主要介绍了如何使用Dbeaver连接ClickHouse数据库,首先需要安装Dbeaver工具,然后通过新建驱动管理器和设置库信息来配置驱动,接着,创建数据库连接并填写连接配置信息,测试连接以确保配置正确,最后,成功连接ClickHouse数据库
    2024-11-11
  • 存储过程返回数组对象示例代码

    存储过程返回数组对象示例代码

    存储过程返回数组对象其实就相当于返回List里面放的对象数据,下面与大家分享是例子,感兴趣的朋友可以学习下
    2013-07-07
  • access mysql mssql 随机 10条数据的sql语句

    access mysql mssql 随机 10条数据的sql语句

    好多情况下,大家需要随机抽取几个数据,当然数据是从数据库来的,所以脚本之家特为大家准备了一些。
    2009-05-05
  • Nebula Graph解决风控业务实践

    Nebula Graph解决风控业务实践

    本文主要讲述 Nebula Graph 是如何通过众安保险的选型,以及 Nebula Graph 又是如何落地到具体业务场景帮助众安保险解决风控问题,有需要的朋友可以借鉴参考下
    2022-03-03
  • SQL注入篇学习之盲注/宽字节注入

    SQL注入篇学习之盲注/宽字节注入

    盲注是注入的一种,指的是在不知道数据库返回值的情况下对数据中的内容进行猜测,实施SQL注入,下面这篇文章主要给大家介绍了关于SQL注入篇之盲注/宽字节注入的相关资料,需要的朋友可以参考下
    2022-03-03
  • 梧桐数据库与`mysql`及`oracle`关于交换服务器编号的`SQL`写法分析(推荐)

    梧桐数据库与`mysql`及`oracle`关于交换服务器编号的`SQL`写法分析(推荐)

    本文介绍了如何通过SQL查询实现服务器编号的交换操作,以优化数据中心内部服务器的布局,文章说明了不同数据库(如梧桐数据库、MySQL和Oracle)的建表语句、数据插入以及SQL实现思路,通过具体的SQL查询,文章展示了如何在不同数据库中交换服务器编号,并解释了每个部分的功能
    2024-11-11

最新评论