
Python advanced data analysis charts (built on SeaTunnel)



01

Solution Overview

SeaTunnel launches a configuration-driven script for CDC ingestion into the lake. The pipeline consists of CDC, SeaTunnel, and Hudi: it supports full plus incremental log capture from MySQL (or any other database with CDC support), parses the change logs, and writes them into the lake. Hudi, as a storage layer that supports upserts and incremental queries, makes the ingested data visible in near real time.


02

Environment Preparation

1. The local environment already has a pseudo-distributed Hadoop cluster, a single-node Flink, and Hive 3.1.2 deployed:

Hudi 0.9: data lake libraries

Hadoop 3.3.0: underlying storage for the Hudi data

MySQL 5.7.32

flink-connector-mysql-cdc-1.2.0.jar: standalone MySQL CDC connector jar

2. Hudi deployment

Detailed deployment steps can be found at:

https://www.yuque.com/docs/share/01c98494-a980-414c-9c45-152023bf3c17?#PubmU

The most common problem is a guava dependency conflict.

In the Hive source tree (apache-hive-3.1.2-src), edit the hive-exec module under the ql path and drop guava from its shaded includes:

<include>org.codehaus.jackson:jackson-core-asl</include>
<include>org.codehaus.jackson:jackson-mapper-asl</include>
<!-- <include>com.google.guava:guava</include> -->
<include>net.sf.opencsv:opencsv</include>

Alternatively, exclude the hive-exec dependency directly in Hudi.

Then rebuild the bundle under hudi/packaging/hudi-flink-bundle.
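If the Hudi source is not already checked out, a minimal sketch of fetching it might look like this (the Apache GitHub mirror and the release-0.9.0 tag are assumptions; adapt if you build from a local copy):

# fetch the Hudi 0.9 source (locations are assumptions; adjust to your environment)
git clone https://github.com/apache/hudi.git
cd hudi
git checkout release-0.9.0
# the Flink bundle module lives under packaging/hudi-flink-bundle;
# the Maven build below is run from the repository root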

mvn clean install -DskipTests -Drat.skip=true -Dscala-2.11 -Dhadoop.version=3.3.0 -Pflink-bundle-shade-hive3

After the build completes, copy hudi-flink-bundle_2.11-0.9.0.jar into the flink/lib directory.
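A minimal sketch of placing the jars, assuming the standard Maven output location and the Flink install path that appears in the launch logs later in this article:

# copy the rebuilt Hudi Flink bundle and the standalone MySQL CDC connector into Flink's lib directory
cp hudi/packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.9.0.jar /data/warehouse/flink-1.12.2/lib/
cp flink-connector-mysql-cdc-1.2.0.jar /data/warehouse/flink-1.12.2/lib/
# restart the Flink cluster so the new jars are loaded
/data/warehouse/flink-1.12.2/bin/stop-cluster.sh
/data/warehouse/flink-1.12.2/bin/start-cluster.sh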

3. SeaTunnel deployment

Build the SeaTunnel project directly from the local IDEA workspace; the packaged archive ends up under the seatunnel-dist module:

xxxxx\incubator-seatunnel\seatunnel-dist\target\seatunnel-dist-2.0.5-SNAPSHOT-2.11.8-bin.tar.gz

The archive bundles the Spark and Flink launcher modules and related packages; only seatunnel-core-sql is used here.
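A minimal sketch of unpacking it on the deployment host (the /data/warehouse/seatunnel location is an assumption); the listing that follows shows the resulting layout and the seatunnel-core-sql jar under lib:

# unpack the freshly built distribution and move it to the deployment directory
tar -zxvf seatunnel-dist-2.0.5-SNAPSHOT-2.11.8-bin.tar.gz
mv seatunnel-dist-2.0.5-SNAPSHOT-2.11.8 /data/warehouse/seatunnel   # extracted directory name may differ
cd /data/warehouse/seatunnel
ls -l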

total 12
drwxr-xr-x 3 root root   105 Jan 11 18:43 bin
drwxr-xr-x 2 root root   170 Jan 10 15:34 config
drwxr-xr-x 2 root root   129 Jan 11 10:11 lib
-rw-r--r-- 1 root root 11558 Dec 30 15:58 LICENSE
drwxr-xr-x 3 root root    18 Dec 30 15:58 plugins
drwxr-xr-x 2 root root    55 Jan 11 18:34 script
[root@node-92 seatunnel]# cd lib
[root@node-92 lib]# ls -l
total 68648
-rw-r--r-- 1 root root     2925 Jan 10 18:59 pom.xml
-rw-r--r-- 1 root root 34916272 Jan 11 10:11 seatunnel-core-sql-2.0.5-SNAPSHOT-2.11.8.jar

Note: this setup is based on Flink 1.12.2, which does not provide the SetOperation class, so the SET statements in the configuration file are applied by writing the options through configuration.setString instead.

4. Data preparation

create table users
(
    id bigint auto_increment primary key,
    name varchar(20) null,
    birthday timestamp default CURRENT_TIMESTAMP not null,
    ts timestamp default CURRENT_TIMESTAMP not null
);

-- insert a few rows
insert into users (name) values ('hello');
insert into users (name) values ('world');
insert into users (name) values ('iceberg');
insert into users (id, name) values (4, 'spark');
insert into users (name) values ('hudi');
select * from users;
update users set name = 'hello spark' where id = 5;
delete from users where id = 5;

5. Launch the configuration script with SeaTunnel

SET execution.checkpointing.interval = 10sec;

CREATE TABLE mysql_users (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'user',
    'password' = '123456',
    'database-name' = 'test',
    'table-name' = 'users',
    'scan.startup.mode' = 'latest-offset'
);

CREATE TABLE hudi_users3 (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
    'connector' = 'hudi',
    'table.type' = 'MERGE_ON_READ',
    'path' = 'hdfs://localhost:9000/hudi/hudi_users3',
    'read.streaming.enabled' = 'true',
    'read.streaming.check-interval' = '1'
);

INSERT INTO hudi_users3 SELECT *, '20210110' FROM mysql_users;

Save the script as ${SEATUNNEL_HOME}/script/flink-cdc-hudi.conf and configure the environment variables; a sketch of the variables involved is shown below, followed by the actual launch of the CDC job with SeaTunnel:
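A minimal sketch of the environment setup (FLINK_HOME and the exact paths are assumptions inferred from the directories that appear in the launch output):

# point SeaTunnel and its Flink launcher at the local installs (paths are examples)
export SEATUNNEL_HOME=/data/warehouse/seatunnel
export FLINK_HOME=/data/warehouse/flink-1.12.2
export HADOOP_HOME=/data/warehouse/hadoop-3.3.0
# expose the Hadoop classpath so the Flink job can reach HDFS
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`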

[root@ flink-1.12.2]# sh ${SEATUNNEL_HOME}/bin/start-seatunnel-sql.sh -c ${SEATUNNEL_HOME}/script/flink-cdc-hudi.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/warehouse/flink-1.12.2/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/warehouse/hadoop-3.3.0/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID c5ecca8a5e0a0959fc6193c97cf2421c

Open localhost:8081 and check the Running Jobs page of the Flink web UI: the submitted job is running.


03

Data Verification

1. Inspect the data with the Flink SQL Client

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded

Flink SQL> CREATE TABLE hudi_users3
> (
>     id BIGINT PRIMARY KEY NOT ENFORCED,
>     name STRING,
>     birthday TIMESTAMP(3),
>     ts TIMESTAMP(3),
>     `partition` VARCHAR(20)
> ) PARTITIONED BY (`partition`) WITH (
>     'connector' = 'hudi',
>     'table.type' = 'MERGE_ON_READ',
>     'path' = 'hdfs://localhost:9000/hudi/hudi_users3',
>     'read.streaming.enabled' = 'true',
>     'read.streaming.check-interval' = '1',
>     'read.streaming.start-commit' = '20210101101010' -- consume from this commit onward (from 0.10 the option becomes read.start-commit)
> );
[INFO] Table has been created.

Flink SQL> set execution.result-mode=tableau; -- tabular output
[INFO] Session property has been set.

Flink SQL> select * from hudi_users3;
+-----+------+--------------+----------------------+----------------------+------------+
| +/- |   id |         name |             birthday |                   ts |  partition |
+-----+------+--------------+----------------------+----------------------+------------+
|   + |    2 |        world |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |
|   + |    3 |      iceberg |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |
|   + |    4 |        spark |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |
|   + |    1 |        hello |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |
|   + |    5 |         hudi |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |

Run an update in MySQL: update users set name = 'hello spark' where id = 5;

Flink SQL> select * from hudi_users3;
+-----+------+--------------+----------------------+----------------------+------------+
| +/- |   id |         name |             birthday |                   ts |  partition |
+-----+------+--------------+----------------------+----------------------+------------+
|   + |    2 |        world |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |
|   + |    3 |      iceberg |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |
|   + |    4 |        spark |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |
|   + |    1 |        hello |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |
|   + |    5 |  hello spark |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |

Run a delete in MySQL: delete from users where id = 5;

Flink SQL> select * from hudi_users3;
+-----+------+--------------+----------------------+----------------------+------------+
| +/- |   id |         name |             birthday |                   ts |  partition |
+-----+------+--------------+----------------------+----------------------+------------+
|   + |    1 |        hello |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |
|   - |    5 |       (NULL) |               (NULL) |               (NULL) |     (NULL) |
|   + |    2 |        world |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |
|   + |    3 |      iceberg |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |
|   + |    4 |        spark |  2022-01-12T07:46:47 |  2022-01-12T07:46:47 |   20210110 |
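As an additional check, the files Hudi wrote can be listed directly on HDFS; a minimal sketch, assuming the hdfs client from the local Hadoop 3.3.0 install is on the PATH:

# list the partition written by the streaming job (the path comes from the hudi_users3 DDL)
hdfs dfs -ls hdfs://localhost:9000/hudi/hudi_users3/20210110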

04

Conclusion

This article showed how to use SeaTunnel to integrate MySQL and Hudi into a near-real-time data warehouse architecture, reducing the amount of code to develop by driving the task through configuration. Going forward, the file management and scheduling capabilities of the DolphinScheduler platform can be used to manage, execute, and monitor these tasks in a unified way.

Suggestions for future work:

  • Monitoring capabilities, including fine-grained per-task resource monitoring;
  • More general-purpose plugins, for example encryption/decryption and format conversion;
  • More configuration-driven features, such as rate limiting and flow control.
