Advanced Python Data Analysis Charts (Built on SeaTunnel)
01
Solution Overview
SeaTunnel launches a configuration-driven script that ingests CDC data into the lake. The solution is built from CDC, SeaTunnel, and Hudi: it supports full and incremental log capture from MySQL (or any database with CDC support), parsing, and ingestion into the lake. Hudi acts as a storage layer that supports upsert and incremental queries, so the ingested data can be observed in near real time.
02
Environment Preparation
1. A pseudo-distributed Hadoop cluster, a single-node Flink, and Hive 3.1.2 are already deployed in the local environment.
Hudi 0.9: data lake libraries
Hadoop 3.3.0: underlying storage for Hudi data
MySQL 5.7.32
flink-connector-mysql-cdc-1.2.0.jar: standalone MySQL CDC connector jar
2. Hudi Deployment
Detailed deployment steps can be found at:
https://www.yuque.com/docs/share/01c98494-a980-414c-9c45-152023bf3c17?#PubmU
A common problem is a guava dependency conflict.
In the Hive 3.1.2 source (under the apache-hive-3.1.2-src/ql path), remove guava from the hive-exec module's shaded includes:
<include>org.codehaus.jackson:jackson-core-asl</include>
<include>org.codehaus.jackson:jackson-mapper-asl</include>
<!-- <include>com.google.guava:guava</include> -->
<include>net.sf.opencsv:opencsv</include>
Alternatively, remove the hive-exec dependency directly in Hudi.
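One way to do that, as a sketch only (the exact include line in packaging/hudi-flink-bundle/pom.xml may differ in your version), is to comment out the hive-exec entry in the shade plugin's includes, just as guava was commented out above:
<!-- <include>org.apache.hive:hive-exec</include> -->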
Then rebuild hudi/packaging/hudi-flink-bundle:
mvn clean install -DskipTests -Drat.skip=true -Dscala-2.11 -Dhadoop.version=3.3.0 -Pflink-bundle-shade-hive3
After the build finishes, copy hudi-flink-bundle_2.11-0.9.0.jar into the flink/lib directory.
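For example, assuming Hudi was built from its source tree and Flink is installed at /data/warehouse/flink-1.12.2 (adjust both paths to your environment):
cp packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.9.0.jar /data/warehouse/flink-1.12.2/lib/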
3. SeaTunnel Deployment
Package the SeaTunnel project directly from the local IDEA workspace; the build output is under the seatunnel-dist directory:
xxxxx\incubator-seatunnel\seatunnel-dist\target\seatunnel-dist-2.0.5-SNAPSHOT-2.11.8-bin.tar.gz
It contains the launcher packages for Spark, Flink, and so on; only seatunnel-core-sql is used this time.
total 12
drwxr-xr-x 3 root root 105 Jan 11 18:43 bin
drwxr-xr-x 2 root root 170 Jan 10 15:34 config
drwxr-xr-x 2 root root 129 Jan 11 10:11 lib
-rw-r--r-- 1 root root 11558 Dec 30 15:58 LICENSE
drwxr-xr-x 3 root root 18 Dec 30 15:58 plugins
drwxr-xr-x 2 root root 55 Jan 11 18:34 script
[root@node-92 seatunnel]# cd lib
[root@node-92 lib]# ls -l
total 68648
-rw-r--r-- 1 root root 2925 Jan 10 18:59 pom.xml
-rw-r--r-- 1 root root 34916272 Jan 11 10:11 seatunnel-core-sql-2.0.5-SNAPSHOT-2.11.8.jar
Note: because this is based on Flink 1.12.2, which does not have the SetOperation class, the SET statements are applied in another way: the values are written into the configuration with configuration.setString.
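A minimal sketch of that idea for a plain Flink 1.12 job (illustrative only, not the actual SeaTunnel source; the class name is made up):
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SetConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Equivalent of "SET execution.checkpointing.interval = 10sec;" in the SQL script,
        // written directly into the table configuration instead of going through a SetOperation.
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("execution.checkpointing.interval", "10s");
    }
}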
4. Data Preparation
create table users
(
    id bigint auto_increment primary key,
    name varchar(20) null,
    birthday timestamp default CURRENT_TIMESTAMP not null,
    ts timestamp default CURRENT_TIMESTAMP not null
);
-- insert a few sample rows
insert into users (name) values ('hello');
insert into users (name) values ('world');
insert into users (name) values ('iceberg');
insert into users (id, name) values (4, 'spark');
insert into users (name) values ('hudi');
select * from users;
update users set name = 'hello spark' where id = 5;
delete from users where id = 5;
5. Start the Configuration-Driven Script with SeaTunnel
SET execution.checkpointing.interval = 10sec;
CREATE TABLE mysql_users (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'user',
    'password' = '123456',
    'database-name' = 'test',
    'table-name' = 'users',
    'scan.startup.mode' = 'latest-offset'
);
CREATE TABLE hudi_users3
(
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
    'connector' = 'hudi',
    'table.type' = 'MERGE_ON_READ',
    'path' = 'hdfs://localhost:9000/hudi/hudi_users3',
    'read.streaming.enabled' = 'true',
    'read.streaming.check-interval' = '1'
);
INSERT INTO hudi_users3 SELECT *, '20210110' FROM mysql_users;
After configuring the environment variables, start the CDC job with SeaTunnel:
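Which variables are needed depends on your SeaTunnel build; as an illustration (the paths below are assumptions, adjust them to your installation):
export HADOOP_HOME=/data/warehouse/hadoop-3.3.0
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
export FLINK_HOME=/data/warehouse/flink-1.12.2
export SEATUNNEL_HOME=/path/to/seatunnel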
[root@ flink-1.12.2]# sh ${SEATUNNEL_HOME}/bin/start-seatunnel-sql.sh -c ${SEATUNNEL_HOME}/script/flink-cdc-hudi.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/warehouse/flink-1.12.2/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/warehouse/hadoop-3.3.0/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID c5ecca8a5e0a0959fc6193c97cf2421c
Open localhost:8081 and check the Flink Running Jobs page: the submitted job is up and running.
03
Data Verification
1. View the Data with the Flink SQL Client
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded
Flink SQL> CREATE TABLE hudi_users3
> (
>     id BIGINT PRIMARY KEY NOT ENFORCED,
>     name STRING,
>     birthday TIMESTAMP(3),
>     ts TIMESTAMP(3),
>     `partition` VARCHAR(20)
> ) PARTITIONED BY (`partition`) WITH (
>     'connector' = 'hudi',
>     'table.type' = 'MERGE_ON_READ',
>     'path' = 'hdfs://localhost:9000/hudi/hudi_users3',
>     'read.streaming.enabled' = 'true',
>     'read.streaming.check-interval' = '1',
>     'read.streaming.start-commit' = '20210101101010' -- consume from the earliest commit (from 0.10 onwards the option is read.start-commit)
> );
[INFO] Table has been created.
Flink SQL> set execution.result-mode=tableau; -- switch to tableau result display
[INFO] Session property has been set.
Flink SQL> select * from hudi_users3;
+-----+----+-------------+---------------------+---------------------+-----------+
| +/- | id |        name |            birthday |                  ts | partition |
+-----+----+-------------+---------------------+---------------------+-----------+
|   + |  2 |       world | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
|   + |  3 |     iceberg | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
|   + |  4 |       spark | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
|   + |  1 |       hello | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
|   + |  5 |        hudi | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
Run the update statement: update users set name = 'hello spark' where id = 5;
Flink SQL> select * from hudi_users3;
+-----+----+-------------+---------------------+---------------------+-----------+
| +/- | id |        name |            birthday |                  ts | partition |
+-----+----+-------------+---------------------+---------------------+-----------+
|   + |  2 |       world | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
|   + |  3 |     iceberg | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
|   + |  4 |       spark | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
|   + |  1 |       hello | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
|   + |  5 | hello spark | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
Run the delete statement: delete from users where id = 5;
Flink SQL> select * from hudi_users3;
+-----+----+-------------+---------------------+---------------------+-----------+
| +/- | id |        name |            birthday |                  ts | partition |
+-----+----+-------------+---------------------+---------------------+-----------+
|   + |  1 |       hello | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
|   - |  5 |      (NULL) |              (NULL) |              (NULL) |    (NULL) |
|   + |  2 |       world | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
|   + |  3 |     iceberg | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
|   + |  4 |       spark | 2022-01-12T07:46:47 | 2022-01-12T07:46:47 |  20210110 |
04
Conclusion
This article showed how to use SeaTunnel to integrate MySQL and Hudi into a near-real-time data warehouse architecture, reducing hand-written code by making the tasks configuration-driven. Going forward, the file management and scheduling features of the DolphinScheduler platform can be used to manage, execute, and monitor these tasks in a unified way.
Follow-up suggestions:
- Monitoring capabilities, including fine-grained monitoring of task resources;
- More general-purpose plugins, for example encryption/decryption and format conversion;
- More configuration-driven features, such as rate limiting and throttling.