快捷搜索:  汽车  科技

数据清洗分析是如何进行的(数据治理十)

数据清洗分析是如何进行的(数据治理十)#!/bin/bash ####################################################################### ### 根据ODS层: ### ### TO_YCAK_MAC_D 机器基本信息日全量表 ### ### TO_YCAK_MAC_LOC_D 机器位置信息日全量表 ### ### TO_YCBK_MAC_ADMIN_MAP_D 机器客户映射关系资料日全量表 ### ### TO_YCBK_MAC_STORE_MAP_D 机器门店映射关系日全量表 ### ### TO_Y

#头条创作挑战赛#

Atlas 案例演示

由于 Atlas 目前版本对 Hive 元数据监控比较好,这里我们改写了数仓“商户营收业务”业务,只使用 Hive Shell 脚本实现,后期来演示 Atlas 对元数据的管理。

“商户营收业务”数仓分层图:

数据清洗分析是如何进行的(数据治理十)(1)

一、创建所有 Hive 表

在 node3 上执行数仓“商户营收业务”创建所有表的 SQL 脚本:

数据清洗分析是如何进行的(数据治理十)(2)

CREATE EXTERNAL TABLE `TO_YCAK_MAC_D`( `MID` int `SRL_ID` string `HARD_ID` string `SONG_WHSE_VER` string `EXEC_VER` string `UI_VER` string `IS_ONLINE` string `STS` int `CUR_LOGIN_TM` string `PAY_SW` string `LANG` int `SONG_WHSE_TYPE` int `SCR_TYPE` int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCAK_MAC_D'; CREATE EXTERNAL TABLE `TO_YCAK_MAC_LOC_D`( `MID` int `PRVC_ID` int `CTY_ID` int `PRVC` string `CTY` string `MAP_CLSS` string `LON` string `LAT` string `ADDR` string `ADDR_FMT` string `REV_TM` string `SALE_TM` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCAK_MAC_LOC_D'; CREATE EXTERNAL TABLE `TO_YCBK_MAC_ADMIN_MAP_D`( `MID` int `MAC_NM` string `PKG_NUM` int `PKG_NM` string `INV_RATE` double `AGE_RATE` double `COM_RATE` double `PAR_RATE` double `DEPOSIT` double `SCENE_PRVC_ID` string `SCENE_CTY_ID` string `SCENE_AREA_ID` string `SCENE_ADDR` string `PRDCT_TYPE` string `SERIAL_NUM` string `HAD_MPAY_FUNC` int `IS_ACTV` int `ACTV_TM` string `ORDER_TM` string `GROUND_NM` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_MAC_ADMIN_MAP_D'; CREATE EXTERNAL TABLE `TO_YCBK_MAC_STORE_MAP_D`( `STORE_ID` int `MID` int `PRDCT_TYPE` int `ADMINID` int `CREAT_TM` string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_MAC_STORE_MAP_D'; CREATE EXTERNAL TABLE `TO_YCBK_STORE_D`( `ID` int `STORE_NM` string `TAG_ID` string `TAG_NM` string `SUB_TAG_ID` string `SUB_TAG_NM` string `PRVC_ID` string `CTY_ID` string `AREA_ID` string `ADDR` string `GROUND_NM` string `BUS_TM` string `CLOS_TM` string `SUB_SCENE_CATGY_ID` string `SUB_SCENE_CATGY_NM` string `SUB_SCENE_ID` string `SUB_SCENE_NM` string `BRND_ID` string `BRND_NM` string `SUB_BRND_ID` string `SUB_BRND_NM` string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_STORE_D'; CREATE EXTERNAL TABLE `TO_YCBK_PRVC_D`( `PRVC_ID` int `PRVC` string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_PRVC_D'; CREATE EXTERNAL TABLE `TO_YCBK_CITY_D`( `PRVC_ID` int `CTY_ID` int `CTY` string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_CITY_D'; CREATE EXTERNAL TABLE `TO_YCBK_AREA_D`( `CTY_ID` int `AREA_ID` int `AREA` string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TO_YCBK_AREA_D'; CREATE EXTERNAL TABLE `TW_MAC_BASEINFO_D`( `MID` int `MAC_NM` string `SONG_WHSE_VER` string `EXEC_VER` string `UI_VER` string `HARD_ID` string `SALE_TM` string `REV_TM` string `OPER_NM` string `PRVC` string `CTY` string `AREA` string `ADDR` string `STORE_NM` string `SCENCE_CATGY` string `SUB_SCENCE_CATGY` string `SCENE` string `SUB_SCENE` string `BRND` string `SUB_BRND` string `PRDCT_NM` string `PRDCT_TYP` int `BUS_MODE` string `INV_RATE` double `AGE_RATE` double `COM_RATE` double `PAR_RATE` double `IS_ACTV` int `ACTV_TM` string `PAY_SW` int `PRTN_NM` string `CUR_LOGIN_TM` string ) PARTITIONED BY (data_dt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TW_MAC_BASEINFO_D'; CREATE EXTERNAL TABLE `TO_YCAK_USR_D`( `UID` int `REG_MID` int `GDR` string `BIRTHDAY` string `MSISDN` string `LOC_ID` int `LOG_MDE` int `REG_TM` string `USR_EXP` string `SCORE` int `LEVEL` int `WX_ID` string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_D'; CREATE EXTERNAL TABLE `TO_YCAK_USR_ALI_D`( `UID` int `REG_MID` int `GDR` string `BIRTHDAY` string `MSISDN` string `LOC_ID` int `LOG_MDE` int `REG_TM` string `USR_EXP` string `SCORE` int `LEVEL` int `USR_TYPE` string `IS_CERT` string `IS_STDNT` string `ALY_ID` string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_ALI_D'; CREATE EXTERNAL TABLE `TO_YCAK_USR_QQ_D`( `UID` int `REG_MID` int `GDR` string `BIRTHDAY` string `MSISDN` string `LOC_ID` int `LOG_MDE` int `REG_TM` string `USR_EXP` string `SCORE` int `LEVEL` int `QQID` string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_QQ_D'; CREATE EXTERNAL TABLE `TO_YCAK_USR_APP_D`( `UID` int `REG_MID` int `GDR` string `BIRTHDAY` string `MSISDN` string `LOC_ID` int `REG_TM` string `USR_EXP` string `LEVEL` int `APP_ID` string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_APP_D'; CREATE EXTERNAL TABLE `TO_YCAK_USR_LOGIN_D`( `ID` int `UID` int `MID` int `LOGIN_TM` string `LOGOUT_TM` string `MODE_TYPE` int ) PARTITIONED BY (`data_dt` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_LOGIN_D'; CREATE EXTERNAL TABLE `TW_USR_BASEINFO_D`( `UID` int `REG_MID` int `REG_CHNL` string `REF_UID` string `GDR` string `BIRTHDAY` string `MSISDN` string `LOC_ID` int `LOG_MDE` string `REG_DT` string `REG_TM` string `USR_EXP` string `SCORE` int `LEVEL` int `USR_TYPE` string `IS_CERT` string `IS_STDNT` string ) PARTITIONED BY (`data_dt` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TW_USR_BASEINFO_D'; CREATE EXTERNAL TABLE `TO_YCAK_USR_LOC_D`( `ID` int `UID` int `LAT` string `LNG` string `DATETIME` string `MID` string ) PARTITIONED BY (data_dt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_USR_LOC_D'; CREATE EXTERNAL TABLE `TW_MAC_LOC_D`( `MID` int `X` string `Y` string `CNT` int `ADDER` string `PRVC` string `CTY` string `CTY_CD` string `DISTRICT` string `AD_CD` string `TOWN_SHIP` string `TOWN_CD` string `NB_NM` string `NB_TP` string `BD_NM` string `BD_TP` string `STREET` string `STREET_NB` string `STREET_LOC` string `STREET_DRCTION` string `STREET_DSTANCE` string `BUS_INFO` string ) PARTITIONED BY (data_dt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TW_MAC_LOC_D'; CREATE EXTERNAL TABLE `TO_YCAK_CNSM_D`( `ID` int `MID` int `PRDCD_TYPE` int `PAY_TYPE` int `PKG_ID` int `PKG_NM` string `AMT` int `CNSM_ID` string `ORDR_ID` string `TRD_ID` string `ACT_TM` string `UID` int `NICK_NM` string `ACTV_ID` int `ACTV_NM` string `CPN_TYPE` int `CPN_TYPE_NM` string `PKG_PRC` int `PKG_DSCNT` int `ORDR_TYPE` int `BILL_DT` int ) PARTITIONED BY (data_dt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TO_YCAK_CNSM_D'; CREATE EXTERNAL TABLE `TW_CNSM_BRIEF_D`( `ID` int `TRD_ID` string `UID` string `MID` int `PRDCD_TYPE` int `PAY_TYPE` int `ACT_TM` string `PKG_ID` int `COIN_PRC` int `COIN_CNT` int `UPDATE_TM` string `ORDR_ID` string `ACTV_NM` string `PKG_PRC` int `PKG_DSCNT` int `CPN_TYPE` int `ABN_TYP` int ) PARTITIONED BY (data_dt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TW_CNSM_BRIEF_D'; CREATE EXTERNAL TABLE `TW_MAC_STAT_D`( `MID` int `MAC_NM` string `PRDCT_TYPE` string `STORE_NM` int `BUS_MODE` string `PAY_SW` string `SCENCE_CATGY` string `SUB_SCENCE_CATGY` string `SCENE` string `SUB_SCENE` string `BRND` string `SUB_BRND` string `PRVC` string `CTY` string `AREA` string `AGE_ID` string `INV_RATE` string `AGE_RATE` string `COM_RATE` string `PAR_RATE` string `PKG_ID` string `PAY_TYPE` string `CNSM_USR_CNT` string `REF_USR_CNT` string `NEW_USR_CNT` string `REV_ORDR_CNT` string `REF_ORDR_CNT` string `TOT_REV` string `TOT_REF` string ) PARTITIONED BY (data_dt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/machine/TW_MAC_STAT_D'; CREATE EXTERNAL TABLE `TM_USR_MRCHNT_STAT_D`( `ADMIN_ID` string `PAY_TYPE` int `REV_ORDR_CNT` int `REF_ORDR_CNT` int `TOT_REV` double `TOT_REF` double `TOT_INV_REV` DECIMAL(10 2) `TOT_AGE_REV` DECIMAL(10 2) `TOT_COM_REV` DECIMAL(10 2) `TOT_PAR_REV` DECIMAL(10 2) ) PARTITIONED BY (DATA_DT string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://mycluster/user/hive/warehouse/data/user/TM_USR_MRCHNT_STAT_D';

[root@node3 test]# hive -f ./CreateAllHiveTables.sql

执行如下命令,将 mysql 数据导入到 ODS 层中,注意输入时间:

mysql 数据导入到 ODS 所有表的脚本附件:

[root@node3 ~]# cd /root/test [root@node3 test]# sh all_mysql_to_ods.sh 20220413

数据清洗分析是如何进行的(数据治理十)(3)

#!/bin/bash ################################################################### ### 将所有mysql中的数据导入到ODS中 ### ################################################################### if [ x"$1" = x ]; then echo "====没有导入数据的日期,输入日期====" exit else echo "====使用传入的日期 ====" currentDate=$1 fi echo "日期为 : $currentDate" sh /root/test/ods_mysqltohive_to_ycak_cnsm_d.sh $1 sh /root/test/ods_mysqltohive_to_ycak_mac_d.sh $1 sh /root/test/ods_mysqltohive_to_ycak_mac_loc_d.sh $1 sh /root/test/ods_mysqltohive_to_ycak_usr_ali_d.sh $1 sh /root/test/ods_mysqltohive_to_ycak_usr_app_d.sh $1 sh /root/test/ods_mysqltohive_to_ycak_usr_d.sh $1 sh /root/test/ods_mysqltohive_to_ycak_usr_loc_d.sh $1 sh /root/test/ods_mysqltohive_to_ycak_usr_login_d.sh $1 sh /root/test/ods_mysqltohive_to_ycak_usr_qq_d.sh $1 sh /root/test/ods_mysqltohive_to_ycbk_area_d.sh $1 sh /root/test/ods_mysqltohive_to_ycbk_city_d.sh $1 sh /root/test/ods_mysqltohive_to_ycbk_mac_admin_map_d.sh $1 sh /root/test/ods_mysqltohive_to_ycbk_mac_store_map_d.sh $1 sh /root/test/ods_mysqltohive_to_ycbk_prvc_d.sh $1 sh /root/test/ods_mysqltohive_to_ycbk_store_d.sh $1

查看 Atlas 中监控到的创建 Hive 表

数据清洗分析是如何进行的(数据治理十)(4)

二、编写处理业务 Shell 脚本

以下脚本包含处理“商户营收业务”所有脚本,这些脚本都是 Hive Shell 的脚本,调用时需要传入参数,也可以使用 Azkaban 进行调度。

1、ODS 层数据表获取 EDS 层 TW_MAC_BASEINFO_D 机器的基本信息表脚本附件:

数据清洗分析是如何进行的(数据治理十)(5)

#!/bin/bash ####################################################################### ### 根据ODS层: ### ### TO_YCAK_MAC_D 机器基本信息日全量表 ### ### TO_YCAK_MAC_LOC_D 机器位置信息日全量表 ### ### TO_YCBK_MAC_ADMIN_MAP_D 机器客户映射关系资料日全量表 ### ### TO_YCBK_MAC_STORE_MAP_D 机器门店映射关系日全量表 ### ### TO_YCBK_STORE_D 门店信息日全量表 ### ### TO_YCBK_PRVC_D 机器省份日全量表 ### ### TO_YCBK_CITY_D 机器城市日全量表 ### ### TO_YCBK_AREA_D 机器区县日全量表 ### ### 获取EDS层表 TW_MAC_BASEINFO_D 机器的基本信息 ### ####################################################################### if [ x"$1" = x ]; then echo "====没有导入数据的日期,输入日期====" exit else echo "====使用传入的日期 ====" currentDate=$1 fi echo "日期为 : $currentDate" `hive -e "set hive.exec.mode.local.auto=true"` `hive -e "insert overwrite table tw_mac_baseinfo_d partition(data_dt='${currentDate}') select YCAK.MID --机器ID YCBK.MAC_NM --机器名称 YCAK.SONG_WHSE_VER --歌曲版本 YCAK.EXEC_VER --系统版本号 YCAK.UI_VER --歌曲UI版本号 YCAK.HARD_ID --硬件ID YCAK.SALE_TM --销售时间 YCAK.REV_TM --运营时间 YCBK.STORE_NM as OPER_NM --运营商名称 if (YCAK.PRVC is null YCBK.PRVC YCAK.PRVC) as PRVC --机器所在省 if (YCAK.CTY is null YCBK.CTY YCAK.CTY) as CTY --机器所在市 YCBK.AREA --机器所在区域 if (YCAK.ADDR_FMT is null YCBK.ADDR YCAK.ADDR_FMT) as ADDR --机器详细地址 YCBK.STORE_NM --门店名称 YCBK.TAG_NM as SCENCE_CATGY --主场景名称 YCBK.SUB_SCENE_CATGY_NM as SUB_SCENCE_CATGY --子场景分类名称 YCBK.SUB_TAG_NM as SCENE --主场景分类名称 YCBK.SUB_SCENE_NM as SUB_SCENE --子场景名称 YCBK.BRND_NM as BRND --主场景品牌 YCBK.SUB_BRND_NM as SUB_BRND --子场景品牌 YCBK.PKG_NM as PRDCT_NM --产品名称 2 as PRDCT_TYP --产品类型 case when YCBK.PKG_NM = '联营版' then '联营' when YCBK.INV_RATE < 100 then '联营' else '卖断' end BUS_MODE --运营模式 YCBK.INV_RATE --投资人分成比例 YCBK.AGE_RATE --代理人、联盟人分成比例 YCBK.COM_RATE --公司分成比例 YCBK.PAR_RATE --合作方分成比例 if (YCAK.STS is null YCBK.IS_ACTV YCAK.STS) as IS_ACTV --是否激活 YCBK.ACTV_TM --激活时间 if (YCAK.PAY_SW is null YCBK.PAY_SW YCAK.PAY_SW) as PAY_SW --是否开通移动支付 YCBK.STORE_NM as PRTN_NM --代理人姓名,这里获取门店名称 YCAK.CUR_LOGIN_TM --最近登录时间 FROM ( SELECT TEMP.MID --机器ID MAC.SRL_ID --序列号 MAC.HARD_ID --硬件ID MAC.SONG_WHSE_VER --歌库版本号 MAC.EXEC_VER --系统版本号 MAC.UI_VER --歌库UI版本号 MAC.STS --激活状态 MAC.CUR_LOGIN_TM --最近登录时间 MAC.PAY_SW --支付开关是否打开 MAC.IS_ONLINE --是否在线 2 as PRDCT_TYPE --产品类型,2 LOC.PRVC --机器所在省份 LOC.CTY --机器所在城市 LOC.ADDR_FMT --详细地址 LOC.REV_TM --运营时间 LOC.SALE_TM --销售时间 from (select MID from TO_YCAK_MAC_D union select MID from TO_YCBK_MAC_ADMIN_MAP_D) as TEMP left join TO_YCAK_MAC_D as MAC on TEMP.MID = MAC.MID left join TO_YCAK_MAC_LOC_D as LOC on TEMP.MID = LOC.MID ) as YCAK LEFT JOIN ( select TEMP.MID --机器ID MA.MAC_NM --机器名称 MA.PKG_NM --套餐名称 MA.INV_RATE --投资人分成比例 MA.AGE_RATE --承接方分成比例 MA.COM_RATE --公司分成比例 MA.PAR_RATE --合作方分成比例 MA.IS_ACTV --是否激活 MA.ACTV_TM --激活时间 MA.HAD_MPAY_FUNC as PAY_SW --支付开关是否打开 PRVC.PRVC --省份 CTY.CTY --城市 AREA.AREA --区、县 CONCAT(MA.SCENE_ADDR MA.GROUND_NM) as ADDR --场景地址 场地名称 STORE.GROUND_NM as STORE_NM --门店名称 这里的store_nm都是数字 STORE.TAG_NM --主场景名称 STORE.SUB_TAG_NM --主场景分类 STORE.SUB_SCENE_CATGY_NM --子场景分类名称 STORE.SUB_SCENE_NM --子场景名称 STORE.BRND_NM --品牌名称 STORE.SUB_BRND_NM --子品牌名称 from (select MID from TO_YCAK_MAC_D union select MID from TO_YCBK_MAC_ADMIN_MAP_D) as TEMP left join TO_YCBK_MAC_ADMIN_MAP_D as MA on TEMP.MID = MA.MID left join TO_YCBK_PRVC_D as PRVC on MA.SCENE_PRVC_ID = PRVC.PRVC_ID left join TO_YCBK_CITY_D as CTY on MA.SCENE_CTY_ID = CTY.CTY_ID left join TO_YCBK_AREA_D as AREA on MA.SCENE_AREA_ID = AREA.AREA_ID left join TO_YCBK_MAC_STORE_MAP_D as SMA on TEMP.MID = SMA.MID left join TO_YCBK_STORE_D as STORE on SMA.STORE_ID = STORE.ID ) as YCBK ON YCAK.MID = YCBK.MID"` 2、ODS 层数据表获取 EDS 层 TW_USR_BASEINFO_D 活跃用户信息数据表脚本附件:

数据清洗分析是如何进行的(数据治理十)(6)

#!/bin/bash ################################################################### ### 根据 YCAK 库中所有用户信息获取表 TW_USR_BASEINFO_D 用户信息 ### ################################################################### if [ x"$1" = x ]; then echo "====没有导入数据的日期,输入日期====" exit else echo "====使用传入的日期 ====" currentDate=$1 fi echo "日期为 : $currentDate" `hive -e "insert overwrite table TW_USR_BASEINFO_D partition (data_dt = ${currentDate}) SELECT UID --用户ID REG_MID --机器ID '1' AS REG_CHNL -- 1-微信渠道,2-支付宝渠道,3-QQ渠道,4-APP渠道 WX_ID AS REF_UID --微信账号 GDR --性别 BIRTHDAY --生日 MSISDN --手机号码 LOC_ID --地区ID LOG_MDE --注册登录方式 substring(REG_TM 1 8) AS REG_DT --注册日期 substring(REG_TM 9 6) AS REG_TM --注册时间 USR_EXP --用户当前经验值 SCORE --累计积分 LEVEL --用户等级 '2' AS USR_TYPE --用户类型 1-企业 2-个人 NULL AS IS_CERT --实名认证 NULL AS IS_STDNT --是否是学生 FROM TO_YCAK_USR_D UNION SELECT UID --用户ID REG_MID --机器ID '2' AS REG_CHNL -- 1-微信渠道,2-支付宝渠道,3-QQ渠道,4-APP渠道 ALY_ID AS REF_UID --支付宝账号 GDR --性别 BIRTHDAY --生日 MSISDN --手机号码 LOC_ID --地区ID LOG_MDE --注册登录方式 substring(REG_TM 1 8) AS REG_DT --注册日期 substring(REG_TM 9 6) AS REG_TM --注册时间 USR_EXP --用户当前经验值 SCORE --累计积分 LEVEL --用户等级 NVL(USR_TYPE '2') AS USR_TYPE --用户类型 1-企业 2-个人 IS_CERT --实名认证 IS_STDNT --是否是学生 FROM TO_YCAK_USR_ALI_D UNION SELECT UID --用户ID REG_MID --机器ID '3' AS REG_CHNL -- 1-微信渠道,2-支付宝渠道,3-QQ渠道,4-APP渠道 QQID AS REF_UID --QQ账号 GDR --性别 BIRTHDAY --生日 MSISDN --手机号码 LOC_ID --地区ID LOG_MDE --注册登录方式 substring(REG_TM 1 8) AS REG_DT --注册日期 substring(REG_TM 9 6) AS REG_TM --注册时间 USR_EXP --用户当前经验值 SCORE --累计积分 LEVEL --用户等级 '2' AS USR_TYPE --用户类型 1-企业 2-个人 NULL AS IS_CERT --实名认证 NULL AS IS_STDNT --是否是学生 FROM TO_YCAK_USR_QQ_D UNION SELECT UID --用户ID REG_MID --机器ID '4' AS REG_CHNL -- 1-微信渠道,2-支付宝渠道,3-QQ渠道,4-APP渠道 APP_ID AS REF_UID --APP账号 GDR --性别 BIRTHDAY --生日 MSISDN --手机号码 LOC_ID --地区ID NULL AS LOG_MDE --注册登录方式 substring(REG_TM 1 8) AS REG_DT --注册日期 substring(REG_TM 9 6) AS REG_TM --注册时间 USR_EXP --用户当前经验值 0 AS SCORE --累计积分 LEVEL --用户等级 '2' AS USR_TYPE --用户类型 1-企业 2-个人 NULL AS IS_CERT --实名认证 NULL AS IS_STDNT --是否是学生 FROM TO_YCAK_USR_APP_D"` 3、ODS 层数据表获取 EDS 层 TW_CNSM_BRIEF_D 消费退款订单流水日增量表脚本附件:

数据清洗分析是如何进行的(数据治理十)(7)

#!/bin/bash ################################################################### ### 根据 YCAK 库中用户消费订单明细表 TO_YCAK_CNSM_D ### ### 获取 EDS 层 TW_CNSM_BRIEF_D 消费退款订单流水日增量表 ### ################################################################### if [ x"$1" = x ]; then echo "====没有导入数据的日期,输入日期====" exit else echo "====使用传入的日期 ====" currentDate=$1 fi echo "日期为 : $currentDate" `hive -e "insert overwrite table TW_CNSM_BRIEF_D partition (data_dt=${currentDate}) select ID --ID TRD_ID --第三方交易编号 cast(UID as string) AS UID --用户ID MID --机器ID PRDCD_TYPE --产品类型 PAY_TYPE --支付类型 ACT_TM --消费时间 PKG_ID --套餐ID case when AMT<0 then AMT*-1 else AMT end AS COIN_PRC --币值 1 AS COIN_CNT --币数 ,单位分 ACT_TM as UPDATE_TM --状态更新时间 ORDR_ID --订单ID ACTV_NM --优惠活动名称 PKG_PRC --套餐原价 PKG_DSCNT --套餐优惠价 CPN_TYPE --优惠券类型 CASE WHEN ORDR_TYPE = 1 THEN 0 WHEN ORDR_TYPE = 2 THEN 1 WHEN ORDR_TYPE = 3 THEN 2 WHEN ORDR_TYPE = 4 THEN 2 END AS ABN_TYP --异常类型:0-无异常 1-异常订单 2-商家退款 FROM TO_YCAK_CNSM_D WHERE DATA_DT = ${currentDate} "` 4、EDS-DWD 层数据获取 EDS-DWS 层 TW_MAC_STAT_D 机器日营收情况统计表脚本附件:

数据清洗分析是如何进行的(数据治理十)(8)

#!/bin/bash ################################################################### ### 根据 EDS-DWD 层中: ### ### TW_MAC_BASEINFO_D 机器基础信息日全量表 ### ### TW_USR_BASEINFO_D 活跃用户基础信息日增量表 ### ### TW_CNSM_BRIEF_D 消费退款订单流水日增量表 ### ### 获取 EDS-DWS 层 TW_MAC_STAT_D 机器日营收情况统计表 ### ################################################################### if [ x"$1" = x ]; then echo "====没有导入数据的日期,输入日期====" exit else echo "====使用传入的日期 ====" currentDate=$1 fi echo "日期为 : $currentDate" `hive -e "insert overwrite table TW_MAC_STAT_D partition (data_dt = ${currentDate}) SELECT A.MID --机器ID A.MAC_NM --机器名称 A.PRDCT_TYP --产品类型 A.STORE_NM --门店名称 A.BUS_MODE --运营模式 A.PAY_SW --是否开通移动支付 A.SCENCE_CATGY --主场景分类 A.SUB_SCENCE_CATGY --子场景分类 A.SCENE --主场景 A.SUB_SCENE --子场景 A.BRND --主场景品牌 A.SUB_BRND --子场景品牌 A.PRVC --省份 A.CTY --城市 A.AREA --区县 A.PRTN_NM as AGE_ID --代理人ID A.INV_RATE --投资人分成比例 A.AGE_RATE --代理人、联盟人分成比例 A.COM_RATE --公司分成比例 A.PAR_RATE --合作方分成比例 C.PKG_ID --套餐ID C.PAY_TYPE --支付类型 NVL(C.CNSM_USR_CNT 0) AS CNSM_USR_CNT --总消费用户数 NVL(D.REF_USR_CNT 0) AS REF_USR_CNT --总退款用户数 NVL(E.NEW_USR_CNT 0) AS NEW_USR_CNT --总新增用户数 NVL(C.REV_ORDR_CNT 0) AS REV_ORDR_CNT --总营收订单数 NVL(D.REF_ORDR_CNT 0) AS REF_ORDR_CNT --总退款订单数 NVL(C.TOT_REV 0) AS TOT_REV --总营收 NVL(D.TOT_REF 0) AS TOT_REF --总退款 FROM (SELECT * FROM TW_MAC_BASEINFO_D WHERE DATA_DT = ${currentDate}) A --机器基础信息 LEFT JOIN ( select MID --机器ID PKG_ID --套餐ID PAY_TYPE --支付类型 COUNT(DISTINCT UID) as CNSM_USR_CNT --总消费用户数 SUM(COIN_CNT * COIN_PRC) as TOT_REV --总营收 COUNT(ORDR_ID) as REV_ORDR_CNT --总营收订单数 from TW_CNSM_BRIEF_D where ABN_TYP = 0 AND DATA_DT = ${currentDate} group by MID PKG_ID PAY_TYPE ) C on A.MID = C.MID --机器当日营收信息 LEFT JOIN ( select MID --机器ID PKG_ID --套餐ID PAY_TYPE --支付类型 COUNT(DISTINCT UID) as REF_USR_CNT --总退款用户数 SUM(COIN_CNT * COIN_PRC) as TOT_REF --总退款 COUNT(ORDR_ID) as REF_ORDR_CNT --总退款订单数 from TW_CNSM_BRIEF_D where ABN_TYP = 2 group by MID PKG_ID PAY_TYPE ) D on A.MID = D.MID AND C.MID = D.MID AND C.PKG_ID = D.PKG_ID AND C.PAY_TYPE = D.PAY_TYPE --机器当日退款信息 LEFT JOIN ( select REG_MID as MID --机器ID count(UID) as NEW_USR_CNT --新增用户个数 from TW_USR_BASEINFO_D where REG_DT = ${currentDate} group by REG_MID ) E on A.MID = E.MID --机器当日新增用户信息 "` 5、EDS-DWS 层数据获取 DM 层 TM_USR_MRCHNT_STAT_D 商户日营收统计表脚本附件:

数据清洗分析是如何进行的(数据治理十)(9)

#!/bin/bash ################################################################### ### 根据 EDS-DWS 层中: ### ### TW_MAC_STAT_D 机器日营收情况统计表 ### ### 获取DM层 TM_USR_MRCHNT_STAT_D 商户日营收统计表 ### ################################################################### if [ x"$1" = x ]; then echo "====没有导入数据的日期,输入日期====" exit else echo "====使用传入的日期 ====" currentDate=$1 fi echo "日期为 : $currentDate" `hive -e "insert overwrite table TM_USR_MRCHNT_STAT_D partition (data_dt=${currentDate}) select AGE_ID AS ADMIN_ID --代理人 PAY_TYPE SUM(REV_ORDR_CNT) AS REV_ORDR_CNT --总营收订单数 SUM(REF_ORDR_CNT) AS REF_ORDR_CNT --总退款订单数 CAST(SUM(TOT_REV) AS DECIMAL(10 2)) AS TOT_REV --总营收 CAST(SUM(TOT_REF) AS DECIMAL(10 2)) AS TOT_REF --总退款 CAST(SUM(TOT_REV * NVL(INV_RATE 0)) AS DECIMAL(10 2)) AS TOT_INV_REV --投资人营收 CAST(SUM(TOT_REV * NVL(AGE_RATE 0)) AS DECIMAL(10 2)) AS TOT_AGE_REV --代理人营收 CAST(SUM(TOT_REV * NVL(COM_RATE 0)) AS DECIMAL(10 2)) AS TOT_COM_REV --公司营收 CAST(SUM(TOT_REV * NVL(PAR_RATE 0)) AS DECIMAL(10 2)) AS TOT_PAR_REV --合伙人营收 from TW_MAC_STAT_D WHERE DATA_DT = ${currentDate} GROUP BY AGE_ID PAY_TYPE "` 三、手动执行脚本

注意:执行脚本时需要传入时间:

[root@node3 test]# sh ProduceShell1.sh 20220413 [root@node3 test]# sh ProduceShell2.sh 20220413 [root@node3 test]# sh ProduceShell3.sh 20220413 [root@node3 test]# sh ProduceShell4.sh 20220413 [root@node3 test]# sh ProduceShell5.sh 20220413 四、Atlas 中查看表元数据

查看 EDS 层表 TW_MAC_BASEINFO_D 机器的基本信息表血缘关系:

数据清洗分析是如何进行的(数据治理十)(10)

查看 EDS 层表 TW_USR_BASEINFO_D 活跃用户信息数据表血缘关系:

数据清洗分析是如何进行的(数据治理十)(11)

查看 EDS 层表 TW_CNSM_BRIEF_D 消费退款订单流水日增量表血缘关系:

数据清洗分析是如何进行的(数据治理十)(12)

查看 EDS-DWS 层 TW_MAC_STAT_D 机器日营收情况统计表血缘关系:

数据清洗分析是如何进行的(数据治理十)(13)

查看 DM 层 TM_USR_MRCHNT_STAT_D 商户日营收统计表血缘关系:

数据清洗分析是如何进行的(数据治理十)(14)

以上除了可以查看表之间的血缘关系还可以查看字段的血缘关系,以 EDS-DWS 层表 TW_MAC_STAT_D 机器日营收情况统计表中的“机器-MID”字段为例,查看字段的血缘关系如下:

数据清洗分析是如何进行的(数据治理十)(15)

我们可以根据 Atlas 提供的表、字段的血缘关系及时定位问题,加快数据分析效率。

猜您喜欢: