
Kafka Stream Aggregation: HoodieFlinkStreamer Consumes a Kafka Topic, Dynamically Splits It, and Writes to Multiple Hudi Tables

Earlier articles in this series introduced the Hudi source-code example HoodieFlinkStreamer, which uses the Flink DataStream API to consume Kafka data and write it into a Hudi data lake table in real time. The stock implementation, however, does not satisfy one customer requirement: consume a single Kafka topic (presumably another vendor writes several upstream business data sets into the same topic), dynamically split the stream by business, and write each business's data into its own Hudi table in real time. One option is to start multiple jobs (for example Flink SQL jobs) that all consume the same topic, each job writing only its own business data to the corresponding Hudi table. The drawback is that every job re-reads the full topic, which wastes resources.

This article describes how to modify the HoodieFlinkStreamer example from the latest Hudi source code to add dynamic table splitting, so that it meets the requirement: consume a single Kafka topic, split the stream dynamically by a business attribute, and write to the multiple corresponding Hudi tables in real time.

Environment

Flink: 1.13.6

Hudi: 0.10.1

It is strongly recommended to read the earlier articles in this series first: compiling Flink 1.13.6 and Hudi 0.10.1 from source, using a Flink SQL on YARN session cluster, writing data into a Hudi table with insert into, consuming Kafka data with Flink SQL and writing it to a Hudi table in real time, integrating Flink SQL with the Hive catalog, and the source-code example that consumes Kafka data and writes it to a Hudi table in real time.

Source Code Modification

Path of the HoodieFlinkStreamer class file:

hudi-flink\src\main\java\org\apache\hudi\streamer\HoodieFlinkStreamer.java

The basic idea: based on the value of the field that identifies the target table, add a Filter operator on the Kafka DataStream to extract the matching sub-stream, then write each filtered DataStream into its corresponding Hudi table. The modified source code is shown below:

/**
 * A utility which can incrementally consume data from Kafka and apply it to the target table.
 * It has the similar functionality with SQL data source except that the source is bind to Kafka
 * and the format is bind to JSON.
 */
public class HoodieFlinkStreamer {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    final FlinkStreamerConfig cfg = new FlinkStreamerConfig();
    JCommander cmd = new JCommander(cfg, null, args);
    if (cfg.help || args.length == 0) {
      cmd.usage();
      System.exit(1);
    }

    env.enableCheckpointing(cfg.checkpointInterval);
    env.getConfig().setGlobalJobParameters(cfg);
    // We use checkpoint to trigger write operation, including instant generating and committing.
    // There can only be one checkpoint at one time.
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

    if (cfg.flinkCheckPointPath != null) {
      env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
    }

    TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps();
    kafkaProps.putAll(StreamerUtil.appendKafkaProps(cfg));

    // Read from kafka source
    RowType rowType =
        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
            .getLogicalType();

    Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
    long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
    int parallelism = env.getParallelism();
    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

    DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
            cfg.kafkaTopic,
            new JsonRowDataDeserializationSchema(
                rowType,
                InternalTypeInfo.of(rowType),
                false,
                true,
                TimestampFormat.ISO_8601
            ), kafkaProps))
        .name("kafka_source")
        .uid("uid_kafka_source");

    if (cfg.transformerClassNames != null && !cfg.transformerClassNames.isEmpty()) {
      Option<Transformer> transformer = StreamerUtil.createTransformer(cfg.transformerClassNames);
      if (transformer.isPresent()) {
        dataStream = transformer.get().apply(dataStream);
      }
    }

    // For every target table, derive a dedicated configuration (table name and path),
    // filter the records whose table_name column (field index 5) matches, and write them to that table.
    for (String tableName : conf.getString(FlinkOptions.TABLE_NAME).split(",")) {
      Configuration confNew = conf.clone();
      confNew.setString(FlinkOptions.TABLE_NAME, tableName);
      confNew.setString(FlinkOptions.PATH, conf.getString(FlinkOptions.PATH) + "/" + tableName);
      writeHudi(confNew, rowType, parallelism,
          dataStream.filter((FilterFunction<RowData>) value ->
              value.getString(5).toString().equals(tableName)));
    }

    env.execute(cfg.targetTableName);
  }

  public static void writeHudi(Configuration conf, RowType rowType, int parallelism,
      DataStream<RowData> dataStream) {
    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
    if (StreamerUtil.needsAsyncCompaction(conf)) {
      Pipelines.compact(conf, pipeline);
    } else {
      Pipelines.clean(conf, pipeline);
    }
  }
}
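
One design note on the approach above: every target table adds its own filter over the full Kafka stream, so each record is evaluated once per table. For a small, fixed number of tables this is fine; if the table count grows, a single ProcessFunction with Flink side outputs could route each record exactly once. The following is only an illustrative sketch of that alternative (not part of the modified streamer above), assuming the same RowData layout with the table name at field index 5:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TableRouter {

  /** Routes each record to the side output of its target table instead of filtering N times. */
  public static Map<String, DataStream<RowData>> route(DataStream<RowData> input, String[] tableNames) {
    Map<String, OutputTag<RowData>> tags = new HashMap<>();
    for (String tableName : tableNames) {
      // Anonymous subclass keeps the RowData type information for the side output.
      tags.put(tableName, new OutputTag<RowData>(tableName) {});
    }

    SingleOutputStreamOperator<RowData> routed = input.process(new ProcessFunction<RowData, RowData>() {
      @Override
      public void processElement(RowData value, Context ctx, Collector<RowData> out) {
        // Field index 5 is table_name in the schema.avsc used below.
        OutputTag<RowData> tag = tags.get(value.getString(5).toString());
        if (tag != null) {
          ctx.output(tag, value);
        }
      }
    });

    Map<String, DataStream<RowData>> result = new HashMap<>();
    tags.forEach((tableName, tag) -> result.put(tableName, routed.getSideOutput(tag)));
    return result;
  }
}

Each returned sub-stream could then be passed to writeHudi() exactly as in the per-table loop above.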

This completes the source-code changes for the customer's requirement; the implementation is quite simple. A basic test follows.

Prepare schema.avsc

The field names and field types must stay consistent with the Kafka data. If you plan to query the data later with Flink SQL, do not use Flink SQL reserved keywords as field names, e.g. table, partition, etc. If you are not sure whether a name is a keyword, check the Flink documentation:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/overview/#reserved-keywords

[root@managerment2 bin]# cat schema.avsc
{
  "type" : "record",
  "name" : "record",
  "fields" : [ {
    "name" : "uuid",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "name",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "age",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "ts",
    "type" : [ "null", {
      "type" : "long",
      "logicalType" : "timestamp-millis"
    } ],
    "default" : null
  }, {
    "name" : "partition1",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "table_name",
    "type" : [ "null", "string" ],
    "default" : null
  } ]
}
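
As a quick sanity check (not part of the original walkthrough), the schema can be parsed with the standard Avro API to confirm the field order, since the filter in the modified HoodieFlinkStreamer reads the table name by position (getString(5)). A minimal sketch, assuming schema.avsc is available on the local filesystem:

import java.io.File;
import org.apache.avro.Schema;

public class SchemaCheck {
  public static void main(String[] args) throws Exception {
    // Parse the Avro schema and print each field with its position.
    Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
    for (Schema.Field field : schema.getFields()) {
      System.out.println(field.pos() + " -> " + field.name());
    }
    // The output should end with "5 -> table_name", matching value.getString(5) in the filter.
  }
}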

Submit the Job

Note (the Hudi tables to split into are separated by commas):

--target-table table1,table2

./flink run -c org.apache.hudi.streamer.HoodieFlinkStreamer /opt/hudi-flink-bundle_2.11-0.10.1.jar \
  --kafka-bootstrap-servers felixzh1:9092 \
  --kafka-topic hudi_default \
  --kafka-group-id felixzh \
  --checkpoint-interval 5000 \
  --target-base-path hdfs://management0.hde.com:8020/flink/hudi_kafka \
  --table-type MERGE_ON_READ \
  --target-table table1,table2 \
  --partition-path-field partition1 \
  --source-avro-schema-path hdfs://management0.hde.com:8020/flink/schema.avsc
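
For reference, an illustrative sketch (not part of the modified streamer itself) of how --target-table and --target-base-path are turned into per-table write paths; the values below are simply the ones from the command above:

public class TargetTableSplitDemo {
  public static void main(String[] args) {
    String targetTables = "table1,table2";                                // value of --target-table
    String basePath = "hdfs://management0.hde.com:8020/flink/hudi_kafka"; // value of --target-base-path
    for (String tableName : targetTables.split(",")) {
      // Each table gets its own Hudi base path: <target-base-path>/<table name>
      System.out.println(tableName + " -> " + basePath + "/" + tableName);
    }
  }
}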

The corresponding Hudi table paths on HDFS can be inspected with the following command:

[root@managerment2 bin]# hdfs dfs -ls /flink/hudi_kafka/*/
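
Alternatively, a minimal sketch (not from the original article, assuming the Hadoop configuration on the classpath points at the same HDFS) that lists the per-table directories programmatically with the Hadoop FileSystem API:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListHudiTableDirs {
  public static void main(String[] args) throws Exception {
    // Uses core-site.xml/hdfs-site.xml from the classpath to resolve the default filesystem.
    FileSystem fs = FileSystem.get(new Configuration());
    // One sub-directory per target table is expected under the base path, e.g. table1 and table2.
    for (FileStatus status : fs.listStatus(new Path("/flink/hudi_kafka"))) {
      System.out.println(status.getPath());
    }
    fs.close();
  }
}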

Note: consumption starts from the latest offset by default, so send the test data after the job has started.

Test Data

Create the topic and write the prepared test data into it.

[root@felixzh1 bin]# cat hudi_default_data
{"uuid": "id1", "name": "Danny", "age": 23, "ts": "1970-01-01T00:00:01", "partition1": "par1", "table_name": "table1"}
{"uuid": "id2", "name": "Stephen", "age": 33, "ts": "1970-01-01T00:00:02", "partition1": "par1", "table_name": "table1"}
{"uuid": "id3", "name": "Julian", "age": 53, "ts": "1970-01-01T00:00:03", "partition1": "par2", "table_name": "table1"}
{"uuid": "id4", "name": "Fabian", "age": 31, "ts": "1970-01-01T00:00:04", "partition1": "par2", "table_name": "table1"}
{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05", "partition1": "par3", "table_name": "table1"}
{"uuid": "id6", "name": "Emma", "age": 20, "ts": "1970-01-01T00:00:06", "partition1": "par3", "table_name": "table1"}
{"uuid": "id7", "name": "Bob", "age": 44, "ts": "1970-01-01T00:00:07", "partition1": "par4", "table_name": "table1"}
{"uuid": "id8", "name": "Han", "age": 56, "ts": "1970-01-01T00:00:08", "partition1": "par4", "table_name": "table1"}
{"uuid": "id1", "name": "Danny", "age": 23, "ts": "1970-01-01T00:00:01", "partition1": "par1", "table_name": "table2"}
{"uuid": "id2", "name": "Stephen", "age": 33, "ts": "1970-01-01T00:00:02", "partition1": "par1", "table_name": "table2"}
{"uuid": "id3", "name": "Julian", "age": 53, "ts": "1970-01-01T00:00:03", "partition1": "par2", "table_name": "table2"}
{"uuid": "id4", "name": "Fabian", "age": 31, "ts": "1970-01-01T00:00:04", "partition1": "par2", "table_name": "table2"}
{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01T00:00:05", "partition1": "par3", "table_name": "table2"}
{"uuid": "id6", "name": "Emma", "age": 20, "ts": "1970-01-01T00:00:06", "partition1": "par3", "table_name": "table2"}
{"uuid": "id7", "name": "Bob", "age": 44, "ts": "1970-01-01T00:00:07", "partition1": "par4", "table_name": "table2"}
{"uuid": "id8", "name": "Han", "age": 56, "ts": "1970-01-01T00:00:08", "partition1": "par4", "table_name": "table2"}
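
One way to push this file into the topic, as a minimal sketch using the standard kafka-clients Java producer (the bootstrap server and topic match the job command above; the local file path is hypothetical):

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class HudiTestDataProducer {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "felixzh1:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // Send each JSON line of the prepared file as one Kafka record.
      for (String line : Files.readAllLines(Paths.get("hudi_default_data"))) {
        producer.send(new ProducerRecord<>("hudi_default", line));
      }
      producer.flush();
    }
  }
}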

Create the Hudi Tables

For convenience, I use Flink SQL here to query the data written to Hudi. This requires creating matching Hudi tables first (note that the field data types must be identical to those of the built-in example, and the path must correspond):

CREATE TABLE table1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(30),
  age INT,
  ts TIMESTAMP(3),
  partition1 VARCHAR(20),
  table_name VARCHAR(20)
)
PARTITIONED BY (partition1)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///flink/hudi_kafka/table1',
  'table.type' = 'MERGE_ON_READ',  -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
  'write.batch.size' = '1',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'write.rate.limit' = '0',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '5'
);

CREATE TABLE table2(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(30),
  age INT,
  ts TIMESTAMP(3),
  partition1 VARCHAR(20),
  table_name VARCHAR(20)
)
PARTITIONED BY (partition1)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///flink/hudi_kafka/table2',
  'table.type' = 'MERGE_ON_READ',  -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
  'write.batch.size' = '1',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'write.rate.limit' = '0',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '5'
);

Query the Hudi Tables with Flink SQL

Query results:

-- sets up the result mode to tableau to show the results directly in the CLI
set sql-client.execution.result-mode=tableau;

-- query from the Hudi tables
select * from table1;
select * from table2;
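
The DDL and queries above target the sql-client CLI; the same steps can also be run programmatically. A minimal sketch (not from the original article) using Flink's Table API, assuming the Hudi Flink bundle is on the classpath and reusing the table1 DDL above (shortened to the essential options):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class QueryHudiTable {
  public static void main(String[] args) {
    // Streaming table environment, matching the sql-client's default streaming mode.
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // Register table1 with the same DDL as above.
    tableEnv.executeSql(
        "CREATE TABLE table1(\n"
        + "  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,\n"
        + "  name VARCHAR(30),\n"
        + "  age INT,\n"
        + "  ts TIMESTAMP(3),\n"
        + "  partition1 VARCHAR(20),\n"
        + "  table_name VARCHAR(20)\n"
        + ") PARTITIONED BY (partition1) WITH (\n"
        + "  'connector' = 'hudi',\n"
        + "  'path' = 'hdfs:///flink/hudi_kafka/table1',\n"
        + "  'table.type' = 'MERGE_ON_READ'\n"
        + ")");

    // Print the query result to stdout, similar to the tableau result mode in the CLI.
    tableEnv.executeSql("select * from table1").print();
  }
}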
