Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >hadoop读写hdfs和操作hbase,把hbase内容按group by排序

hadoop读写hdfs和操作hbase,把hbase内容按group by排序

作者头像
用户4415180
发布于 2022-06-23 06:12:40
发布于 2022-06-23 06:12:40
50000
代码可运行
举报
文章被收录于专栏:高并发高并发
运行总次数:0
代码可运行
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package org.ucas.hbase;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

public class Hw1Grp2 {
	
	//hbase 表名
	private static final String TABLE_NAME = "Result";
       //列簇名
	private static final String COLMUN_FAMILY = "res";
	private HTable table;
    public HTable getTable() {
		return table;
	}
	public void setTable(HTable table) {
		this.table = table;
	}
	
        public BufferedReader readHdfs(String file) throws IOException, URISyntaxException{
		
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(file), conf);
		Path path = new Path(file);
		FSDataInputStream inStream = fs.open(path);
		BufferedReader in = new BufferedReader(new InputStreamReader(inStream));
		return in;
	}

	public HTable createTable(String tableName) throws MasterNotRunningException,
              ZooKeeperConnectionException, IOException{
            Configuration configuration = HBaseConfiguration.create();
            HBaseAdmin hAdmin = new HBaseAdmin(configuration);
            if(hAdmin.tableExists(tableName)) {
               System.out.println("table is exists, delete exists table");
               hAdmin.disableTable(tableName);
               hAdmin.deleteTable(tableName);
            } else {
               System.out.println("table not exists");
            }
            HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
            HColumnDescriptor cf = new HColumnDescriptor(COLMUN_FAMILY);
            htd.addFamily(cf);
            hAdmin.createTable(htd);
            hAdmin.close();
            System.out.println("table create");
            return new HTable(configuration,tableName);
       }
       public void insert(String rowKey, String family, String qualifier, String value) throws IOException {
		Put put = new Put(rowKey.getBytes());
		put.add(family.getBytes(),qualifier.getBytes(),value.getBytes());
		table.put(put);
       }
	public void handleData(String file, int rowKey, Map<String, Integer> args) throws IOException, URISyntaxException {
		   String colStr = null;
		   BufferedReader buffer = readHdfs(file);
		   
		   //rowKey和count哈希表
		   Map<String, Integer> mapCount = new HashMap<String, Integer>();
		   
		   //rowKey 的某列sum哈希表
		   Map<String, Integer> mapSum = new HashMap<String, Integer>();
		   
		   //max哈希表
		   Map<String, Integer> mapMax = new HashMap<String, Integer>();
		   
		   //avg哈希表
		   Map<String, Float> mapAvg = new HashMap<String, Float>();
		   
		   //min哈希表
		   Map<String, Integer> mapMin = new HashMap<String, Integer>();
		   int maxCol = -1, avgCol = -1, sumCol = -1, minCol = -1, countCol = -1;
		   
		   //根据传进来的参数设置需要进行的聚合函数
		   if(args.containsKey("count")) {
			   countCol = args.get("count");
		   }
		   if(args.containsKey("avg")) {
			   avgCol = args.get("avg");
		   }
		   if(args.containsKey("max")) {
			   maxCol = args.get("max");
		   }
		   if(args.containsKey("sum")) {
			   sumCol = args.get("sum");
		   }
		   if(args.containsKey("min")) {
			   minCol = args.get("min");
		   }
		   //算出需要用到的聚合函数
		   String str;
		   while((str = buffer.readLine()) != null) {
			   String[] col = str.split("\\|");
			   if(mapCount.containsKey(col[rowKey])) {
					 mapCount.put(col[rowKey], mapCount.get(col[rowKey]) +1 );
			   } else {
					 mapCount.put(col[rowKey], 1);
			   }
			   if(sumCol != -1) {
				   if(mapSum.containsKey(col[rowKey])) {
					   mapSum.put(col[rowKey], mapSum.get(col[rowKey]) +Integer.parseInt(col[sumCol]) );
				   } else {
					   mapSum.put(col[rowKey], Integer.parseInt(col[sumCol]));
				   }
			   }
			   if(avgCol != -1) {
				   if(mapAvg.containsKey(col[rowKey])) {
					   mapAvg.put(col[rowKey], mapAvg.get(col[rowKey]) +Float.parseFloat(col[avgCol]) );
				   } else {
					   mapAvg.put(col[rowKey], Float.parseFloat(col[avgCol]));
				   }
			   }
			   if(maxCol != -1) {
				   if(mapMax.containsKey(col[rowKey])) {
					   if(Integer.parseInt(col[maxCol]) > mapMax.get(col[rowKey]))
					      mapMax.put(col[rowKey], Integer.parseInt(col[maxCol]));
				   } else {
					   mapMax.put(col[rowKey], Integer.parseInt(col[maxCol]));
				   }
			   }
			   if(minCol != -1) {
				   if(mapMin.containsKey(col[rowKey])) {
					   if(Integer.parseInt(col[minCol]) < mapMin.get(col[rowKey]))
					      mapMin.put(col[rowKey], Integer.parseInt(col[minCol]));
				   } else {
					   mapMin.put(col[rowKey], Integer.parseInt(col[minCol]));
				   }
			   }
		   }
		   //从hashmap中插入数据表
		   for(String key : mapCount.keySet()) {
                     if(countCol != -1) {
            	     colStr = "count";
            	       insert(key, "res", colStr, mapCount.get(key) + "");
	           }
			   if(avgCol != -1) {
				 colStr = "avg(R" + avgCol + ")";
				 mapAvg.put(key, (float)Math.round(mapAvg.get(key)/mapCount.get(key)*100)/100);
				 insert(key, "res", colStr, mapAvg.get(key) + "");
			   }
			   if(maxCol != -1) {
				 colStr = "max(R" + maxCol + ")";
				 insert(key, "res", colStr, mapMax.get(key) + "");
			   }
			   if(minCol != -1) {
				 colStr = "min(R" + minCol + ")";
				 insert(key, "res", colStr, mapMin.get(key) + "");
			   }
			   if(sumCol != -1) {
				 colStr = "sum(R" + sumCol + ")";
				 insert(key, "res", colStr, mapSum.get(key) + "");
			   }
		   }
		   System.out.println("handle data success");
	}
	public static void main(String[] args) throws IOException, URISyntaxException {
		/**
		 * 命令参数解析,解析出文件名,group by的列,需要求的聚合函数
		 */
		if(args.length != 3) {
			System.out.println("input args length error");
			System.exit(0);
		}
		String file = StringUtils.substringAfter(args[0], "=");
		if(file == null) {
			System.out.println("args error");
			System.exit(0);
		}
		String keyNum = StringUtils.substringAfter(args[1], "R");
		if(keyNum  == null) {
			System.out.println("args error");
			System.exit(0);
		}
		int rowKey = Integer.parseInt(keyNum);
		
		String colsName = StringUtils.substringAfter(args[2], ":");
		if(colsName == null) {
			System.out.println("args error");
			System.exit(0);
		}
		String[] cmdStr = colsName.split(",");
		Map<String, Integer> cmd = new HashMap<String, Integer>();
		for(int i = 0; i < cmdStr.length; i++) {
			if(!cmdStr[i].equals("count")) {
			    cmd.put(StringUtils.substringBefore(cmdStr[i], "("), Integer.parseInt(StringUtils.substringBetween(cmdStr[i],"R", ")")));
			} else {
				cmd.put(cmdStr[i], rowKey);
			}
		}
		System.out.println("file:" + file);
		for(String key : cmd.keySet()) {
			System.out.println(key + ":" + cmd.get(key));
		}
		Hw1Grp2 h = new Hw1Grp2();
		h.setTable(h.createTable(TABLE_NAME));
		h.handleData(file, rowKey, cmd);
		System.out.println("program is over");
	}
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017-05-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
大数据应用之HBase数据插入性能优化实测教程
大家在使用HBase的过程中,总是面临性能优化的问题,本文从HBase客户端参数设置的角度,研究HBase客户端数据批量插入性能优化的问题。事实胜于雄辩,数据比理论更有说服力,基于此,作者设计了这么一个HBase数据插入性能优化实测实验,希望大家用自己的服务器跑出的结果,给自己一个值得信服的结论。
数据饕餮
2019/01/14
9100
文件倒排索引算法及其hadoop实现
什么是文件的倒排索引? 简单讲就是一种搜索引擎的算法。过倒排索引,可以根据单词快速获取包含这个单词的文档列表。倒排索引主要由两个部分组成:“单词”和对应出现的“倒排文件”。 详细解释有一篇博客说得挺好:http://blog.csdn.net/hguisu/article/details/7962350 MapReduce的设计思路 整个过程包含map、combiner、reduce三个阶段,它们各自对应的key和value类型如下表所示: InputKey Inpu
triplebee
2018/01/12
7390
文件倒排索引算法及其hadoop实现
Hbase API开发实例(Java版)
import java.io.IOException; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.util.Map; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.util.*; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.MasterNotRunningException; //import org.apache.hadoop.hbase.ZooKeeperConnectionException; public class HBaseHandler {     private static HBaseConfiguration conf = null;     /**      * 初始化配置      */     static {        //conf = HBaseConfiguration.create();          conf = new HBaseConfiguration();          conf.addResource("hbase-site.xml");     }     /**      * @param args      * @throws IOException      */     public static void main(String[] args) throws IOException {         // TODO Auto-generated method stub         System.out.println("Helloworld");         String[] cfs;         cfs = new String[1];         cfs[0] = "Hello";         createTable("Test",cfs);     }     /**      * 创建表操作      * @throws IOException      */     public static void createTable(String tablename, String[] cfs) throws IOException {         HBaseAdmin admin = new HBaseAdmin(conf);         if (admin.tableExists(tablename)) {
数据饕餮
2019/01/14
1.2K0
Hadoop学习笔记—15.HBase框架学习(基础实践篇)
  伪分布模式安装即在一台计算机上部署HBase的各个角色,HMaster、HRegionServer以及ZooKeeper都在一台计算机上来模拟。
Edison Zhou
2018/08/20
4330
Hadoop学习笔记—15.HBase框架学习(基础实践篇)
HBase篇--HBase操作Api和Java操作Hbase相关Api
Hbase shell启动命令窗口,然后再Hbase shell中对应的api命令如下。
LhWorld哥陪你聊算法
2018/09/13
1.7K0
HBase篇--HBase操作Api和Java操作Hbase相关Api
Hadoop——HBase配置、shell编程和api编程
点击下载 链接:https://pan.baidu.com/s/17r-mfTTYwrgLFh50xDVEvA 提取码:h25r
不愿意做鱼的小鲸鱼
2022/09/24
3920
Hadoop——HBase配置、shell编程和api编程
hadoop2-HBase的Java API操作
Hbase提供了丰富的Java API,以及线程池操作,下面我用线程池来展示一下使用Java API操作Hbase。
Hongten
2018/12/10
7710
hadoop2-HBase的Java API操作
spark操作hbase的两种方法
添加数据之前先 create table create 'student','cmf1','cmf2','cmf3' 1、RDD[(String,String)]类型添加 package com.xtd.hbase import org.apache.hadoop.hbase.client.{Put, Result} import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration} import org.apache.hadoop.hbase.io
静谧星空TEL
2021/04/27
9480
spark操作hbase的两种方法
Hadoop基础教程-第10章 HBase:Hadoop数据库(10.6 HBase API)
第10章 HBase:Hadoop数据库 10.6 HBase API (新特性) 本节所有代码可以从https://github.com/ihadron/hbase.git下载。 10.6.1 HB
程裕强
2018/01/02
2.4K0
Hadoop基础教程-第10章 HBase:Hadoop数据库(10.6 HBase API)
Win10 IDEA远程连接HBase
IDEA自带Maven,如果需要自己安装Maven可以参考安装Maven 创建项目,选择Maven,模板选择第一个maven-archetype-archetype
超级小的大杯柠檬水
2024/11/21
1080
Win10 IDEA远程连接HBase
大数据应用之Windows平台Hbase客户端Eclipse环境搭建-Java版
  大数据的场景下,NoSql型数据库的优势不言而喻,但是涉及NoSQL数据库的实际动手开发的东西多是Linux平台,大多语焉不详,至于Windows平台介绍的东西就更少了,而且大多无法运行。本文就Windows平台基于Eclipse搭建Hbase环境客户端开发环境做一个介绍。另外基于Thrift实现的Windows版本Hbase客户端库也做了封装,有需要的可以留言索取。
数据饕餮
2019/01/14
8740
Hbase入门篇03---Java API使用,HBase高可用配置和架构设计
因为缴费明细的数据记录非常庞大,该公司的信息部门决定使用HBase来存储这些数据。并且,他们希望能够通过Java程序来访问这些数据。
大忽悠爱学习
2023/05/23
8850
Hbase入门篇03---Java API使用,HBase高可用配置和架构设计
discuz论坛apache日志hadoop大数据分析项目:hive以及hbase是如何入库以及代码实现
about云discuz论坛apache日志hadoop大数据分析项目: 数据时如何导入hbase与hive的到了这里项目的基本核心功能已经完成。这里介绍一下hive以及hbase是如何入库以及代码实现。 首先我们将hbase与hive整合,详细参考 about云分析discuz论坛apache日志hadoop大数据项目:hive与hbase是如何整合使用的 about云分析discuz论坛apache日志hadoop大数据项目:hive与hbase是如何整合使用的 整合完毕,我们就可以通过map
用户1410343
2018/03/27
8730
discuz论坛apache日志hadoop大数据分析项目:hive以及hbase是如何入库以及代码实现
Flume+Kafka+Storm+Hbase+HDSF+Poi整合
举例:这个网站www.hongten.com(当然这是一个我虚拟的电商网站),用户在这个网站里面可以有很多行为,比如注册,登录,查看,点击,双击,购买东西,加入购物车,添加记录,修改记录,删除记录,评论,登出等一系列我们熟悉的操作。这些操作都被记录在日志信息里面。我们要对日志信息进行分析。
Hongten
2018/12/28
7190
HBase新版本Java API编程实战及基本操作方法封装
我的HBase版本是0.98 首先说明一下,如果用eclipse操作hbase时,如果报Unknown host错误,找不到主机,是因为你没有配IP地址的映射 方法是 找到你的系统盘里面的C:\Windows\System32\drivers\etc下的hosts文件,打开,增加一个映射 加一个映射 192.168.52.140 master 话不多说,直接看代码,注释很详细 import java.io.IOException; import java.util.Arrays; imp
汤高
2018/01/11
2.4K0
JAVA使用HBASE常用方法
package HBaseTest; /** * Created by root on 11/11/22. */ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.had
用户3003813
2018/09/06
8730
JAVA使用HBASE常用方法
HBase Java API 01:基础操作
---- HBase版本:1.2.6 1. HBaseUtil.java import java.io.IOException; import java.util.Date; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor
CoderJed
2018/09/13
6300
一脸懵逼学习HBase---基于HDFS实现的。(Hadoop的数据库,分布式的,大数据量的,随机的,实时的,非关系型数据库)
1:HBase官网网址:http://hbase.apache.org/ 2:HBase表结构:建表时,不需要指定表中的字段,只需要指定若干个列族,插入数据时,列族中可以存储任意多个列(即KEY-VA
别先生
2018/01/02
1.5K0
一脸懵逼学习HBase---基于HDFS实现的。(Hadoop的数据库,分布式的,大数据量的,随机的,实时的,非关系型数据库)
HBase客户端Write Buffer 介绍及设置
HBase客户端API提供了Write Buffer的方式,即批量提交一批Put对象到HBase服务端。本文将结合HBase相关源码,分析如何在实际项目中合理设置和使用它。
大鹅
2021/06/16
2.6K0
HBase学习
操作 HBase 所用的 jar 包,使用 Maven 导入,引入依赖 hbase-it,pom.xml 文件依赖部分如下:
Wizey
2018/08/30
8220
推荐阅读
相关推荐
大数据应用之HBase数据插入性能优化实测教程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验