HBase Java编程示例

HelloWorld.zip

点击(此处)折叠或打开

  1. package elementary;
  2. import java.io.IOException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.ArrayList;
  5. import java.util.Date;
  6. import java.util.List;
  7. import java.util.concurrent.atomic.AtomicInteger;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.TimeUnit;
  11. import org.apache.hadoop.conf.Configuration;
  12. import org.apache.hadoop.hbase.Cell;
  13. import org.apache.hadoop.hbase.HBaseConfiguration;
  14. import org.apache.hadoop.hbase.HColumnDescriptor;
  15. import org.apache.hadoop.hbase.HTableDescriptor;
  16. import org.apache.hadoop.hbase.MasterNotRunningException;
  17. import org.apache.hadoop.hbase.TableName;
  18. import org.apache.hadoop.hbase.ZooKeeperConnectionException;
  19. import org.apache.hadoop.hbase.client.Delete;
  20. import org.apache.hadoop.hbase.client.Get;
  21. import org.apache.hadoop.hbase.client.Admin;
  22. import org.apache.hadoop.hbase.client.BufferedMutator;
  23. import org.apache.hadoop.hbase.client.BufferedMutatorParams;
  24. import org.apache.hadoop.hbase.client.Connection;
  25. import org.apache.hadoop.hbase.client.ConnectionFactory;
  26. import org.apache.hadoop.hbase.client.Table;
  27. import org.apache.hadoop.hbase.client.Put;
  28. import org.apache.hadoop.hbase.client.Result;
  29. import org.apache.hadoop.hbase.client.ResultScanner;
  30. import org.apache.hadoop.hbase.client.Scan;
  31. import org.apache.hadoop.hbase.util.Bytes;
  32. import org.apache.hadoop.util.ThreadUtil;
  33. public class HelloWorld {
  34. private static Configuration conf = null;
  35. private static Connection conn = null;
  36. private static Admin admin = null;
  37. public static AtomicInteger count = new AtomicInteger();
  38. /**
  39.      * 初始化配置
  40.      */
  41. static {
  42.         conf = HBaseConfiguration.create();
  43. //如果沒有配置文件,一定要記得手動宣告
  44.         conf.set("hbase.zookeeper.quorum", "10.148.137.143");
  45.         conf.set("hbase.zookeeper.property.clientPort", "2181");
  46. }
  47. static {
  48. try {
  49.          conn = ConnectionFactory.createConnection();
  50.      admin = conn.getAdmin();
  51. } catch (IOException e) {
  52.      e.printStackTrace();
  53. }
  54. }
  55. static public class MyThread extends Thread
  56. {
  57. int _start;
  58. String _tablename;
  59. Connection conn;
  60. //BufferedMutator table;
  61. Table table;
  62. public MyThread(int start, String tablename) {
  63.             _start = start;
  64.             _tablename = tablename;
  65. }
  66. public void run() {
  67. String tablename = _tablename;
  68. Thread current = Thread.currentThread();
  69. long thread_id = current.getId();
  70. System.out.printf("thread[%d] run\n", thread_id);
  71. try {
  72.                 conn = ConnectionFactory.createConnection();
  73. //BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename));
  74. //params.writeBufferSize(1024 * 4);
  75. //table = conn.getBufferedMutator(params);
  76. table = conn.getTable(TableName.valueOf(tablename));
  77. for (int j=_start; j<100; ++j) {
  78. for (int i=0; i<10000000; ++i) {
  79. // zkb_0_0
  80. String zkb = "zkb_" + String.valueOf(_start) + "_" + String.valueOf(i);
  81. Put put = new Put(Bytes.toBytes(zkb));
  82. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field1"),Bytes.toBytes(String.valueOf(i+0)));
  83. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field2"),Bytes.toBytes(String.valueOf(i+1)));
  84. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field3"),Bytes.toBytes(String.valueOf(i+2)));
  85. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field4"),Bytes.toBytes(String.valueOf(i+3)));
  86. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field5"),Bytes.toBytes(String.valueOf(i+4)));
  87. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field6"),Bytes.toBytes(String.valueOf(i+5)));
  88. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field7"),Bytes.toBytes(String.valueOf(i+6)));
  89. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field8"),Bytes.toBytes(String.valueOf(i+7)));
  90. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field9"),Bytes.toBytes(String.valueOf(i+8)));
  91. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field10"),Bytes.toBytes(String.valueOf(i+9)));
  92. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field11"),Bytes.toBytes(String.valueOf(i+10)));
  93. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field12"),Bytes.toBytes(String.valueOf(i+11)));
  94. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field13"),Bytes.toBytes(String.valueOf(i+12)));
  95. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field14"),Bytes.toBytes(String.valueOf(i+13)));
  96. put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field15"),Bytes.toBytes(String.valueOf(i+14)));
  97. //table.mutate(put);
  98. table.put(put);
  99. int m = HelloWorld.count.incrementAndGet();
  100. if (m % 10000 == 0) {
  101. Date dt = new Date();
  102. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss aa");
  103. String now = sdf.format(dt);
  104. System.out.printf("[%s] thread[%d] m=%d, j=%d, i=%d\n", now, thread_id, m, j, i);
  105. }
  106. }
  107. }
  108. System.out.printf("thread[%d] over\n", thread_id);
  109. }
  110. catch (Exception e) {
  111.                 e.printStackTrace();
  112. }
  113. }
  114. }
  115. /**
  116.      * 建立表格
  117.      * @param tablename
  118.      * @param cfs
  119.      */
  120. public static void createTable(String tablename, String[] cfs){
  121. try {
  122. if (admin.tableExists(TableName.valueOf(tablename))) {
  123. System.out.println("table already exists!");
  124. } else {
  125.                 HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tablename));
  126. for (int i = 0; i < cfs.length; i++) {
  127.                     HColumnDescriptor desc = new HColumnDescriptor(cfs[i]);
  128.                     desc.setMaxVersions(3650);
  129.                     tableDesc.addFamily(desc);
  130. }
  131. byte[][] splitKeys = new byte[][] {
  132.                     Bytes.toBytes("zkb_0_0"),
  133.                     Bytes.toBytes("zkb_10_0"),
  134.                     Bytes.toBytes("zkb_20_0"),
  135.                     Bytes.toBytes("zkb_30_0"),
  136.                     Bytes.toBytes("zkb_40_0"),
  137.                     Bytes.toBytes("zkb_50_0"),
  138.                     Bytes.toBytes("zkb_60_0"),
  139.                     Bytes.toBytes("zkb_70_0"),
  140.                     Bytes.toBytes("zkb_80_0"),
  141.                     Bytes.toBytes("zkb_90_0"),
  142.                     Bytes.toBytes("zkb_100_0")
  143. };
  144.                 admin.createTable(tableDesc, splitKeys);
  145.                 admin.close();
  146. System.out.println("create table " + tablename + " ok.");
  147. }
  148. } catch (MasterNotRunningException e) {
  149.             e.printStackTrace();
  150. } catch (ZooKeeperConnectionException e) {
  151.             e.printStackTrace();
  152. } catch (IOException e) {
  153.             e.printStackTrace();
  154. }
  155. }
  156. /**
  157.      * 刪除表格
  158.      * @param tablename
  159.      */
  160. public static void deleteTable(String tablename){
  161. try {
  162. //Connection conn = ConnectionFactory.createConnection();
  163. //Admin admin = conn.getAdmin();     
  164.             admin.disableTable(TableName.valueOf(tablename));
  165.             admin.deleteTable(TableName.valueOf(tablename));
  166. System.out.println("delete table " + tablename + " ok.");
  167. } catch (IOException e) {
  168.             e.printStackTrace();
  169. }
  170. }
  171. /**
  172.      * 刪除一筆資料
  173.      * @param tableName
  174.      * @param rowKey
  175.      */
  176. public static void delRecord (String tableName, String rowKey){
  177. try {
  178. Table table = conn.getTable(TableName.valueOf(tableName));
  179. List<Delete> list = new ArrayList<Delete>();
  180. Delete del = new Delete(rowKey.getBytes());
  181. list.add(del);
  182. table.delete(list);
  183. System.out.println("del recored " + rowKey + " ok.");
  184. } catch (IOException e) {
  185.             e.printStackTrace();
  186. }
  187. }
  188. /**
  189.      * 取得一筆資料
  190.      * @param tableName
  191.      * @param rowKey
  192.      */
  193. public static void getOneRecord (String tableName, String rowKey){
  194. try {
  195. Table table = conn.getTable(TableName.valueOf(tableName));
  196. Get get = new Get(rowKey.getBytes());
  197. Result rs = table.get(get);
  198. List<Cell> list = rs.listCells();
  199. for(Cell cell:list){
  200. System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " );
  201. System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" );
  202. System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " );
  203. System.out.print(cell.getTimestamp() + " " );
  204. System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " );
  205. System.out.println("");
  206. }
  207. } catch (IOException e) {
  208.             e.printStackTrace();
  209. }
  210. }
  211. /**
  212.      * 取得所有資料
  213.      * @param tableName
  214.      */
  215. public static void getAllRecord (String tableName) {
  216. try{
  217. //Connection conn = ConnectionFactory.createConnection();
  218. Table table = conn.getTable(TableName.valueOf(tableName));
  219.             Scan scan = new Scan();
  220.             ResultScanner resultscanner = table.getScanner(scan);
  221. for(Result rs:resultscanner){
  222. List<Cell> list = rs.listCells();
  223. for(Cell cell:list){
  224. System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " );
  225. System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" );
  226. System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " );
  227. System.out.print(cell.getTimestamp() + " " );
  228. System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " );
  229. System.out.println("");
  230. }
  231. }
  232. } catch (IOException e){
  233.             e.printStackTrace();
  234. }
  235. }
  236. /**
  237.      * 取得Family清單
  238.      * @param tableName
  239.      * @return
  240.      */
  241. public static ArrayList<String> getAllFamilyName(String tableName) {
  242. ArrayList<String> familyname_list = new ArrayList<String>();
  243. try{
  244. //Connection conn = ConnectionFactory.createConnection();
  245. Table table = conn.getTable(TableName.valueOf(tableName));
  246.             HTableDescriptor htabledescriptor = table.getTableDescriptor();
  247.             HColumnDescriptor[] hdlist = htabledescriptor.getColumnFamilies();
  248. for(int i=0;i<hdlist.length;i++){
  249.                 HColumnDescriptor hd = hdlist[i];
  250.                 familyname_list.add(hd.getNameAsString());
  251. }
  252. } catch (IOException e){
  253.             e.printStackTrace();
  254. }
  255. return familyname_list;
  256. }
  257. // java -cp HelloWorld.jar:`ls lib/*.jar|awk '{printf("%s:", $0)}'` elementary.HelloWorld 5
  258. public static void main(String[] args) {
  259. System.out.println("HelloWorldX");
  260. if (args.length > 0)
  261. System.out.println(args[0]);
  262. int start = 0;
  263. if (args.length > 1)
  264. start = Integer.valueOf(args[1]);
  265. if (start < 0)
  266. start = 0;
  267. int num_threads = 16;
  268. if (args.length > 2)
  269.             num_threads = Integer.valueOf(args[2]);
  270. try {
  271. String tablename = "scores";
  272. String[] familys = {"grade", "course"};
  273.             HelloWorld.createTable(tablename, familys);
  274. //ExecutorService thread_pool = Executors.newSingleThreadExecutor();
  275. ExecutorService thread_pool = Executors.newFixedThreadPool(num_threads);
  276. Thread[] pool = new HelloWorld.MyThread[80];
  277. for (int i=0; i<pool.length; ++i) {
  278.                 pool[i] = new HelloWorld.MyThread(i, tablename);
  279.                 thread_pool.execute(pool[i]);
  280. }
  281.             thread_pool.shutdown();
  282. System.out.println("over");
  283. }
  284. catch (Exception e) {
  285.             e.printStackTrace();
  286. }
  287. }
  288. }

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏码匠的流水账

自定义kafka streams的processor

本文来解析一下kafka streams的KStreamBuilder以及举例如何自定义kafka streams的processor

1312
来自专栏码匠的流水账

Flux OOM实例

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxSink.java

1111
来自专栏10km的专栏

thrift:返回null的解决办法

最的项目用到swift:thrift做RPC框架,开始也没有了解太深,就开始干了,今天开始测试了,发现thrift居然不允许服务接口返回null。跟踪源码到下面...

4156
来自专栏Ceph对象存储方案

查询bucket已用量脚本-python

目前仅支持ceph的s3方案,具体配置看说明 # -*- coding: utf-8 -*- import requests import json from ...

2959
来自专栏码匠的流水账

聊聊storm的LinearDRPCTopologyBuilder

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder...

1243
来自专栏码匠的流水账

聊聊openmessaging-java

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/Pro...

2761
来自专栏码匠的流水账

聊聊storm的maxSpoutPending

storm-2.0.0/storm-client/src/jvm/org/apache/storm/Config.java

2445
来自专栏码匠的流水账

聊聊kafka 0.8 ConsumerFetcherManager的MaxLag指标

本文主要研究一下kafka0.8.2.2版本中ConsumerFetcherManager的MaxLag指标的统计。

1121
来自专栏码匠的流水账

聊聊storm的LinearDRPCTopologyBuilder

storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder...

872
来自专栏码匠的流水账

聊聊storm的ICommitterTridentSpout

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/ICommitterTridentSp...

892

扫码关注云+社区

领取腾讯云代金券