菜单 学习猿地 - LMONKEY

VIP

开通学习猿地VIP

尊享10项VIP特权 持续新增

知识通关挑战

打卡带练!告别无效练习

接私单赚外块

VIP优先接,累计金额超百万

学习猿地私房课免费学

大厂实战课仅对VIP开放

你的一对一导师

每月可免费咨询大牛30次

领取更多软件工程师实用特权

入驻
433
0

项目实战从0到1之hive(39)大数据项目之电商数仓(用户行为数据)(七)

原创
05/13 14:22
阅读数 69969

第9章 数仓搭建之DWD层

对ODS层数据进行清洗(去除空值,脏数据,超过极限范围的数据,行式存储改为列存储,改压缩格式)。

9.1 DWD层启动表数据解析

9.1.1 创建启动表

1)建表语句

drop table if exists dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,  
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`entry` string,
`open_ad_type` string,
`action` string,
`loading_time` string,
`detail` string,
`extend1` string
)
PARTITIONED BY (dt string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_start_log/'
TBLPROPERTIES('parquet.compression'='lzo');

9.1.2 向启动表导入数据

 insert overwrite table dwd_start_log
PARTITION (dt='2020-10-14')
select
  get_json_object(line,'$.mid') mid_id,
  get_json_object(line,'$.uid') user_id,
  get_json_object(line,'$.vc') version_code,
  get_json_object(line,'$.vn') version_name,
  get_json_object(line,'$.l') lang,
  get_json_object(line,'$.sr') source,
  get_json_object(line,'$.os') os,
  get_json_object(line,'$.ar') area,
  get_json_object(line,'$.md') model,
  get_json_object(line,'$.ba') brand,
  get_json_object(line,'$.sv') sdk_version,
  get_json_object(line,'$.g') gmail,
  get_json_object(line,'$.hw') height_width,
  get_json_object(line,'$.t') app_time,
  get_json_object(line,'$.nw') network,
  get_json_object(line,'$.ln') lng,
  get_json_object(line,'$.la') lat,
  get_json_object(line,'$.entry') entry,
  get_json_object(line,'$.open_ad_type') open_ad_type,
  get_json_object(line,'$.action') action,
  get_json_object(line,'$.loading_time') loading_time,
  get_json_object(line,'$.detail') detail,
  get_json_object(line,'$.extend1') extend1
from ods_start_log
where dt='2020-10-14';

2)测试

select * from dwd_start_log limit 2;

9.1.3 DWD层启动表加载数据脚本

1)在hadoop102的/home/kgg/bin目录下创建脚本

[kgg@hadoop102 bin]$ vim dwd_start_log.sh
  在脚本中编写如下内容
#!/bin/bash

# 定义变量方便修改
APP=gmall
hive=/opt/module/hive/bin/hive

# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
   do_date=$1
else
   do_date=`date -d "-1 day" +%F`  
fi
echo "===日志日期为 $do_date==="
sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$APP".dwd_start_log
PARTITION (dt='$do_date')
select
  get_json_object(line,'$.mid') mid_id,
  get_json_object(line,'$.uid') user_id,
  get_json_object(line,'$.vc') version_code,
  get_json_object(line,'$.vn') version_name,
  get_json_object(line,'$.l') lang,
  get_json_object(line,'$.sr') source,
  get_json_object(line,'$.os') os,
  get_json_object(line,'$.ar') area,
  get_json_object(line,'$.md') model,
  get_json_object(line,'$.ba') brand,
  get_json_object(line,'$.sv') sdk_version,
  get_json_object(line,'$.g') gmail,
  get_json_object(line,'$.hw') height_width,
  get_json_object(line,'$.t') app_time,
  get_json_object(line,'$.nw') network,
  get_json_object(line,'$.ln') lng,
  get_json_object(line,'$.la') lat,
  get_json_object(line,'$.entry') entry,
  get_json_object(line,'$.open_ad_type') open_ad_type,
  get_json_object(line,'$.action') action,
  get_json_object(line,'$.loading_time') loading_time,
  get_json_object(line,'$.detail') detail,
  get_json_object(line,'$.extend1') extend1
from "$APP".ods_start_log
where dt='$do_date';"

$hive -e "$sql"

2)增加脚本执行权限

[kgg@hadoop102 bin]$ chmod 777 dwd_start_log.sh

3)脚本使用

[kgg@hadoop102 module]$ dwd_start_log.sh 2019-02-11

4)查询导入结果

select * from dwd_start_log where dt='2019-02-11' limit 2;

5)脚本执行时间

企业开发中一般在每日凌晨30分~1点

9.2 DWD层事件表数据解析

9.2.1 创建基础明细表

明细表用于存储ODS层原始表转换过来的明细数据。

img

1)创建事件日志基础明细表

drop table if exists dwd_base_event_log;
CREATE EXTERNAL TABLE dwd_base_event_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`event_name` string,
`event_json` string,
`server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/'
TBLPROPERTIES('parquet.compression'='lzo');

2)说明

其中event_name和event_json用来对应事件名和整个事件。这个地方将原始日志1对多的形式拆分出来了。操作的时候我们需要将原始日志展平,需要用到UDF和UDTF。

9.2.2 自定义UDF函数(解析公共字段)

img

1)创建一个maven工程:hivefunction 2)创建包名:com.kgg.udf 3)在pom.xml文件中添加如下内容

<properties>
   <project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
   <hive.version>1.2.1</hive.version>
</properties>

<dependencies>
   <!--添加hive依赖-->
   <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-exec</artifactId>
       <version>${hive.version}</version>
   </dependency>
</dependencies>

<build>
   <plugins>
       <plugin>
           <artifactId>maven-compiler-plugin</artifactId>
           <version>2.3.2</version>
           <configuration>
               <source>1.8</source>
               <target>1.8</target>
           </configuration>
       </plugin>
       <plugin>
           <artifactId>maven-assembly-plugin</artifactId>
           <configuration>
               <descriptorRefs>
                   <descriptorRef>jar-with-dependencies</descriptorRef>
               </descriptorRefs>
           </configuration>
           <executions>
               <execution>
                   <id>make-assembly</id>
                   <phase>package</phase>
                   <goals>
                       <goal>single</goal>
                   </goals>
               </execution>
           </executions>
       </plugin>
   </plugins>
</build>

4)UDF用于解析公共字段

package com.kgg.udf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;

public class BaseFieldUDF extends UDF {

   public String evaluate(String line, String key) throws JSONException {

       // 1 处理line   服务器时间 | json
       String[] log = line.split("\\|");

       //2 合法性校验
       if (log.length != 2 || StringUtils.isBlank(log[1])) {
           return "";
      }

       // 3 开始处理json
       JSONObject baseJson = new JSONObject(log[1].trim());

       String result = "";

       // 4 根据传进来的key查找相应的value
       if ("et".equals(key)) {
           if (baseJson.has("et")) {
               result = baseJson.getString("et");
          }
      } else if ("st".equals(key)) {
           result = log[0].trim();
      } else {
           JSONObject cm =“” baseJson.getJSONObject("cm");
           if (cm.has(key)) {
               result = cm.getString(key);
          }
      }
       return result;
  }

   public static void main(String[] args) throws JSONException {

       String line = "1541217850324|{"cm":{"mid":"m7856","uid":"u8739","ln":"-74.8","sv":"V2.2.2","os":"8.1.3","g":"P7XC9126@gmail.com","nw":"3G","l":"es","vc":"6","hw":"640*960","ar":"MX","t":"1541204134250","la":"-31.7","md":"huawei

发表评论

0/200
433 点赞
0 评论
收藏