[bigdata@hadoop002 hbase]$ bin/hbase mapredcp
The output of this command lists the jar packages required for running MapReduce jobs against HBase.
$ export HBASE_HOME=/opt/module/hbase
$ export HADOOP_HOME=/opt/module/hadoop-2.7.2
$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

Or set it directly in one step:

[bigdata@hadoop002 hbase]$ export HADOOP_CLASSPATH=`/opt/module/hbase/bin/hbase mapredcp`
After the configuration is complete, check whether it has taken effect.
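A quick way to check, assuming the exports above were run in the current shell, is to print the variable and confirm that the HBase jar paths appear in it:

[bigdata@hadoop002 hbase]$ echo $HADOOP_CLASSPATH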
To make the settings permanent, define the two home variables in the environment configuration (for example in /etc/profile):

export HBASE_HOME=/opt/module/hbase
export HADOOP_HOME=/opt/module/hadoop-2.7.2
Then add the following to hadoop-env.sh (note: it must be placed after the for loop):
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
– Case 1: Count how many rows the student table contains
[bigdata@hadoop002 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
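A simple cross-check (not part of the original walkthrough) is the HBase shell's built-in count command, which should report the same number of rows:

hbase(main):001:0> count 'student'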
– Case 2: Use MapReduce to import data from HDFS into HBase
[bigdata@hadoop002 datas]$ vim fruit.tsv
1001	Apple	Red
1002	Pear	Yellow
1003	Pineapple	Yellow

The three columns (row key, name, color) are separated by tabs.
hbase(main):001:0> create 'fruit','info'
[bigdata@hadoop002 datas]$ hadoop fs -mkdir /input_fruit/
[bigdata@hadoop002 datas]$ hadoop fs -put fruit.tsv /input_fruit/
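Before running the import, it is worth confirming that the file actually landed in HDFS, for example:

[bigdata@hadoop002 datas]$ hadoop fs -cat /input_fruit/fruit.tsv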
[bigdata@hadoop002 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
hdfs://hadoop002:9000/input_fruit
hbase(main):001:0> scan 'fruit'
Testing confirms that the data was imported correctly.
Goal: migrate part of the data in the fruit table into the fruit_mr table with MapReduce.
package com.buwenbuhuo.hbase.mr;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * @author 卜温不火
 * @create 2020-05-12 19:32
 * com.buwenbuhuo.hbase.mr - the name of the target package where the new class or interface will be created.
 * hbase0512 - the name of the current project.
 */
public class ReadMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // One Put per row, keyed by the original row key
        Put put = new Put(key.copyBytes());
        // Copy only the cells whose qualifier is "name"; everything else is filtered out
        for (Cell cell : value.rawCells()) {
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                put.add(cell);
            }
        }
        context.write(key, put);
    }
}
package com.buwenbuhuo.hbase.mr;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * @author 卜温不火
 * @create 2020-05-12 19:32
 * com.buwenbuhuo.hbase.mr - the name of the target package where the new class or interface will be created.
 * hbase0512 - the name of the current project.
 */
public class WriteReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}
package com.buwenbuhuo.hbase.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HRegionPartitioner;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

/**
 * @author 卜温不火
 * @create 2020-05-12 19:32
 * com.buwenbuhuo.hbase.mr - the name of the target package where the new class or interface will be created.
 * hbase0512 - the name of the current project.
 */
public class Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop002,hadoop003,hadoop004");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        // Create the job with the HBase configuration so the ZooKeeper settings above take effect
        Job job = Job.getInstance(conf);
        job.setJarByClass(Driver.class);

        // Scan the source table "fruit" and feed every row to ReadMapper
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(
                "fruit",
                scan,
                ReadMapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job
        );
        job.setNumReduceTasks(100);
        // Write the mapper output into "fruit_mr", partitioned by region
        TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteReducer.class, job, HRegionPartitioner.class);
        job.waitForCompletion(true);
    }
}
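Before the job can be submitted, the project has to be packaged into a jar and placed on the cluster. A minimal sketch, assuming a standard Maven build that produces the hbase-0512-1.0-SNAPSHOT.jar used below and that the jar is copied into /opt/module/hbase on hadoop002:

$ mvn clean package
$ scp target/hbase-0512-1.0-SNAPSHOT.jar bigdata@hadoop002:/opt/module/hbase/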
hbase(main):003:0> create 'fruit_mr','info'
[bigdata@hadoop002 hbase]$ hadoop jar hbase-0512-1.0-SNAPSHOT.jar com.buwenbuhuo.hbase.mr.Driver
hbase(main):005:0> scan 'fruit_mr'
Goal: write data from HDFS into an HBase table with MapReduce.
package com.buwenbuhuo.hbase.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author 卜温不火
 * @create 2020-05-12 22:41
 * com.buwenbuhuo.hbase.mr2 - the name of the target package where the new class or interface will be created.
 * hbase0512 - the name of the current project.
 */
public class ReadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each line of fruit.tsv is: rowkey \t name \t color
        String[] split = value.toString().split("\t");
        // Skip malformed lines that do not contain all three fields
        if (split.length < 3) {
            return;
        }
        Put put = new Put(Bytes.toBytes(split[0]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));
        context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])), put);
    }
}
package com.buwenbuhuo.hbase.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * @author 卜温不火
 * @create 2020-05-12 23:09
 * com.buwenbuhuo.hbase.mr2 - the name of the target package where the new class or interface will be created.
 * hbase0512 - the name of the current project.
 */
public class WriteReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}
package com.buwenbuhuo.hbase.mr2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HRegionPartitioner;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * @author 卜温不火
 * @create 2020-05-12 23:09
 * com.buwenbuhuo.hbase.mr2 - the name of the target package where the new class or interface will be created.
 * hbase0512 - the name of the current project.
 */
public class Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop002,hadoop003,hadoop004");

        Job job = Job.getInstance(conf);
        job.setJarByClass(Driver.class);

        // Read the tsv file from HDFS with the plain MapReduce mapper
        job.setMapperClass(ReadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        FileInputFormat.setInputPaths(job, new Path("/input_fruit"));

        job.setNumReduceTasks(10);
        // Write the Puts into the fruit_mr table, using the WriteReducer from this (mr2) package
        TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteReducer.class, job, HRegionPartitioner.class);
        job.waitForCompletion(true);
    }
}
[bigdata@hadoop002 hbase]$ hadoop jar hbase-0512-1.0-SNAPSHOT.jar com.buwenbuhuo.hbase.mr2.Driver
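As with the first job, the result can be checked from the HBase shell:

hbase(main):001:0> scan 'fruit_mr'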
That's all for this share.