
Deploying Flink on Hadoop: Detailed Steps for Integrating Apache Hudi

-- Create the Kafka source table
CREATE TABLE k (
  tinyint0 TINYINT,
  smallint1 SMALLINT,
  int2 INT,
  bigint3 BIGINT,
  float4 FLOAT,
  double5 DOUBLE,
  decimal6 DECIMAL(38, 8),
  boolean7 BOOLEAN,
  char8 STRING,
  varchar9 STRING,
  string10 STRING,
  timestamp11 STRING
) WITH (
  'connector' = 'kafka',                    -- use the kafka connector
  'topic' = 'test',                         -- kafka topic name
  'scan.startup.mode' = 'earliest-offset',  -- read from the earliest offset
  'properties.bootstrap.servers' = 'dbos-bigdata-test003:9092,dbos-bigdata-test005:9092,dbos-bigdata-test005:9092',  -- kafka broker addresses
  'properties.group.id' = 'testgroup1',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'true',
  'value.fields-include' = 'ALL'
);
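
The source table expects JSON messages on the test topic whose field names and types match the schema above. The test data in this walkthrough was produced with an external Kafka producer, but as a minimal sketch, assuming the kafka connector in your Flink build also allows writes, one illustrative record (all values made up) can be pushed from the SQL client through the same table:

-- Sketch: push one made-up test record through the Kafka table defined above
INSERT INTO k VALUES (
  CAST(1 AS TINYINT),
  CAST(2 AS SMALLINT),
  3,
  CAST(4 AS BIGINT),
  CAST(1.5 AS FLOAT),
  CAST(2.5 AS DOUBLE),
  CAST(12345.678 AS DECIMAL(38, 8)),
  true,
  'key-001',                  -- char8: used later as the Hudi record key
  'varchar-value',
  'string-value',
  '2021-09-01 12:00:00.000'
);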


-- Create the Hudi (COW) sink table
CREATE TABLE hdm (
  tinyint0 TINYINT,
  smallint1 SMALLINT,
  int2 INT,
  bigint3 BIGINT,
  float4 FLOAT,
  double5 DOUBLE,
  decimal6 DECIMAL(12, 3),
  boolean7 BOOLEAN,
  char8 CHAR(64),
  varchar9 VARCHAR(64),
  string10 STRING,
  timestamp11 TIMESTAMP(3)
) PARTITIONED BY (tinyint0) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://bigdata/hudi/hdm',
  'hoodie.datasource.write.recordkey.field' = 'char8',  -- record key (primary key)
  'write.precombine.field' = 'timestamp11',             -- on duplicate keys keep the row with the largest value of this field (defaults to ts)
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'write.rate.limit' = '2000',                          -- cap on records written per second
  'compaction.async.enabled' = 'true',                  -- online (async) compaction
  'compaction.trigger.strategy' = 'num_commits',        -- trigger compaction by commit count
  'compaction.delta_commits' = '5',                     -- defaults to 5
  'hive_sync.enable' = 'true',                          -- enable hive sync
  'hive_sync.mode' = 'hms',                             -- sync through the hive metastore (default mode is jdbc)
  'hive_sync.metastore.uris' = 'thrift://dbos-bigdata-test002:9083',  -- required: metastore URI
  'hive_sync.jdbc_url' = 'jdbc:hive2://dbos-bigdata-test002:10000',   -- required: hiveserver2 address
  'hive_sync.table' = 'hdm',                            -- required: hive table name to create
  'hive_sync.db' = 'hudi',                              -- required: hive database name to create
  'hive_sync.username' = 'hive',                        -- required: HMS username
  'hive_sync.password' = '',                            -- required: HMS password
  'hive_sync.skip_ro_suffix' = 'true'                   -- drop the _ro suffix
);

-- Create the Hudi (MOR) sink table
CREATE TABLE hdm (
  tinyint0 TINYINT,
  smallint1 SMALLINT,
  int2 INT,
  bigint3 BIGINT,
  float4 FLOAT,
  double5 DOUBLE,
  decimal6 DECIMAL(12, 3),
  boolean7 BOOLEAN,
  char8 CHAR(64),
  varchar9 VARCHAR(64),
  string10 STRING,
  timestamp11 TIMESTAMP(3)
) PARTITIONED BY (tinyint0) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://bigdata/hudi/hdm',
  'hoodie.datasource.write.recordkey.field' = 'char8',  -- record key (primary key)
  'write.precombine.field' = 'timestamp11',             -- on duplicate keys keep the row with the largest value of this field (defaults to ts)
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'write.rate.limit' = '2000',                          -- cap on records written per second
  'table.type' = 'MERGE_ON_READ',                       -- defaults to COPY_ON_WRITE
  'compaction.async.enabled' = 'true',                  -- online (async) compaction
  'compaction.trigger.strategy' = 'num_commits',        -- trigger compaction by commit count
  'compaction.delta_commits' = '5',                     -- defaults to 5
  'hive_sync.enable' = 'true',                          -- enable hive sync
  'hive_sync.mode' = 'hms',                             -- sync through the hive metastore (default mode is jdbc)
  'hive_sync.metastore.uris' = 'thrift://dbos-bigdata-test002:9083',  -- required: metastore URI
  'hive_sync.jdbc_url' = 'jdbc:hive2://dbos-bigdata-test002:10000',   -- required: hiveserver2 address
  'hive_sync.table' = 'hdm',                            -- required: hive table name to create
  'hive_sync.db' = 'hudi',                              -- required: hive database name to create
  'hive_sync.username' = 'hive',                        -- required: HMS username
  'hive_sync.password' = '',                            -- required: HMS password
  'hive_sync.skip_ro_suffix' = 'true'                   -- drop the _ro suffix
);
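
With hive_sync enabled, the connector registers the table in the Hive metastore under the hudi database after the first commit. A minimal sanity check, assuming the sync succeeded and you connect with beeline to the hiveserver2 address configured above:

-- Run from beeline (jdbc:hive2://dbos-bigdata-test002:10000)
USE hudi;
SHOW TABLES;                 -- with skip_ro_suffix the read-optimized table appears as plain hdm (MOR also creates hdm_rt)
SELECT * FROM hdm LIMIT 10;  -- sample the synced data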


-- Insert the source data into the Hudi table
INSERT INTO hdm
SELECT
  CAST(tinyint0 AS TINYINT),
  CAST(smallint1 AS SMALLINT),
  CAST(int2 AS INT),
  CAST(bigint3 AS BIGINT),
  CAST(float4 AS FLOAT),
  CAST(double5 AS DOUBLE),
  CAST(decimal6 AS DECIMAL(38, 18)),
  CAST(boolean7 AS BOOLEAN),
  CAST(char8 AS CHAR(64)),
  CAST(varchar9 AS VARCHAR(64)),
  CAST(string10 AS STRING),
  CAST(timestamp11 AS TIMESTAMP(3))
FROM k;
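
Submitting this INSERT launches a continuous streaming job that reads from Kafka and writes into the Hudi table. As a quick sanity check, a snapshot read of the sink table from the same SQL client session shows whether rows are landing (a sketch; read.streaming.enabled defaults to false, so the SELECT is a bounded snapshot):

-- Snapshot read of the Hudi table from the Flink SQL client
SELECT count(*) FROM hdm;
SELECT * FROM hdm;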


The INSERT statement was accepted, which shows that the job was submitted successfully. Next, check the job's status on the YARN web UI.


The job is consuming from Kafka normally.

Produce a few more rounds of test data into the Kafka topic.

