引言
Hadoop提供的HDFS分布式文件存储系统,提供了基于Thrift的客户端访问支持,但是因为Thrift自身的访问特点,在高并发的访问情况下,Thrift自身结构可能将会成为HDFS文件存储系统的一个性能瓶颈。我们先来看一下不使用Thrift方式访问HDFS文件系统时的业务流程。
一、HDFS文件读取流程
流程说明:
二、HDFS文件写入流程
流程说明:
三、关键词
HDFSClient的文件IO操作(读取和写入数据块)最终是通过直接访问DataNode来完成的,NameNode只参与元数据操作。
四、Thrift的访问流程:猜测版
流程说明:
1.ThriftClient客户端将操作命令传给ThriftServer。
2.ThriftServer调用HDFSClient接口API实现HDFS读写操作,操作流程如二和三所示。
五、疑问
与DataNode发生数据交换的到底是ThriftServer还是ThriftClient,如果是ThriftServer,那么多个ThriftClient并行访问时,ThriftServer必将成为HDFS访问的性能瓶颈;如果是ThriftClient直接访问DataNode,那么理论依据何在呢?
六、示例程序
下面是一个基于Thrift实现的HDFS客户端程序,实现了目录检查、文件的创建与读取、以及目录列举等访问操作。
1 // HdfsDemo.cpp : Defines the entry point for the console application.
2 //
3
4 #include "stdafx.h"
5 #include <iostream>
6 #include <string>
7 #include <boost/lexical_cast.hpp>
8 #include <protocol/TBinaryProtocol.h>
9 #include <transport/TSocket.h>
10 #include <transport/TTransportUtils.h>
11 #include "ThriftHadoopFileSystem.h"
12
13 #ifndef _WIN32_WINNT
14 #define _WIN32_WINNT 0x0500
15 #endif
16 using namespace std;
17 using namespace apache::thrift;
18 using namespace apache::thrift::protocol;
19 using namespace apache::thrift::transport;
20
21 int _tmain(int argc, _TCHAR* argv[])
22 {
23 if (argc < 3)
24 {
25 std::cerr << "Invalid arguments!\n" << "Usage: DemoClient host port" << std::endl;
26 //return -1;
27 }
28 boost::shared_ptr<TTransport> socket(new TSocket("192.168.230.133", 55952));//boost::lexical_cast<int>(argv[2])));
29 boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
30 boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
31 ThriftHadoopFileSystemClient client(protocol);
32 try
33 {
34 transport->open();
35 Pathname path;
36 //01_create directory
37 path.__set_pathname("/user/hadoop");
38 if(client.exists(path) == true)
39 {
40 printf("path is exists.\r\n");
41 }
42 else
43 {
44 printf("path is not exists.");
45 //return 0;
46 }
47 //02_put file
48 Pathname filepath;
49 filepath.__set_pathname("/user/hadoop/in/test1.txt");
50 /*
51 FILE* localfile = fopen("E:\\project\\Hadoop\\HdfsDemo\\Debug\\hello.txt","rb");
52 if (localfile == NULL)
53 {
54 transport->close();
55 return 0;
56 }
57 ThriftHandle hdl;
58 client.create(hdl,filepath);
59 while (true)
60 {
61 char data[1024];
62 memset(data,0x00,sizeof(data));
63 size_t Num = fread(data,1,1024,localfile);
64 if (Num <= 0)
65 {
66 break;
67 }
68 client.write(hdl,data);
69 }
70 fclose(localfile);
71 client.close(hdl);
72 */
73 //03_get file
74 /*
75 ThriftHandle hd2;
76 FileStatus stat1;
77 client.open(hd2,filepath);
78 client.stat(stat1,filepath);
79 int index = 0;
80 while(true)
81 {
82 string data;
83 if (stat1.length <= index)
84 {
85 break;
86 }
87 client.read(data,hd2,index,1024);
88
89 index += data.length();
90 printf("==%s\r\n",data.c_str());
91 }
92 client.close(hd2);
93 */
94
95 //04_list files
96 std::vector<FileStatus> vFileStatus;
97 client.listStatus(vFileStatus,path);
98 for (int i=0;i<vFileStatus.size();i++)
99 {
100 printf("i=%d file=%s\r\n",i,vFileStatus[i].path.c_str());
101 }
102 transport->close();
103 } catch (const TException &tx) {
104 std::cerr << "ERROR: " << tx.what() << std::endl;
105 }
106 getchar();
107 return 0;
108 }
七、源码分析
1.文件创建:
1 /**
2  * Create a file and open it for writing, delete file if it exists.
2a * Runs on the ThriftServer: the FSDataOutputStream stays in the
2b * server's handle table; only a numeric ThriftHandle id goes back
2c * over the wire to the ThriftClient.
3  */
4 public ThriftHandle createFile(Pathname path,
5 short mode,
6 boolean overwrite,
7 int bufferSize,
8 short replication,
9 long blockSize) throws ThriftIOException {
10 try {
11 now = now();
12 HadoopThriftHandler.LOG.debug("create: " + path +
13 " permission: " + mode +
14 " overwrite: " + overwrite +
15 " bufferSize: " + bufferSize +
16 " replication: " + replication +
17 " blockSize: " + blockSize);
18 FSDataOutputStream out = fs.create(new Path(path.pathname),
19 new FsPermission(mode),
20 overwrite,
21 bufferSize,
22 replication,
23 blockSize,
24 null); // progress
// Map the open stream to an id; later write()/close() calls look it
// up via this id, so the stream never leaves the server process.
25 long id = insert(out);
26 ThriftHandle obj = new ThriftHandle(id);
27 HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
28 return obj;
29 } catch (IOException e) {
// Wrap the IOException so it crosses the Thrift boundary as a
// declared service exception.
30 throw new ThriftIOException(e.getMessage());
31 }
32 }
ThriftHandle的两端到底是谁呢?是ThriftClient和DataNode?还是ThriftServer与DataNode?从上面的insert(out)可以看到,打开的输出流保存在ThriftServer进程内,客户端拿到的只是一个数字句柄id。
2.文件写入
// Write `data` (as UTF-8 bytes) to the server-side output stream that
// `tout.id` refers to. The lookup happens inside the ThriftServer, so
// the actual DataNode traffic originates from the server, not the
// ThriftClient.
1 public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
2 try {
3 now = now();
4 HadoopThriftHandler.LOG.debug("write: " + tout.id);
// Resolve the numeric handle back to the stream created earlier.
5 FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
6 byte[] tmp = data.getBytes("UTF-8");
7 out.write(tmp, 0, tmp.length);
8 HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
9 return true;
10 } catch (IOException e) {
11 throw new ThriftIOException(e.getMessage());
12 }
13 }
写入时依赖的还是ThriftHandle?
3.文件读取
1 /**
2  * read from a file
2a * Reads up to `length` bytes at `offset` from the server-side input
2b * stream identified by `tout.id`, and returns them UTF-8 decoded.
3  */
4 public String read(ThriftHandle tout, long offset,
5 int length) throws ThriftIOException {
6 try {
7 now = now();
8 HadoopThriftHandler.LOG.debug("read: " + tout.id +
9 " offset: " + offset +
10 " length: " + length);
11 FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
// Seek only when the stream is not already positioned at `offset`.
12 if (in.getPos() != offset) {
13 in.seek(offset);
14 }
15 byte[] tmp = new byte[length];
// NOTE(review): read() may return -1 at EOF, which would make the
// String constructor below throw; it may also return fewer bytes
// than requested — confirm callers never read past end-of-file.
16 int numbytes = in.read(offset, tmp, 0, length);
17 HadoopThriftHandler.LOG.debug("read done: " + tout.id);
18 return new String(tmp, 0, numbytes, "UTF-8");
19 } catch (IOException e) {
20 throw new ThriftIOException(e.getMessage());
21 }
22 }
八、遗留问题
ThriftHandle可以看做是Socket连接句柄,但是它的两端到底是谁呢?如果是ThriftClient代表的客户端则一切OK,那么我该如何证明呢?存疑待考!