Kafka Stream Aggregation (HoodieFlinkStreamer Consuming Kafka with Dynamic Table Splitting into Multiple Hudi Tables)
Earlier articles in this series introduced the Hudi source-code example HoodieFlinkStreamer, which uses the Flink DataStream API to consume Kafka data and write it to a Hudi data lake table in real time. The stock implementation, however, does not cover a customer requirement: consume a single Kafka topic (apparently another vendor writes several upstream business feeds into the same topic), split the data dynamically by table, and write each split to a different Hudi table in real time. One workaround is to start multiple jobs (for example Flink SQL jobs) that all consume the topic, each responsible only for writing one business feed to its corresponding Hudi table. The drawback is that every job re-reads the entire topic, which wastes resources.
This article describes how to modify the HoodieFlinkStreamer example from the latest Hudi source to add dynamic table splitting, so that it meets the requirement: consume a single Kafka topic, split the data dynamically by a business attribute, and write it in real time to the corresponding Hudi tables.
Environment
Flink: 1.13.6
Hudi: 0.10.1
It is strongly recommended to read the earlier articles in this series first: compiling Flink 1.13.6 and Hudi 0.10.1 from source, using a Flink SQL on YARN session cluster, writing data to Hudi tables with insert into, consuming Kafka data with Flink SQL and writing it to Hudi in real time, integrating Flink SQL with the Hive catalog, and the source-code example that consumes Kafka data and writes it to Hudi in real time.
Source code changes
The HoodieFlinkStreamer class is located at:
hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
The core idea: based on the value of the field that identifies the target table, add a Filter operator on the Kafka DataStream to extract the matching sub-stream, then write each filtered DataStream into its own Hudi table. The modified source is shown below (the package declaration and imports are unchanged from the original HoodieFlinkStreamer, plus org.apache.flink.api.common.functions.FilterFunction):
/**
* A utility which can incrementally consume data from Kafka and apply it to the target table.
* It has the similar Functionality with SQL data source except that the source is bind to Kafka
* and the format is bind to JSON.
*/
public class HoodieFlinkStreamer {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    final FlinkStreamerConfig cfg = new FlinkStreamerConfig();
    JCommander cmd = new JCommander(cfg, null, args);
    if (cfg.help || args.length == 0) {
      cmd.usage();
      System.exit(1);
    }

    env.enableCheckpointing(cfg.checkpointInterval);
    env.getConfig().setGlobalJobParameters(cfg);
    // We use checkpoint to trigger write operation, including instant generating and committing.
    // There can only be one checkpoint at one time.
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

    if (cfg.flinkCheckPointPath != null) {
      env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
    }

    TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps();
    kafkaProps.putAll(StreamerUtil.appendKafkaProps(cfg));

    // Read from kafka source
    RowType rowType =
        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
            .getLogicalType();

    Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
    long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
    int parallelism = env.getParallelism();
    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);

    DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
            cfg.kafkaTopic,
            new JsonRowDataDeserializationSchema(
                rowType,
                InternalTypeInfo.of(rowType),
                false,
                true,
                TimestampFormat.ISO_8601
            ), kafkaProps))
        .name("kafka_source")
        .uid("uid_kafka_source");

    if (cfg.transformerClassNames != null && !cfg.transformerClassNames.isEmpty()) {
      Option<Transformer> transformer = StreamerUtil.createTransformer(cfg.transformerClassNames);
      if (transformer.isPresent()) {
        dataStream = transformer.get().apply(dataStream);
      }
    }

    // Fan out: --target-table carries a comma-separated list of Hudi tables.
    // Each table gets its own filtered copy of the stream and its own path under the base path.
    for (String tableName : conf.getString(FlinkOptions.TABLE_NAME).split(",")) {
      Configuration confNew = conf.clone();
      confNew.setString(FlinkOptions.TABLE_NAME, tableName);
      confNew.setString(FlinkOptions.PATH, conf.getString(FlinkOptions.PATH) + "/" + tableName);
      // Field index 5 is the table_name column of the source Avro schema.
      writeHudi(confNew, rowType, parallelism,
          dataStream.filter((FilterFunction<RowData>) value -> value.getString(5).toString().equals(tableName)));
    }

    env.execute(cfg.targetTableName);
  }

  public static void writeHudi(Configuration conf, RowType rowType, int parallelism, DataStream<RowData> dataStream) {
    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
    if (StreamerUtil.needsAsyncCompaction(conf)) {
      Pipelines.compact(conf, pipeline);
    } else {
      Pipelines.clean(conf, pipeline);
    }
  }
}
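A note on the design: every entry in the table list adds one more filter pass over the same decoded stream, but the Kafka topic itself is still consumed only once by the single kafka_source, which is exactly the resource saving compared with running one job per table. If you prefer the per-table predicate to be reusable and unit-testable rather than an inline lambda, it can be pulled out into a small named FilterFunction. The following is only a minimal sketch of that optional refactor (the class name and constructor are hypothetical; index 5 assumes the table_name column sits at position 5 of the Avro schema shown later):
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.table.data.RowData;

/** Keeps only the rows whose table-name column equals the target Hudi table. */
public class TableNameFilter implements FilterFunction<RowData> {
  private final int tableNameIndex; // position of table_name in the source schema (5 in this article)
  private final String targetTable;

  public TableNameFilter(int tableNameIndex, String targetTable) {
    this.tableNameIndex = tableNameIndex;
    this.targetTable = targetTable;
  }

  @Override
  public boolean filter(RowData row) {
    // Drop rows with a null table_name instead of failing with a NullPointerException.
    return !row.isNullAt(tableNameIndex)
        && targetTable.equals(row.getString(tableNameIndex).toString());
  }
}
With this sketch, the inline lambda in the loop would become dataStream.filter(new TableNameFilter(5, tableName)).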
That completes the source-code change for the customer requirement; the implementation is quite simple. Next comes a basic test.
Prepare schema.avsc
The field names and field types must match the Kafka data. If you plan to query the data with Flink SQL later, do not use Flink SQL reserved keywords (such as table or partition) as field names. If you are unsure whether a name is a keyword, check the Flink documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/overview/#reserved-keywords
[root@managerment2 bin]# cat schema.avsc
{
  "type" : "record",
  "name" : "record",
  "fields" : [ {
    "name" : "uuid",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "name",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "age",
    "type" : [ "null", "int" ],
    "default" : null
  }, {
    "name" : "ts",
    "type" : [ "null", {
      "type" : "long",
      "logicalType" : "timestamp-millis"
    } ],
    "default" : null
  }, {
    "name" : "partition1",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "table_name",
    "type" : [ "null", "string" ],
    "default" : null
  } ]
}
Note (the Hudi tables to split into are passed to --target-table as a comma-separated list):
--target-table table1,table2
./flink run -c org.apache.hudi.streamer.HoodieFlinkStreamer /opt/hudi-flink-bundle_2.11-0.10.1.jar \
--kafka-bootstrap-servers felixzh1:9092 \
--kafka-topic hudi_default \
--kafka-group-id felixzh \
--checkpoint-interval 5000 \
--target-base-path hdfs://management0.hde.com:8020/flink/hudi_kafka \
--table-type MERGE_ON_READ \
--target-table table1,table2 \
--partition-path-field partition1 \
--source-avro-schema-path hdfs://management0.hde.com:8020/flink/schema.avsc
Check the Hudi table paths on HDFS with the command below; each target table appears as its own subdirectory under the base path (here /flink/hudi_kafka/table1 and /flink/hudi_kafka/table2):
[root@managerment2 bin]# hdfs dfs -ls /flink/hudi_kafka/*/
Note: by default the job consumes from the latest offset, so start the job first and then send the mock data.
Mock data
Create the topic and write the prepared test data into it (one way to replay the data file into the topic is sketched after the listing below).
[root@felixzh1 bin]# cat hudi_default_data
{"uuid": "id1" "name": "Danny" "age": 23 "ts": "1970-01-01T00:00:01" "partition1": "par1" "table_name":"table1"}
{"uuid": "id2" "name": "Stephen" "age": 33 "ts": "1970-01-01T00:00:02" "partition1": "par1" "table_name":"table1"}
{"uuid": "id3" "name": "Julian" "age": 53 "ts": "1970-01-01T00:00:03" "partition1": "par2" "table_name":"table1"}
{"uuid": "id4" "name": "Fabian" "age": 31 "ts": "1970-01-01T00:00:04" "partition1": "par2" "table_name":"table1"}
{"uuid": "id5" "name": "Sophia" "age": 18 "ts": "1970-01-01T00:00:05" "partition1": "par3" "table_name":"table1"}
{"uuid": "id6" "name": "Emma" "age": 20 "ts": "1970-01-01T00:00:06" "partition1": "par3" "table_name":"table1"}
{"uuid": "id7" "name": "Bob" "age": 44 "ts": "1970-01-01T00:00:07" "partition1": "par4" "table_name":"table1"}
{"uuid": "id8" "name": "Han" "age": 56 "ts": "1970-01-01T00:00:08" "partition1": "par4" "table_name":"table1"}
{"uuid": "id1" "name": "Danny" "age": 23 "ts": "1970-01-01T00:00:01" "partition1": "par1" "table_name":"table2"}
{"uuid": "id2" "name": "Stephen" "age": 33 "ts": "1970-01-01T00:00:02" "partition1": "par1" "table_name":"table2"}
{"uuid": "id3" "name": "Julian" "age": 53 "ts": "1970-01-01T00:00:03" "partition1": "par2" "table_name":"table2"}
{"uuid": "id4" "name": "Fabian" "age": 31 "ts": "1970-01-01T00:00:04" "partition1": "par2" "table_name":"table2"}
{"uuid": "id5" "name": "Sophia" "age": 18 "ts": "1970-01-01T00:00:05" "partition1": "par3" "table_name":"table2"}
{"uuid": "id6" "name": "Emma" "age": 20 "ts": "1970-01-01T00:00:06" "partition1": "par3" "table_name":"table2"}
{"uuid": "id7" "name": "Bob" "age": 44 "ts": "1970-01-01T00:00:07" "partition1": "par4" "table_name":"table2"}
{"uuid": "id8" "name": "Han" "age": 56 "ts": "1970-01-01T00:00:08" "partition1": "par4" "table_name":"table2"}
Create the Hudi tables
For convenience, I use Flink SQL to query the data written to Hudi. This requires first creating the corresponding Hudi tables (the field names and types must match those used by the streamer, and path must point to the corresponding table directory):
CREATE TABLE table1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(30),
  age INT,
  ts TIMESTAMP(3),
  partition1 VARCHAR(20),
  table_name VARCHAR(20)
) PARTITIONED BY (partition1) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///flink/hudi_kafka/table1',
  'table.type' = 'MERGE_ON_READ', -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
  'write.batch.size' = '1',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'write.rate.limit' = '0',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '5'
);
CREATE TABLE table2(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(30),
  age INT,
  ts TIMESTAMP(3),
  partition1 VARCHAR(20),
  table_name VARCHAR(20)
) PARTITIONED BY (partition1) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///flink/hudi_kafka/table2',
  'table.type' = 'MERGE_ON_READ', -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
  'write.batch.size' = '1',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'write.rate.limit' = '0',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '5'
);
Query the Hudi tables with Flink SQL and check the results:
-- sets up the result mode to tableau to show the results directly in the CLI
set sql-client.execution.result-mode=tableau;
-- query from the Hudi table
select * from table1;
select * from table2;
Likes, bookmarks, shares, and follows are warmly welcome!