discuz论坛apache日志hadoop大数据分析项目:hive以及hbase是如何入库以及代码实现

about云discuz论坛apache日志hadoop大数据分析项目: 数据时如何导入hbase与hive的到了这里项目的基本核心功能已经完成。这里介绍一下hive以及hbase是如何入库以及代码实现。 首先我们将hbase与hive整合,详细参考 about云分析discuz论坛apache日志hadoop大数据项目:hive与hbase是如何整合使用的 about云分析discuz论坛apache日志hadoop大数据项目:hive与hbase是如何整合使用的 整合完毕,我们就可以通过mapreduce把数据导入hbase,当然在导入hbase的同时,hive数据同时也可以查询出结果。 那么我们是如何导入hbase的,思路前面已经介绍,这里采用的是hbase put。以后的版本中,我们将采用多种方法来实现此功能包括hive分区、hbase后面如果遇到问题,我们可能还会重构。 开发环境介绍: 1.Eclipse 2.Hadoop2.2 3.hbase-0.98.3-hadoop2 思路: 在导入hbase的过程中,我们直接使用了mapreduce中的map函数,reduce在这里对我们没有太大的用处,我们这里借助的是mapreduce的分布式,提高查询效率。 mapreduce中map函数主要实现了哪些功能 1.清洗数据 通过

  1. public static void StringResolves(String line, Context context)

函数实现 2.数据的导入 通过public static void addData(String rowKey, String tableName, String[] column1, String[] value1, Context context) 函数实现

下面贴上代码: HbaseMain.java代码

package www.aboutyun.com;

import java.io.IOException;



import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class HbaseMain {



       static final String INPUT_PATH = "hdfs://master:8020/test.txt";

       static final String OUT_PATH = "hdfs://master:8020/Output";



       public static void main(String[] args) throws IOException,

                       InterruptedException, ClassNotFoundException {



               // 主类

               Configuration conf = new Configuration();

               Job job = Job.getInstance(conf, HbaseMain.class.getSimpleName());

               job.setJarByClass(HbaseMain.class);

               // 寻找输入

               FileInputFormat.setInputPaths(job, INPUT_PATH);

               // 1.2对输入数据进行格式化处理的类

               job.setInputFormatClass(TextInputFormat.class);

               job.setMapperClass(HbaseMap.class);

               // 1.2指定map输出类型<key,value>类型

               job.setMapOutputKeyClass(Text.class);

               job.setMapOutputValueClass(LongWritable.class);

               job.setNumReduceTasks(0);

               // 指定输出路径

               FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

 

               job.waitForCompletion(true);



       }

}


HbaseMap.java代码

package www.aboutyun.com;



import java.io.IOException;

import java.text.DateFormat;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Locale;

import java.util.Random;



import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.HColumnDescriptor;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Mapper.Context;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;



public class HbaseMap extends Mapper<LongWritable, Text, Text, IntWritable> {

       private static Configuration conf = null;

       /**

        * 初始化配置

        */



       static {

               conf = HBaseConfiguration.create();

               conf.set("hbase.zookeeper.quorum", "master");// 使用eclipse时必须添加这个,否则无法定位

               conf.set("hbase.zookeeper.property.clientPort", "2181");

       }



       /**************************************************************************/

       public void map(LongWritable key, Text line, Context context)

                       throws IOException, InterruptedException {



               try {

                       StringResolves(line.toString(), context);

               } catch (ParseException e) {

                       // TODO Auto-generated catch block

                       e.printStackTrace();

               }



       }



       /**************************************************************************/

       // 字符串解析



       public static void StringResolves(String line, Context context)

                       throws ParseException {

               String ipField, dateField, urlField, browserField;



               // 获取ip地址

               ipField = line.split("- -")[0].trim();



               // 获取时间,并转换格式

               int getTimeFirst = line.indexOf("[");

               int getTimeLast = line.indexOf("]");

               String time = line.substring(getTimeFirst + 1, getTimeLast).trim();

               Date dt = null;

               DateFormat df1 = DateFormat.getDateTimeInstance(DateFormat.LONG,

                               DateFormat.LONG);

               dt = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US)

                               .parse(time);

               dateField = df1.format(dt);

               SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHMM");

               String dateField1 = sdf.format(dt);

               // 获取url

               String[] getUrl = line.split("\"");



               String firtGeturl = getUrl[1].substring(3).trim();



               String secondGeturl = getUrl[3].trim();

               urlField = firtGeturl + "分隔符" + secondGeturl;



               // 获取浏览器

               String[] getBrowse = line.split("\"");

               String strBrowse = getBrowse[5].toString();

               String str = "(KHTML, like Gecko)";

               int i = strBrowse.indexOf(str);

               strBrowse = strBrowse.substring(i);

               String strBrowse1[] = strBrowse.split("\\/");

               strBrowse = strBrowse1[0].toString();

               String strBrowse2[] = strBrowse.split("\\)");

               browserField = strBrowse2[1].trim();



               // 添加到数据库



               String rowKey = ipField + dateField1 + urlField

                               + new Random().nextInt();

               String[] cols = new String[] { "IpAddress", "AccressTime", "Url",

                               "UserBrowser", };

               String[] colsValue = new String[] { ipField, dateField, urlField,

                               browserField };



               try {

                       addData(rowKey, "LogTable", cols, colsValue, context);

                       context.write(new Text("1"), new IntWritable(1));



               } catch (IOException | InterruptedException e) {

                       // TODO Auto-generated catch block

                       e.printStackTrace();

               }

       }



       /*

        * 为表添加数据(适合知道有多少列族的固定表)

        * 

        * @rowKey rowKey

        * 

        * @tableName 表名

        * 

        * @column1 第一个列族列表

        * 

        * @value1 第一个列的值的列表

        */

       public static void addData(String rowKey, String tableName,

                       String[] column1, String[] value1, Context context)

                       throws IOException {



               Put put = new Put(Bytes.toBytes(rowKey));// 设置rowkey

               HTable table = new HTable(conf, Bytes.toBytes(tableName));// HTabel负责跟记录相关的操作如增删改查等//

                                                                                                                                       // 获取表

               HColumnDescriptor[] columnFamilies = table.getTableDescriptor() // 获取所有的列族

                               .getColumnFamilies();



               for (int i = 0; i < columnFamilies.length; i++) {

                       String familyName = columnFamilies[i].getNameAsString(); // 获取列族名

                       if (familyName.equals("Info")) { // info列族put数据

                               for (int j = 0; j < column1.length; j++) {

                                       put.add(Bytes.toBytes(familyName),

                                                       Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j]));

                               }

                       }



               }

               table.put(put);

               // context.write(new Text(rowKey), null);

               System.out.println("add data Success!");

       }



}




后面我们将会不断完善此功能。

上面的一些准备工作,就不要说了,这里展现一下运行后的效果:
hive效果图

Hbase效果图

这样就达到了效果。后面我们使用hive统计,然后通过将统计结果展示,项目基本完成,后面就不断完善即可。

原文发布于微信公众号 - about云(wwwaboutyuncom)

原文发表时间:2014-08-10

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏编码小白

tomcat请求处理分析(六)servlet的处理过程

1.1.1.1  servlet的解析过程 servlet的解析分为两步实现,第一个是匹配到对应的Wrapper,第二个是加载对应的servlet并进行数据,这...

74570
来自专栏日常分享

Java 实现一个带提醒的定时器

定时闹钟预览版EXE下载链接:https://files.cnblogs.com/files/rekent/ReadytoRelax_jar.zip

30310
来自专栏Android 开发学习

JsBridge 源码分析

19430
来自专栏码农分享

4.1、苏宁百万级商品爬取 代码讲解 索引建立

Lucene是一款高性能的、可扩展的信息检索(IR)工具库。信息检索是指文档搜索、文档内信息搜索或者文档相关的元数据搜索等操作。

15630
来自专栏JackieZheng

Spring读书笔记——bean加载

我们的日常开发几乎离不开Spring,他为我们的开发带来了很大的便捷,那么Spring框架是如何做到方便他人的呢。今天就来说说bean如何被加载加载。 我们在x...

22290
来自专栏小灰灰

Batik渲染png图片异常的bug修复全程记录

batik是apache的一个开源项目,可以实现svg的渲染,后端借助它可以比较简单的实现图片渲染,当然和java一贯处理图片不太方便一样,使用起来也有不少坑

26270
来自专栏分布式系统进阶

Librdkafka的操作处理队列

31120
来自专栏Flutter入门

Weex是如何在Android客户端上跑起来的

Weex可以通过自己设计的DSL,书写.we文件或者.vue文件来开发界面,整个页面书写分成了3段,template、style、script,借鉴了成熟的MV...

49850
来自专栏码农阿宇

JustMock .NET单元测试利器(三)用JustMock测试你的应用程序

用JustMock测试你的应用程序 本主题将指导您通过几个简单的步骤来使用Telerik®JustMock轻松测试您的应用程序。您将理解一个简单的原理,称为Ar...

38170
来自专栏大内老A

在ASP.NET MVC中如何应用多个相同类型的ValidationAttribute?

ASP.NET MVC采用System.ComponentModel.DataAnnotations提供的元数据验证机制对Model实施验证,我们可以在Mode...

21750

扫码关注云+社区

领取腾讯云代金券