大数据技术之_18_大数据离线平台_03_数据处理+工具代码导入+业务 ETL 实现+创建数据库表

十六、数据处理

16.1、ETL 操作

  • 功能:清洗、过滤、补全
  • 数据来源:存储在 HDFS 上的日志文件
  • 数据处理方式:MapReduce
  • 数据保存位置:HBase

16.2、HBase 设计

16.2.1、每天1张表

  即按天分表,一天的数据存放于一张表中,rowkey 采用随机值,不需要有特定规律,尽可能的散列。

16.2.2、倒序或在前缀上加数字

  rowkey 的设计要具体问题具体分析,有时会采取倒序的原则,有时会采取 rowkey 前加上一个随机的数字。(该数字一般要和 HregionServer 的数量求模运算)

16.2.3、预分区

  根据业务预估数据量,提前建好预分区,避免 region 频繁拆分合并造成的性能浪费。

16.3、MapReduce 分析过程

  操作流程:HBase 读取数据 -> InputFormat -> map -> shuffle -> reduce -> OutputFormat -> Mysql

16.4、Hive 分析过程

  • 数据源:使用 Hive external table 创建关联 HBase 中的数据表
  • 数据结果:保存于 HDFS 上(或者保存到 Hive 结果表中)
  • 操作流程:Hive external table -> UDF编写 -> HQL 分析语句编写 -> 保存到 Hive 结果表中(其实也就是在HDFS上) -> Sqoop - 导出数据 -> Mysql

16.5、Mysql 表结构设计

16.5.1、常用关系型数据库表模型

  在多维分析的商业智能解决方案中,根据事实表维度表的关系,又可将常见的模型分为星型模型雪花型模型。在设计逻辑型数据的模型的时候,就应考虑数据是按照星型模型还是雪花型模型进行组织。

  • 星型模型 星型架构是一种非正规化的结构,多维数据集的每一个维度都直接与事实表相连接,不存在渐变维度,所以数据有一定的冗余,如在地域维度表中,存在国家 A 省 B 的城市 C 以及国家 A 省 B 的城市 D 两条记录,那么国家 A 和省 B 的信息分别存储了两次,即存在冗余。
  • 雪花模型 当有一个或多个维表没有直接连接到事实表上,而是通过其他维表连接到事实表上时,其图解就像多个雪花连接在一起,故称雪花模型。雪花模型是对星型模型的扩展。它对星型模型的维表进一步层次化,原有的各维表可能被扩展为小的事实表,形成一些局部的 " 层次" 区域,这些被分解的表都连接到主维度表而不是事实表。如下图,将地域维表又分解为国家、省份、城市等维表。它的优点是:通过最大限度地减少数据存储量以及联合较小的维表来改善查询性能。雪花型结构去除了数据冗余

雪花模型在加载数据集时,ETL 操作在设计上更加复杂,而且由于附属模型的限制,不能并行化。 星形模型加载维度表,不需要再维度之间添加附属模型,因此 ETL 就相对简单,而且可以实现高度的并行化。

16.5.2、表结构

  • 维度表:dimension_table
  • 事实表:stats_table
  • 辅助表:主要用于协助 ETL、数据分析等操作获取其他非日志数据,例如:保存会员 id 等

十七、工具代码导入

代码结构图

部分示例代码如下: pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.z</groupId>
    <artifactId>transformer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>transformer</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-resourcemanager -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
            <version>2.7.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>

        <!-- mysql start -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>
        <!-- mysql end -->

        <!-- 用户浏览器解析 -->
        <dependency>
            <groupId>cz.mallat.uasparser</groupId>
            <artifactId>uasparser</artifactId>
            <version>0.6.1</version>
        </dependency>

        <!-- json包 -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.6.2</version>
        </dependency>
    </dependencies>
    <profiles>
        <profile>
            <!-- 唯一id,表示本地 -->
            <id>local</id>
            <activation>
                <!-- maven编译的时候,默认环境,该参数为true只能存在一个 -->
                <activeByDefault>true</activeByDefault>
            </activation>
            <build>
                <!-- 插件信息 -->
                <plugins>
                    <plugin>
                        <!-- 将指定包的java文件进行编译打包操作 -->
                        <groupId>org.codehaus.mojo</groupId>
                        <artifactId>build-helper-maven-plugin</artifactId>
                        <version>1.4</version>
                        <executions>
                            <execution>
                                <id>add-source</id>
                                <phase>generate-sources</phase>
                                <goals>
                                    <goal>add-source</goal>
                                </goals>
                                <configuration>
                                    <sources>
                                        <source>${basedir}/src/main/java</source>
                                    </sources>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>

        <profile>
            <!-- 需要最终形成一个jar文件 -->
            <id>dev</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>org.codehaus.mojo</groupId>
                        <artifactId>build-helper-maven-plugin</artifactId>
                        <version>1.4</version>
                        <executions>
                            <execution>
                                <id>add-source</id>
                                <phase>generate-sources</phase>
                                <goals>
                                    <goal>add-source</goal>
                                </goals>
                                <configuration>
                                    <sources>
                                        <source>${basedir}/src/main/java</source>
                                    </sources>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>

                    <plugin>
                        <!-- 将第三方的依赖包,一起打入到最终形成的jar文件中 -->
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>2.1</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                                <configuration>
                                    <artifactSet>
                                        <includes>
                                            <include>cz.mallat.uasparser:uasparser</include>
                                            <include>net.sourceforge.jregex:jregex</include>
                                            <include>mysql:mysql-connector-java</include>
                                        </includes>
                                    </artifactSet>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>

    <build>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
        <pluginManagement>
            <plugins>
                <!--This plugin's configuration is used to store Eclipse m2e settings 
                    only. It has no influence on the Maven build itself. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.codehaus.mojo</groupId>
                                        <artifactId>
                                            build-helper-maven-plugin
                                        </artifactId>
                                        <versionRange>[1.4,)</versionRange>
                                        <goals>
                                            <goal>add-source</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore></ignore>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

resources 目录下文件 core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
       Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <!-- 指定HDFS中NameNode的地址 -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop102:9000</value>
    </property>

    <!-- 指定Hadoop运行时产生文件的存储目录 -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-2.7.2/data/tmp</value>
    </property>

    <property>
        <name>hadoop.proxyuser.admin.hosts</name>
        <value>*</value>
    </property>

    <property>
        <name>hadoop.proxyuser.admin.groups</name>
        <value>*</value>
    </property>

    <property>
        <name>hadoop.proxyuser.httpfs.hosts</name>
        <value>*</value>
    </property>

    <property>
        <name>hadoop.proxyuser.httpfs.groups</name>
        <value>*</value>
    </property>

    <!-- 配置垃圾回收时间为1分钟
    <property>
        <name>fs.trash.interval</name>
        <value>1</value>
    </property>
    -->

    <!-- 修改访问垃圾回收站用户名称为 atguigu
    <property>
        <name>hadoop.http.staticuser.user</name>
        <value>atguigu</value>
    </property>
    -->
</configuration>

hbase-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-->
<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://hadoop102:9000/hbase</value>
    </property>

    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>

    <!-- 0.98后的新变动,之前版本没有.port,默认端口为16000 -->
    <property>
        <name>hbase.master.port</name>
        <value>16000</value>
    </property>

    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
    </property>

    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/opt/module/zookeeper-3.4.10/zkData</value>
    </property>

    <property>
        <name>hbase.coprocessor.region.classes</name>
        <value>com.china.hbase.CalleeWriteObserver</value>
    </property>

    <property>
        <name>zookeeper.session.timeout</name>
        <value>90000</value>
    </property>
</configuration>

hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <!-- 指定HDFS副本的数量,默认是3个 -->
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>

    <!-- 指定Hadoop辅助名称节点主机配置 -->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop104:50090</value>
    </property>

    <!-- 关闭权限检查-->
    <property>
        <name>dfs.permissions.enable</name>
        <value>false</value>
    </property>

    <property>
        <name>dfs.webhdfs.enabled</name>
        <value>true</value>
    </property>

    <!-- NameNode的本地目录可以配置成多个,且每个目录存放内容相同,增加了可靠性。
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:///${hadoop.tmp.dir}/dfs/name1,file:///${hadoop.tmp.dir}/dfs/name2</value>
    </property>
    -->

    <!-- DataNode也可以配置成多个目录,每个目录存储的数据不一样。即:数据不是副本。
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:///${hadoop.tmp.dir}/dfs/data1,file:///${hadoop.tmp.dir}/dfs/data2</value>
    </property>
    -->

    <!-- 白名单信息
    <property>
        <name>dfs.hosts</name>
        <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts</value>
    </property>
    -->

    <!-- 黑名单信息
    <property>
        <name>dfs.hosts.exclude</name>
        <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude</value>
    </property>
    -->
</configuration>

log4j.properties

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Define some default values that can be overridden by system properties
hadoop.root.logger=INFO,console
hadoop.log.dir=.
hadoop.log.file=hadoop.log

# Define the root logger to the system property "hadoop.root.logger".
log4j.rootLogger=${hadoop.root.logger}, EventCounter

# Logging Threshold
log4j.threshold=ALL

# Null Appender
log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender

#
# Rolling File Appender - cap space usage at 5gb.
#
hadoop.log.maxfilesize=256MB
hadoop.log.maxbackupindex=20
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}

log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex}

log4j.appender.RFA.layout=org.apache.log4j.PatternLayout

# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Debugging Pattern format
#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n


#
# Daily Rolling File Appender
#

log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}

# Rollver at midnight
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd

# 30-day backup
#log4j.appender.DRFA.MaxBackupIndex=30
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout

# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Debugging Pattern format
#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n


#
# console
# Add "console" to rootlogger above if you want to use this 
#

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

#
# TaskLog Appender
#

#Default values
hadoop.tasklog.taskid=null
hadoop.tasklog.iscleanup=false
hadoop.tasklog.noKeepSplits=4
hadoop.tasklog.totalLogFileSize=100
hadoop.tasklog.purgeLogSplits=true
hadoop.tasklog.logsRetainHours=12

log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}

log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

#
# HDFS block state change log from block manager
#
# Uncomment the following to suppress normal block state change
# messages from BlockManager in NameNode.
#log4j.logger.BlockStateChange=WARN

#
#Security appender
#
hadoop.security.logger=INFO,NullAppender
hadoop.security.log.maxfilesize=256MB
hadoop.security.log.maxbackupindex=20
log4j.category.SecurityLogger=${hadoop.security.logger}
hadoop.security.log.file=SecurityAuth-${user.name}.audit
log4j.appender.RFAS=org.apache.log4j.RollingFileAppender 
log4j.appender.RFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
log4j.appender.RFAS.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
log4j.appender.RFAS.MaxFileSize=${hadoop.security.log.maxfilesize}
log4j.appender.RFAS.MaxBackupIndex=${hadoop.security.log.maxbackupindex}

#
# Daily Rolling Security appender
#
log4j.appender.DRFAS=org.apache.log4j.DailyRollingFileAppender 
log4j.appender.DRFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
log4j.appender.DRFAS.layout=org.apache.log4j.PatternLayout
log4j.appender.DRFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
log4j.appender.DRFAS.DatePattern=.yyyy-MM-dd

#
# hadoop configuration logging
#

# Uncomment the following line to turn off configuration deprecation warnings.
# log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN

#
# hdfs audit logging
#
hdfs.audit.logger=INFO,NullAppender
hdfs.audit.log.maxfilesize=256MB
hdfs.audit.log.maxbackupindex=20
log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false
log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log
log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}
log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}

#
# mapred audit logging
#
mapred.audit.logger=INFO,NullAppender
mapred.audit.log.maxfilesize=256MB
mapred.audit.log.maxbackupindex=20
log4j.logger.org.apache.hadoop.mapred.AuditLogger=${mapred.audit.logger}
log4j.additivity.org.apache.hadoop.mapred.AuditLogger=false
log4j.appender.MRAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.MRAUDIT.File=${hadoop.log.dir}/mapred-audit.log
log4j.appender.MRAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.MRAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
log4j.appender.MRAUDIT.MaxFileSize=${mapred.audit.log.maxfilesize}
log4j.appender.MRAUDIT.MaxBackupIndex=${mapred.audit.log.maxbackupindex}

# Custom Logging levels

#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
#log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=DEBUG

# Jets3t library
log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR

# AWS SDK & S3A FileSystem
log4j.logger.com.amazonaws=ERROR
log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR
log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN

#
# Event Counter Appender
# Sends counts of logging messages at different severity levels to Hadoop Metrics.
#
log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter

#
# Job Summary Appender 
#
# Use following logger to send summary to separate file defined by 
# hadoop.mapreduce.jobsummary.log.file :
# hadoop.mapreduce.jobsummary.logger=INFO,JSA
# 
hadoop.mapreduce.jobsummary.logger=${hadoop.root.logger}
hadoop.mapreduce.jobsummary.log.file=hadoop-mapreduce.jobsummary.log
hadoop.mapreduce.jobsummary.log.maxfilesize=256MB
hadoop.mapreduce.jobsummary.log.maxbackupindex=20
log4j.appender.JSA=org.apache.log4j.RollingFileAppender
log4j.appender.JSA.File=${hadoop.log.dir}/${hadoop.mapreduce.jobsummary.log.file}
log4j.appender.JSA.MaxFileSize=${hadoop.mapreduce.jobsummary.log.maxfilesize}
log4j.appender.JSA.MaxBackupIndex=${hadoop.mapreduce.jobsummary.log.maxbackupindex}
log4j.appender.JSA.layout=org.apache.log4j.PatternLayout
log4j.appender.JSA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
log4j.logger.org.apache.hadoop.mapred.JobInProgress$JobSummary=${hadoop.mapreduce.jobsummary.logger}
log4j.additivity.org.apache.hadoop.mapred.JobInProgress$JobSummary=false

#
# Yarn ResourceManager Application Summary Log 
#
# Set the ResourceManager summary log filename
yarn.server.resourcemanager.appsummary.log.file=rm-appsummary.log
# Set the ResourceManager summary log level and appender
yarn.server.resourcemanager.appsummary.logger=${hadoop.root.logger}
#yarn.server.resourcemanager.appsummary.logger=INFO,RMSUMMARY

# To enable AppSummaryLogging for the RM, 
# set yarn.server.resourcemanager.appsummary.logger to 
# <LEVEL>,RMSUMMARY in hadoop-env.sh

# Appender for ResourceManager Application Summary Log
# Requires the following properties to be set
#    - hadoop.log.dir (Hadoop Log directory)
#    - yarn.server.resourcemanager.appsummary.log.file (resource manager app summary log filename)
#    - yarn.server.resourcemanager.appsummary.logger (resource manager app summary log level and appender)

log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=${yarn.server.resourcemanager.appsummary.logger}
log4j.additivity.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=false
log4j.appender.RMSUMMARY=org.apache.log4j.RollingFileAppender
log4j.appender.RMSUMMARY.File=${hadoop.log.dir}/${yarn.server.resourcemanager.appsummary.log.file}
log4j.appender.RMSUMMARY.MaxFileSize=256MB
log4j.appender.RMSUMMARY.MaxBackupIndex=20
log4j.appender.RMSUMMARY.layout=org.apache.log4j.PatternLayout
log4j.appender.RMSUMMARY.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n

# HS audit log configs
#mapreduce.hs.audit.logger=INFO,HSAUDIT
#log4j.logger.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=${mapreduce.hs.audit.logger}
#log4j.additivity.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=false
#log4j.appender.HSAUDIT=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.HSAUDIT.File=${hadoop.log.dir}/hs-audit.log
#log4j.appender.HSAUDIT.layout=org.apache.log4j.PatternLayout
#log4j.appender.HSAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
#log4j.appender.HSAUDIT.DatePattern=.yyyy-MM-dd

# Http Server Request Logs
#log4j.logger.http.requests.namenode=INFO,namenoderequestlog
#log4j.appender.namenoderequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.namenoderequestlog.Filename=${hadoop.log.dir}/jetty-namenode-yyyy_mm_dd.log
#log4j.appender.namenoderequestlog.RetainDays=3

#log4j.logger.http.requests.datanode=INFO,datanoderequestlog
#log4j.appender.datanoderequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.datanoderequestlog.Filename=${hadoop.log.dir}/jetty-datanode-yyyy_mm_dd.log
#log4j.appender.datanoderequestlog.RetainDays=3

#log4j.logger.http.requests.resourcemanager=INFO,resourcemanagerrequestlog
#log4j.appender.resourcemanagerrequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.resourcemanagerrequestlog.Filename=${hadoop.log.dir}/jetty-resourcemanager-yyyy_mm_dd.log
#log4j.appender.resourcemanagerrequestlog.RetainDays=3

#log4j.logger.http.requests.jobhistory=INFO,jobhistoryrequestlog
#log4j.appender.jobhistoryrequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.jobhistoryrequestlog.Filename=${hadoop.log.dir}/jetty-jobhistory-yyyy_mm_dd.log
#log4j.appender.jobhistoryrequestlog.RetainDays=3

#log4j.logger.http.requests.nodemanager=INFO,nodemanagerrequestlog
#log4j.appender.nodemanagerrequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.nodemanagerrequestlog.Filename=${hadoop.log.dir}/jetty-nodemanager-yyyy_mm_dd.log
#log4j.appender.nodemanagerrequestlog.RetainDays=3

十八、业务 ETL 实现

18.1、功能

  • 过滤内容:过滤无效数据,比如缺少 uuid,缺少会话 ip,订单事件中缺少订单 id。
  • 补全内容:IP 地址信息补全地域信息(国家、省份、城市等)、浏览器相关信息补全,服务器时间补全等等。

18.2、数据

18.2.1、上传方式

  • Flume: 在Flume 工作正常的情况下,所有的日志均由 Flume 上传写入。(详见第13.4章节)
  • Shell 手动:当 Flume 进程出现异常,需要手动执行脚本的上传。(详见第十五章节)

18.2.2、流程

  • 使用 MapReduce 通过 TextInputFormat 的方式将 HDFS 中的数据读取到 map 中,最终通过 TableOutputFormat 到 HBase 中。

18.2.3、细节分析

日志解析   日志存储于 HDFS 中,一行一条日志,解析出操作行为中具体的 key-value 值,然后进行解码操作。

IP地址解析/补全

浏览器信息解析

HBase rowkey 设计 注意规则:尽可能的短小,占用内存少,尽可能的均匀分布。(即散列)

HBase 表的创建   使用 Java API 创建。

18.3、代码实现

关键类:   LoggerUtil.java 示例代码如下:

package com.z.transformer.util;

import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

import com.z.transformer.common.EventLogConstants;
import com.z.transformer.util.IPSeekerExt.RegionInfo;

import cz.mallat.uasparser.UserAgentInfo;

public class LoggerUtil {

    // 日志输出提示
    private static final Logger logger = Logger.getLogger(LoggerUtil.class);

    /**
     * 解析给定的日志行,如果解析成功返回一个有值的 map 集合,如果解析失败,返回一个 empty 集合
     * 
     * @param logText
     * @return
     */
    public static Map<String, String> handleLogText(String logText) {
        Map<String, String> result = new HashMap<String, String>();
        // 1、开始解析
        // hadoop 集群中默认只有 org.apache.commons.lang.StringUtils 所在的 jar 包,如果使用其他
        // StringUtils,hadoop 集群中需要导入该 StringUtils 依赖的 jar 包方可使用
        if (StringUtils.isNotBlank(logText)) {
            // 日志行非空,可以进行解析
            String[] splits = logText.trim().split(EventLogConstants.LOG_SEPARTIOR); // 日志分隔符
                                                                                        // ^A
            // 192.168.25.102^A1555318954.798^A/what.png?u_nu=1&u_sd=6D4F89C0-E17B-45D0-BFE0-059644C1878D&c_time=......
            if (splits.length == 3) {
                // 日志格式是正确的,进行解析
                String ip = splits[0].trim();
                // 将 ip 地址封装进 Map 集合中
                result.put(EventLogConstants.LOG_COLUMN_NAME_IP, ip);
                long serverTime = TimeUtil.parseNginxServerTime2Long(splits[1].trim());
                if (serverTime != -1L) {
                    // 表示服务器时间解析正确,而且 serverTime 就是对于的毫秒级的时间戳
                    // 将 serverTime 封装进 Map 集合中
                    result.put(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, String.valueOf(serverTime));
                }

                // 获取请求体
                String requestBody = splits[2].trim();
                int index = requestBody.indexOf("?"); // ? 符号所在的索引位置
                if (index >= 0 && index != requestBody.length() - 1) {
                    // 在请求参数中存在 ?,而且 ? 不是最后一个字符的情况,则截取?后面的内容
                    requestBody = requestBody.substring(index + 1);
                } else {
                    requestBody = null;
                }

                if (StringUtils.isNotBlank(requestBody)) {
                    // 非空,开始处理请求参数
                    handleRequestBody(result, requestBody);

                    // 开始补全 ip 地址
                    RegionInfo info = IPSeekerExt.getInstance().analysisIp(result.get(EventLogConstants.LOG_COLUMN_NAME_IP)); // 用户ip地址
                    if (info != null) {
                        result.put(EventLogConstants.LOG_COLUMN_NAME_COUNTRY, info.getCountry()); // 国家
                        result.put(EventLogConstants.LOG_COLUMN_NAME_PROVINCE, info.getProvince()); // 省份
                        result.put(EventLogConstants.LOG_COLUMN_NAME_CITY, info.getCity()); // 城市
                    }

                    // 开始补全浏览器信息
                    UserAgentInfo uaInfo = UserAgentUtil.analyticUserAgent(result.get(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT)); // 浏览器user agent参数
                    if (uaInfo != null) {
                        // 浏览器名称
                        result.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, uaInfo.getUaFamily()); // 浏览器名称
                        // 浏览器版本号
                        result.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, uaInfo.getBrowserVersionInfo()); // 浏览器版本
                        // 浏览器所在操作系统
                        result.put(EventLogConstants.LOG_COLUMN_NAME_OS_NAME, uaInfo.getOsFamily()); // 操作系统名称
                        // 浏览器所在操作系统的版本
                        result.put(EventLogConstants.LOG_COLUMN_NAME_OS_VERSION, uaInfo.getOsName()); // 操作系统版本
                    }

                } else {
                    // logger
                    logger.debug("请求参数为空:" + logText);
                    result.clear(); // 清空
                }
            } else {
                // log记录一下
                logger.debug("日志行内容格式不正确:" + logText);
            }
        } else {
            logger.debug("日志行内容为空,无法进行解析:" + logText);
        }
        return result;
    }

    /**
     * 处理请求参数<br/>
     * 处理结果保存到参数 result 集合(Map 集合)
     * 
     * @param clientInfo
     *            保存最终用户行为数据的 map 集合
     * @param requestBody
     *            请求参数中,用户行为数据,格式为:
     *            u_nu=1&u_sd=6D4F89C0-E17B-45D0-BFE0-059644C1878D&c_time=
     *            1450569596991&ver=1&en=e_l&pl=website&sdk=js&b_rst=1440*900&
     *            u_ud=4B16B8BB-D6AA-4118-87F8-C58680D22657&b_iev=Mozilla%2F5.0%
     *            20(Windows%20NT%205.1)%20AppleWebKit%2F537.36%20(KHTML%2C%
     *            20like%20Gecko)%20Chrome%2F45.0.2454.101%20Safari%2F537.36&l=
     *            zh-CN&bf_sid=33cbf257-3b11-4abd-ac70-c5fc47afb797_11177014
     */
    private static void handleRequestBody(Map<String, String> clientInfo, String requestBody) {
        // 将请求参数体按照 & 切割
        String[] parameters = requestBody.split("&");
        for (String parameter : parameters) {
            // 循环处理参数,parameter 格式为: c_time=1450569596991  = 只会出现一次
            String[] params = parameter.split("=");
            String key, value = null;
            try {
                // 使用 utf-8 解码
                key = URLDecoder.decode(params[0].trim(), "utf-8");
                value = URLDecoder.decode(params[1].trim(), "utf-8");
                // 添加到结果集合  Map 中
                clientInfo.put(key, value);
            } catch (Exception e) {
                logger.warn("解码失败:" + parameter, e);
            }
        }
    }
}

18.3.1、日志解析

18.3.2、IP地址解析/补全

使用淘宝接口解析IP地址   官网:http://ip.taobao.com/   示例:REST API:http://ip.taobao.com/service/getIpInfo.php?ip=123.125.71.38   限制:10QPS(Query Per Second)

使用第三方 IP 库   通过文件中已经存放的 IP 和地区的映射进行 IP 解析,由于更新不及时,可能会导致某些 IP 解析不正确(小概率事件)。(推荐使用:纯真IP地址数据库)

使用自己的 IP 库   通过第三方的 IP 库,逐渐生成自己的 IP 库,自主管理。

IP 库表设计   startip(起始ip)   endip(结束ip)   country(国家)   province(省份)   city(城市)

尖叫提示:判断某个 IP 是否在某个地域的起始 IP 和结束 IP 区间。

IP 与 long 的互转的工具类: 示例代码如下:

    // 将 127.0.0.1 形式的 IP 地址转换成十进制整数
    public long IpToLong(String strIp){
        long[] ip = new long[4];
        int position1 = strIp.indexOf(".");
        int position2 = strIp.indexOf(".", position1 + 1);
        int position3 = strIp.indexOf(".", position2 + 1);
        // 将每个.之间的字符串转换成整型  
        ip[0] = Long.parseLong(strIp.substring(0, position1));
        ip[1] = Long.parseLong(strIp.substring(position1 + 1, position2 - position1 - 1));
        ip[2] = Long.parseLong(strIp.substring(position2 + 1, position3 - position2 - 1));
        ip[3] = Long.parseLong(strIp.substring(position3 + 1));
        // 进行左移位处理
        return (ip[0] << 24) + (ip[1] << 16) + (ip[2] << 8) + ip[3];
    }

    // 将十进制整数形式转换成 127.0.0.1 形式的 ip 地址
    public String LongToIp(long ip) {
        StringBuilder sb = new StringBuilder();
        // 直接右移 24 位
        sb.append(ip >> 24);
        sb.append(".");
        // 将高 8 位置 0,然后右移 16
        sb.append((ip & 0x00FFFFFF) >> 16);
        sb.append(".");
        // 将高 16 位置0 ,然后右移 8 位
        sb.append((ip & 0x0000FFFF) >> 8);
        sb.append(".");
        // 将高 24 位置 0
        sb.append((ip & 0x000000FF));
        return sb.toString();
    }

18.3.3、浏览器信息解析

  • 依赖查询:http://mvnrepository.com/
  • 依赖工具:uasparser 第三方浏览器信息解析工具

18.3.4、ETL代码编写

新建类:   AnalysisDataMapper.java   AnalysisDataRunner.java   目标:读取 HDFS 中的数据,清洗后写入到 HBase 中。

核心思路梳理:

  • Step1、创建 AnalysisDataMapper 类,复写 map 方法。
  • Step2、在 map 方法中通过 LoggerUtil.handleLogText 方法将当前行数据解析成 Map<String, String> 集合 clientInfo。
  • Step3、获取当前行日志信息的事件类型,并根据获取到的事件类型去枚举类型中匹配生成 EventEnum 对象,如果没有匹配到对应的事件类型,则返回 null。
  • Step4、判断如果无法处理给定的事件类型,则使用 log4j 输出。
  • Step5、如果可以处理指定事件类型,则开始处理事件,创建 handleEventData(Map<String, String> clientInfo, EventEnum event, Context context, Text value) 方法处理事件。
  • Step6、在 handleEventData 方法中,我们需要过滤掉那些数据不合法的 Event 事件,通过 filterEventData(Map<String, String> clientInfo, EventEnum event) 方法过滤。 过滤规则:如果是 java_server 过来的数据,则会员 id 必须存在,如果是 website 过来的数据,则会话 id 和用户 id 必须存在。
  • Step7、如果没有通过过滤,则通过日志输出当前数据,如果通过过滤,则开始准备输出数据,创建方法 outPutData(Map<String, String> clientInfo, Context context)
  • Step8、outputData 方法中,我们可以删除一些无用的数据,比如浏览器信息的原始数据(因为已经解析过了)。同时需要创建一个生成 rowKey 的方法 generateRowKey(String uuid, long serverTime, Map<String, String> clientInfo),通过该方法生成的 rowKey 之后,添加内容到 HBase 表中。
  • Step9、generateRowKey 方法主要用于 rowKey 的生成,通过拼接:时间+uuid的crc32编码+数据内容的hash码的crc32编码,作为 rowKey,一共 12 个字节。

示例代码如下: AnalysisDataMapper.java

package com.z.transformer.mr.etl;

import java.io.IOException;
import java.util.Map;
import java.util.zip.CRC32;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.EventLogConstants.EventEnum;
import com.z.transformer.util.LoggerUtil;
import com.z.transformer.util.TimeUtil;

public class AnalysisDataMapper extends Mapper<Object, Text, NullWritable, Put> {
    // Object 是偏移量,Text 表示输入,NullWritable, Put 可以互换

    // 如果无法处理给定的事件类型,则使用 log4j 输出, Logger 可以在运行 jar 包的控制台输出
    private static final Logger logger = Logger.getLogger(AnalysisDataMapper.class);

    private CRC32 crc1 = null;
    private CRC32 crc2 = null;
    private byte[] family = null;
    private long currentDayInMills = -1;

    /**
     * 初始化数据
     */
    @Override
    protected void setup(Mapper<Object, Text, NullWritable, Put>.Context context)
            throws IOException, InterruptedException {
        crc1 = new CRC32();
        crc2 = new CRC32();
        this.family = EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME;
        currentDayInMills = TimeUtil.getTodayInMillis();
    }

    // 1、覆写 map 方法
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // 2、将原始数据通过 LoggerUtil 解析成 Map 键值对
        Map<String, String> clientInfo = LoggerUtil.handleLogText(value.toString());

        // 2.1、如果解析失败,则 Map 集合中无数据,通过日志输出当前数据
        if (clientInfo.isEmpty()) {
            logger.debug("日志解析失败:" + value.toString());
            return;
        }

        // 3、根据解析后的数据,生成对应的 Event 事件类型(通过枚举类型的别名来解析)
        EventEnum event = EventEnum.valueOfAlias(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME));
        if (event == null) {
            // 4、无法处理的事件,直接输出事件类型
            logger.debug("无法匹配对应的事件类型:" + clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME));
        } else {
            // 5、处理具体的事件
            handleEventData(clientInfo, event, context, value);
            // clientInfo 数据集, event 事件类型, context 上下文(通过上下文写入到HBase), value 当前行的数据(可能会有新的过滤操作)
        }
    }

    /**
     * 处理具体的事件的方法
     * 
     * @param clientInfo
     * @param event
     * @param context
     * @param value
     * @throws InterruptedException
     * @throws IOException
     */
    public void handleEventData(Map<String, String> clientInfo, EventEnum event, Context context, Text value)
            throws IOException, InterruptedException {
        // 6、如果事件成功通过过滤,则准备处理具体事件
        if (filterEventData(clientInfo, event)) {
            outPutData(clientInfo, context);
        } else {
            // 如果事件没有通过过滤,则通过日志输出当前数据
            logger.debug("事件格式不正确:" + value.toString());
        }
    }

    /**
     * 6、如果事件成功通过过滤,则准备处理具体事件(我们的 HBase 只存成功通过过滤的事件)
     * 
     * @param clientInfo
     * @param event
     * @return
     */
    public boolean filterEventData(Map<String, String> clientInfo, EventEnum event) {
        // 事件数据全局过滤(具体全局过滤条件视情况而定,这里的 “服务器时间” 和 “平台” 是例子)
        boolean result = StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME))
                && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_PLATFORM));
        // 后面几乎全部是&&操作,只要有一个 false,那么该 Event 事件就无法处理

        // public static final String PC_WEBSITE_SDK = "website";
        // public static final String JAVA_SERVER_SDK = "java_server";

        // 先确定平台
        switch (clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)) {
        // Java Server 平台发来的数据
        case EventLogConstants.PlatformNameConstants.JAVA_SERVER_SDK:
            result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID)); // 先判断会员 ID 是否存在
            // 再确定事件
            switch (event) {
            case CHARGEREFUND:
                // 退款事件
                // ......
                break;
            case CHARGESUCCESS:
                // 订单支付成功事件
                result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_ID));
                break;
            default:
                logger.debug("无法处理指定事件:" + clientInfo);
                result = false;
                break;
            }
            break;

        // WebSite 平台发来的数据
        case EventLogConstants.PlatformNameConstants.PC_WEBSITE_SDK:
            // 再确定事件
            switch (event) {
            case CHARGEREQUEST:
                // 下单事件
                result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_ID))
                        && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_CURRENCY_TYPE))
                        && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_PAYMENT_TYPE))
                        && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_CURRENCY_AMOUNT));
                break;
            case EVENT:
                // Event 事件
                result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_CATEGORY))
                        && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_ACTION));
                break;
            case LAUNCH:
                // Launch 访问事件
                // ......
                break;
            case PAGEVIEW:
                // PV 事件
                result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_CURRENT_URL));
                break;
            default:
                logger.debug("无法处理指定事件:" + clientInfo);
                result = false;
                break;
            }
            break;

        default:
            result = false;
            logger.debug("无法确定的数据来源:" + clientInfo);
            break;
        }

        return result;
    }

    /**
     * 7 和 8、如果事件成功通过过滤,则输出事件到 HBase 的方法
     * 
     * @param clientInfo
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    public void outPutData(Map<String, String> clientInfo, Context context) throws IOException, InterruptedException {
        String uuid = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID);
        long serverTime = Long.valueOf(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME));

        // 因为浏览器信息已经解析完成,所以此时删除原始的浏览器信息
        clientInfo.remove(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT);

        // 创建 rowKey
        byte[] rowkey = generateRowKey(uuid, serverTime, clientInfo);
        Put put = new Put(rowkey);

        for (Map.Entry<String, String> entry : clientInfo.entrySet()) {
            if (StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue())) {
                put.addColumn(family, Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
            }
        }

        context.write(NullWritable.get(), put);
    }

    /**
     * 9、为向 HBase 中写入数据依赖 Put 对象,Put 对象的创建依赖 RowKey,所以如下方法
     * 
     * rowKey=时间+uuid的crc32编码+数据内容的hash码的crc32编码
     * 
     * @return
     */
    public byte[] generateRowKey(String uuid, long serverTime, Map<String, String> clientInfo) {
        // 先清空 crc1 和  crc2 集合中的数据内容
        crc1.reset();
        crc2.reset();

        // 时间=当前数据访问服务器的时间-当天00:00点的时间戳 ,得到最大值是8位数字=3600*24*1000=86400000 ,可以用int来存储,大小是 4个字节
        byte[] timeBytes = Bytes.toBytes(serverTime - this.currentDayInMills);

        // uuid 的 crc 编码
        if (StringUtils.isNotBlank(uuid)) {
            this.crc1.update(Bytes.toBytes(uuid));
        }
        byte[] uuidBytes = Bytes.toBytes(this.crc1.getValue());

        // 数据内容的 hash 码的 crc 编码
        this.crc2.update(Bytes.toBytes(clientInfo.hashCode()));
        byte[] clientInfoBytes = Bytes.toBytes(this.crc2.getValue());

        // 综合字节数组
        byte[] buffer = new byte[timeBytes.length + uuidBytes.length + clientInfoBytes.length];
        // 数组合并
        System.arraycopy(timeBytes, 0, buffer, 0, timeBytes.length);
        System.arraycopy(uuidBytes, 0, buffer, timeBytes.length, uuidBytes.length);
        System.arraycopy(clientInfoBytes, 0, buffer, uuidBytes.length, clientInfoBytes.length);

        return buffer;
    }
}

AnalysisDataRunner.java

package com.z.transformer.mr.etl;

import java.io.File;
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.GlobalConstants;
import com.z.transformer.util.TimeUtil;

public class AnalysisDataRunner implements Tool {
    private Configuration conf = null;

    public static void main(String[] args) {
        try {
            int resultCode = ToolRunner.run(new AnalysisDataRunner(), args);
            if (resultCode == 0) {
                System.out.println("Success!");
            } else {
                System.out.println("Fail!");
            }
            System.exit(resultCode);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void setConf(Configuration conf) {
        // 先实例化 Configuration
        this.conf = HBaseConfiguration.create(conf);
    }

    @Override
    public Configuration getConf() {
        // 全局的访问方法
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        // 处理传入的时间参数,默认或不合法时间则直接使用昨天日期
        this.processArgs(conf, args);

        // 开始创建 Job
        Job job = Job.getInstance(conf, "Event-ETL");

        // 设置 Job 参数
        job.setJarByClass(AnalysisDataRunner.class);

        // Mapper 参数设置
        job.setMapperClass(AnalysisDataMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Reducer 参数设置
        job.setNumReduceTasks(0);

        // 设置数据输入
        initJobInputPath(job);

        // 设置输出到 HBase 的信息
        initHBaseOutPutConfig(job);
        // job.setJar("target/transformer-0.0.1-SNAPSHOT.jar");

        // Job 提交
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * 初始化 Job 数据输入目录
     * 
     * @param job
     * @throws IOException
     */
    private void initJobInputPath(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        // 获取要执行ETL操作的那一天的数据
        String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES); // 2017-08-14
        // 格式化 HDFS 文件路径
        String hdfsPath = TimeUtil.parseLong2String(TimeUtil.parseString2Long(date), "yyyy/MM/dd");// 2017/08/14

        if (GlobalConstants.HDFS_LOGS_PATH_PREFIX.endsWith("/")) {
            hdfsPath = GlobalConstants.HDFS_LOGS_PATH_PREFIX + hdfsPath; // /event-logs/2017/08/14
        } else {
            hdfsPath = GlobalConstants.HDFS_LOGS_PATH_PREFIX + File.separator + hdfsPath; // /event-logs/2017/08/14
            // File.separator 的作用是:根据当前操作系统获取对应的文件分隔符,windows中是 \ ,Linux中是 /
        }

        FileSystem fs = FileSystem.get(conf);
        Path inPath = new Path(hdfsPath);

        if (fs.exists(inPath)) {
            FileInputFormat.addInputPath(job, inPath);
        } else {
            throw new RuntimeException("HDFS 中该文件目录不存在:" + hdfsPath);
        }
    }

    /**
     * 设置输出到 HBase 的一些操作选项
     * 
     * @throws IOException
     */
    private void initHBaseOutPutConfig(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        // 获取要执行ETL操作的那一天的数据
        String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES); // 2017-08-14
        // 格式化 HBase 表的后缀名
        String tableNameSuffix = TimeUtil.parseLong2String(TimeUtil.parseString2Long(date), TimeUtil.HBASE_TABLE_NAME_SUFFIX_FORMAT); // 20170814
        // 构建表名
        String tableName = EventLogConstants.HBASE_NAME_EVENT_LOGS + tableNameSuffix; // event_logs20170814

        // 指定输出(初始化 ReducerJob)
        TableMapReduceUtil.initTableReducerJob(tableName, null, job);

        Connection conn = null;
        Admin admin = null;

        // 使用 HBase 的新 API
        conn = ConnectionFactory.createConnection(conf);
        admin = conn.getAdmin();

        // 创建表描述器(即通过表名实例化表描述器)
        TableName tn = TableName.valueOf(tableName);
        HTableDescriptor htd = new HTableDescriptor(tn);

        // 设置列族
        htd.addFamily(new HColumnDescriptor(EventLogConstants.EVENT_LOGS_FAMILY_NAME));
        // 判断表是否存在
        if (admin.tableExists(tn)) {
            // 存在,则删除
            if (admin.isTableEnabled(tn)) {
                // 先将表设置为不可用
                admin.disableTable(tn);
            }
            // 再删除表
            admin.deleteTable(tn);
        }

        // 创建表,在创建的过程中可以考虑预分区操作
        // 假设预分区为 3个分区
        // byte[][] keySplits = new byte[3][];
        // keySplits[0] = Bytes.toBytes("1"); // (-∞, 1]
        // keySplits[1] = Bytes.toBytes("2"); // (1, 2]
        // keySplits[2] = Bytes.toBytes("3"); // (2, ∞]
        // admin.createTable(htd, keySplits);

        admin.createTable(htd);
        admin.close();
    }

    /**
     * 处理时间参数,如果没有传递参数的话,则默认清洗前一天的。
     * 
     * Job脚本如下: bin/yarn jar ETL.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2017-08-14
     * 
     * @param args
     */
    private void processArgs(Configuration conf, String[] args) {
        String date = null;
        for (int i = 0; i < args.length; i++) {
            if ("-date".equals(args[i])) { // 找到 "-date" 标记
                date = args[i + 1]; // 获取时间
                break;
            }
        }

        if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
            // 如果没有传递参数,默认清洗昨天的数据然后存储到 HBase 中
            date = TimeUtil.getYesterday();
        }
        // 将要清洗的目标时间字符串保存到 conf 对象中(这样全局中就可以引用)
        conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
    }
}

18.4、测试

18.4.1、上传测试数据

$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir -p /event-logs/2015/12/20
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put /opt/software/20151220.log /event-logs/2015/12/20

18.4.2、打包集群运行

方案一:   修改 etc/hadoop/hadoop-env.sh 中的 HADOOP_CLASSPATH 配置信息。 例如:

    export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*

方案二:   使用 maven 插件:maven-shade-plugin,将第三方依赖的 jar 全部打包进去,需要在 pom.xml 中配置依赖。参考【章节 十七、工具代码导入】中的 pom.xml 文件。 参数设置:

    1、-P local clean package(不打包第三方jar)
    2、-P dev clean package install(打包第三方jar)(推荐使用这种)

打包成功后,将打好的 jar 包上传至 Linux 上,然后执行命令,如下:

/opt/module/hadoop-2.7.2/bin/yarn jar /opt/software/transformer-0.0.1-SNAPSHOT.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2015-12-20

测试成功!截图如下: 1、控制台

2、HBase 网页端:http://hadoop102:16010/master-status

3、历史服务器:http://hadoop102:19888/jobhistory/attempts/job_1555404378493_0005/m/SUCCESSFUL

尖叫提示:如果在打包的过程中 org.apache.maven.plugins 其中没有包含所依赖的 jar 包,则需要在 HADOOP_CLASSPATH 添加所依赖的 jar 文件。 例如:编写代码依赖了 HBase,但是打包 MR 任务的时候,没有 include HBase 的相关 jar,则需要在命令行中执行如下命令:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*

在执行代码之前,我们先手动删除 hbase 上的表和命名空间,命令如下:

hbase(main):002:0> disable 'event_logs20151220'
hbase(main):003:0> drop 'event_logs20151220'
hbase(main):005:0> drop_namespace 'ns_ct'

问题:当我们查看历史服务器中的 Logs 日志时,发现一个解码失败异常:java.lang.IllegalArgumentException: URLDecoder: Incomplete trailing escape (%) pattern,如下图所示:

解决问题链接:https://www.cnblogs.com/chenmingjun/p/10719587.html

十九、创建数据库表

19.1、使用 Navicat 工具

前提:需要在 Linux 中对 Mysql 的访问授权。

grant all on *.* to root@'%' identified by '123456';
flush privileges;
exit;

19.2、通过 SQL 文件构建表

  参考链接:https://www.cnblogs.com/chenmingjun/p/10185797.html

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券