Deploying Flink on Hadoop: Detailed Steps for Integrating Apache Hudi
-- Create the source table
CREATE TABLE k (
tinyint0 TINYINT,
smallint1 SMALLINT,
int2 INT,
bigint3 BIGINT,
float4 FLOAT,
double5 DOUBLE,
decimal6 DECIMAL(38, 8),
boolean7 BOOLEAN,
char8 STRING,
varchar9 STRING,
string10 STRING,
timestamp11 STRING
) WITH (
'connector' = 'kafka',  -- use the kafka connector
'topic' = 'test',  -- kafka topic name
'scan.startup.mode' = 'earliest-offset',  -- read from the earliest offset
'properties.bootstrap.servers' = 'dbos-bigdata-test003:9092,dbos-bigdata-test005:9092,dbos-bigdata-test005:9092',  -- kafka broker addresses
'properties.group.id' = 'testgroup1',
'value.format' = 'json',
'value.json.fail-on-missing-field' = 'true',
'value.fields-include' = 'ALL'
);
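Before creating the sink, it can be worth a quick sanity check that the source table actually deserializes messages from the topic. A minimal sketch, run in the same Flink SQL session:

-- sanity check (sketch): read the Kafka-backed source table
-- note: 'value.json.fail-on-missing-field' = 'true' means a record missing any field will fail the job
select * from k;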
-- Create the Hudi (COPY_ON_WRITE) sink table
CREATE TABLE hdm(
tinyint0 TINYINT,
smallint1 SMALLINT,
int2 INT,
bigint3 BIGINT,
float4 FLOAT,
double5 DOUBLE,
decimal6 DECIMAL(12, 3),
boolean7 BOOLEAN,
char8 CHAR(64),
varchar9 VARCHAR(64),
string10 STRING,
timestamp11 TIMESTAMP(3)
)
PARTITIONED BY (tinyint0)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://bigdata/hudi/hdm',
'hoodie.datasource.write.recordkey.field' = 'char8',  -- record key (primary key)
'write.precombine.field' = 'timestamp11',  -- when keys collide, keep the record with the largest value of this field; defaults to the ts field
'write.tasks' = '1',
'compaction.tasks' = '1',
'write.rate.limit' = '2000',  -- limit on records written per second
'compaction.async.enabled' = 'true',  -- online (async) compaction
'compaction.trigger.strategy' = 'num_commits',  -- trigger compaction by number of commits
'compaction.delta_commits' = '5',  -- defaults to 5
'hive_sync.enable' = 'true',  -- enable Hive sync
'hive_sync.mode' = 'hms',  -- sync to Hive through HMS; defaults to jdbc
'hive_sync.metastore.uris' = 'thrift://dbos-bigdata-test002:9083',  -- required: metastore URI (thrift port)
'hive_sync.jdbc_url' = 'jdbc:hive2://dbos-bigdata-test002:10000',  -- required: HiveServer2 address
'hive_sync.table' = 'hdm',  -- required: Hive table to create
'hive_sync.db' = 'hudi',  -- required: Hive database to create the table in
'hive_sync.username' = 'hive',  -- required: HMS username
'hive_sync.password' = '',  -- required: HMS password
'hive_sync.skip_ro_suffix' = 'true'  -- drop the _ro suffix
);
-- Create the Hudi (MERGE_ON_READ) sink table
CREATE TABLE hdm(
tinyint0 TINYINT,
smallint1 SMALLINT,
int2 INT,
bigint3 BIGINT,
float4 FLOAT,
double5 DOUBLE,
decimal6 DECIMAL(12, 3),
boolean7 BOOLEAN,
char8 CHAR(64),
varchar9 VARCHAR(64),
string10 STRING,
timestamp11 TIMESTAMP(3)
)
PARTITIONED BY (tinyint0)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://bigdata/hudi/hdm',
'hoodie.datasource.write.recordkey.field' = 'char8',  -- record key (primary key)
'write.precombine.field' = 'timestamp11',  -- when keys collide, keep the record with the largest value of this field; defaults to the ts field
'write.tasks' = '1',
'compaction.tasks' = '1',
'write.rate.limit' = '2000',  -- limit on records written per second
'table.type' = 'MERGE_ON_READ',  -- defaults to COPY_ON_WRITE
'compaction.async.enabled' = 'true',  -- online (async) compaction
'compaction.trigger.strategy' = 'num_commits',  -- trigger compaction by number of commits
'compaction.delta_commits' = '5',  -- defaults to 5
'hive_sync.enable' = 'true',  -- enable Hive sync
'hive_sync.mode' = 'hms',  -- sync to Hive through HMS; defaults to jdbc
'hive_sync.metastore.uris' = 'thrift://dbos-bigdata-test002:9083',  -- required: metastore URI (thrift port)
'hive_sync.jdbc_url' = 'jdbc:hive2://dbos-bigdata-test002:10000',  -- required: HiveServer2 address
'hive_sync.table' = 'hdm',  -- required: Hive table to create
'hive_sync.db' = 'hudi',  -- required: Hive database to create the table in
'hive_sync.username' = 'hive',  -- required: HMS username
'hive_sync.password' = '',  -- required: HMS password
'hive_sync.skip_ro_suffix' = 'true'  -- drop the _ro suffix
);
-- Insert the source data into the Hudi sink
insert into hdm
select
cast(tinyint0 as TINYINT),
cast(smallint1 as SMALLINT),
cast(int2 as INT),
cast(bigint3 as BIGINT),
cast(float4 as FLOAT),
cast(double5 as DOUBLE),
cast(decimal6 as DECIMAL(38, 18)),
cast(boolean7 as BOOLEAN),
cast(char8 as CHAR(64)),
cast(varchar9 as VARCHAR(64)),
cast(string10 as STRING),
cast(timestamp11 as TIMESTAMP(3))
from k;
The output above shows the job was submitted successfully; check its status in the YARN web UI.
Kafka is being consumed normally.
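At this point it is also worth looking at the Hudi table itself. A minimal sketch, assuming the hdm table defined above is still registered in the current Flink SQL session:

-- quick sanity check (sketch): snapshot-read the Hudi table that the job is writing to
select * from hdm;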
Produce test data into Kafka a few more times.
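With the settings above ('compaction.trigger.strategy' = 'num_commits', 'compaction.delta_commits' = '5'), compaction of the MOR table only kicks in after several commits, so a few rounds of test data are needed. To watch new records arrive continuously instead of re-running snapshot queries, a streaming read can be registered against the same Hudi path. This is a sketch; hdm_read is a hypothetical name introduced here so it does not clash with the sink definition:

-- sketch: streaming read of the Hudi MOR table (hdm_read is a hypothetical name; schema and path copied from above)
CREATE TABLE hdm_read(
tinyint0 TINYINT,
smallint1 SMALLINT,
int2 INT,
bigint3 BIGINT,
float4 FLOAT,
double5 DOUBLE,
decimal6 DECIMAL(12, 3),
boolean7 BOOLEAN,
char8 CHAR(64),
varchar9 VARCHAR(64),
string10 STRING,
timestamp11 TIMESTAMP(3)
)
PARTITIONED BY (tinyint0)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://bigdata/hudi/hdm',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true',  -- keep polling the table for new commits
'read.streaming.check-interval' = '4'  -- poll interval in seconds
);

select * from hdm_read;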