首页
学习
活动
专区
圈层
工具
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往
清单首页Java文章详情

高效读取大数据文本文件(上亿行数据)

一.前言

本文是对大数据文本文件读取(按行读取)的优化。目前常规的方案(限于JDK)有三种:第一种是LineNumberReader;第二种是RandomAccessFile;第三种是内存映射文件,即在RandomAccessFile基础上调用getChannel().map(...)(详见http://sgq0085.iteye.com/blog/1318622)。

1.LineNumberReader

按行读取,只能从第一行向后遍历,到需要读取的行时开始读入,直到完成;在我的测试用例中,读取1000W行数据每次5万行,用时93秒,效率实测比RandomAccessFile要高;但读取一亿条数据时效率太低了(因为每次都要从头遍历),测试运行超过1个小时仍未完成,因此放弃测试;

2.RandomAccessFile

RandomAccessFile是为磁盘文件的随机访问而设计的,直接按行读取时效率很低:1000w行测试用时140秒,一亿行数据测试用时1438秒。但由于可以通过getFilePointer方法记录位置,并通过seek方法指定读取位置,所以从理论上比较适用这种大数据按行读取的场景;

RandomAccessFile的readLine()只能按照8859_1(即ISO-8859-1)这种方式把字节转换成字符,所以需要对内容重新编码:先按8859_1还原出原始字节,再按文件的实际编码(例如"UTF-8")解码,即下面代码中的第二个参数应填写文件的实际编码,方法如下

Java代码

  1. new String(pin.getBytes("8859_1"), "")

3.内存映射文件

由于每行数据大小不同,内存映射文件在这种情况下不适用,其他情况请参考我的博客(详见http://sgq0085.iteye.com/blog/1318622)

二.解决方案

如果在RandomAccessFile基础上,整合内部缓冲区,效率会有提高,测试过程中1000w行数据用时1秒,1亿行数据用时103秒(比1438秒快了13倍左右)

BufferedRandomAccessFile

网上已经有实现,代码如下:

Java代码

  1. package com.gqshao.file.io;
  2. import java.io.File;
  3. import java.io.FileNotFoundException;
  4. import java.io.IOException;
  5. import java.io.RandomAccessFile;
  6. import java.util.Arrays;
  /**
   * A {@link RandomAccessFile} that keeps one in-memory block of the file and
   * serves {@code read}/{@code write}/{@code seek} from it whenever possible.
   *
   * <p>The buffer always starts on a multiple of {@link #BuffSz_}. Bytes in
   * {@code [lo_, hi_)} are the valid buffered region, {@code curr_} is the
   * logical file pointer, and when {@code dirty_} is set the region
   * {@code [lo_, curr_)} still has to be written to disk.
   *
   * <p>Once a buffer fill reaches end-of-file, {@code read} keeps reporting
   * EOF until the buffer is reloaded by a {@code seek} outside
   * {@code [lo_, hi_)}; data appended to the file by someone else is not seen
   * before that.
   *
   * <p>NOTE(review): after an append has bumped {@code hi_}, a {@code read}
   * without an intervening reload can return padding bytes from the buffer;
   * the class looks intended for read-only or write-only passes - confirm
   * before mixing both on one instance.
   *
   * <p>This implementation is based on the buffer implementation in
   * Modula-3's "Rd", "Wr", "RdClass", and "WrClass" interfaces.
   */
  public class BufferedRandomAccessFile extends RandomAccessFile {
      static final int LogBuffSz_ = 16; // 64K buffer
      public static final int BuffSz_ = (1 << LogBuffSz_);
      static final long BuffMask_ = ~(((long) BuffSz_) - 1L);

      private String path_;
      private boolean dirty_; // true iff unflushed bytes exist
      private boolean syncNeeded_; // dirty_ can be cleared by e.g. seek, so track sync separately
      private long curr_; // current position in file
      private long lo_, hi_; // bounds on characters in "buff"
      private byte[] buff_; // local buffer
      private long maxHi_; // this.lo + this.buff.length
      private boolean hitEOF_; // buffer contains last file block?
      private long diskPos_; // disk position

      /**
       * Opens {@code file} in mode {@code mode} with the default buffer size.
       */
      public BufferedRandomAccessFile(File file, String mode) throws IOException {
          this(file, mode, 0);
      }

      /**
       * Opens {@code file} in mode {@code mode}.
       *
       * @param size requested buffer size in bytes; values not larger than
       *             {@link #BuffSz_} fall back to {@link #BuffSz_}
       */
      public BufferedRandomAccessFile(File file, String mode, int size) throws IOException {
          super(file, mode);
          path_ = file.getAbsolutePath();
          this.init(size);
      }

      /**
       * Open a new <code>BufferedRandomAccessFile</code> on the file named
       * <code>name</code> in mode <code>mode</code>, which should be "r" for
       * reading only, or "rw" for reading and writing.
       */
      public BufferedRandomAccessFile(String name, String mode) throws IOException {
          this(name, mode, 0);
      }

      /**
       * Opens the file named {@code name} in mode {@code mode}.
       *
       * @param size requested buffer size in bytes; values not larger than
       *             {@link #BuffSz_} fall back to {@link #BuffSz_}
       */
      public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException {
          super(name, mode);
          path_ = name;
          this.init(size);
      }

      /* Resets all buffer bookkeeping and allocates the buffer. */
      private void init(int size) {
          this.dirty_ = false;
          this.lo_ = this.curr_ = this.hi_ = 0;
          this.buff_ = new byte[Math.max(size, BuffSz_)];
          // Keep maxHi_ == lo_ + buff_.length even when a larger buffer was requested.
          this.maxHi_ = (long) this.buff_.length;
          this.hitEOF_ = false;
          this.diskPos_ = 0L;
      }

      /**
       * Returns the path this file was opened with: the absolute path for the
       * {@link File} constructors, the given name for the String constructors.
       */
      public String getPath() {
          return path_;
      }

      /**
       * Flushes buffered bytes and forces them (and metadata) to the storage
       * device, but only if something was written since the last sync.
       */
      public void sync() throws IOException {
          if (syncNeeded_) {
              flush();
              getChannel().force(true);
              syncNeeded_ = false;
          }
      }

      /**
       * Flushes pending bytes and closes the file. The underlying file is
       * closed even if the flush fails, so the descriptor is not leaked.
       */
      @Override
      public void close() throws IOException {
          try {
              this.flush();
          } finally {
              this.buff_ = null;
              super.close();
          }
      }

      /**
       * Flush any bytes in the file's buffer that have not yet been written to
       * disk. If the file was created read-only, this method is a no-op.
       */
      public void flush() throws IOException {
          this.flushBuffer();
      }

      /* Flush any dirty bytes in the buffer to disk. */
      private void flushBuffer() throws IOException {
          if (this.dirty_) {
              if (this.diskPos_ != this.lo_) {
                  super.seek(this.lo_);
              }
              int len = (int) (this.curr_ - this.lo_);
              super.write(this.buff_, 0, len);
              this.diskPos_ = this.curr_;
              this.dirty_ = false;
          }
      }

      /*
       * Read at most "this.buff.length" bytes into "this.buff", returning the
       * number of bytes read. If the return result is less than
       * "this.buff.length", then EOF was read and "hitEOF_" is set.
       */
      private int fillBuffer() throws IOException {
          int cnt = 0;
          int rem = this.buff_.length;
          while (rem > 0) {
              int n = super.read(this.buff_, cnt, rem);
              if (n < 0) {
                  break;
              }
              cnt += n;
              rem -= n;
          }
          // cnt is never negative, so the EOF flag must depend only on whether
          // the buffer was filled completely; it is refreshed on every fill.
          this.hitEOF_ = (cnt < this.buff_.length);
          if (this.hitEOF_) {
              // make sure buffer that wasn't read is initialized with -1
              Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
          }
          this.diskPos_ += cnt;
          return cnt;
      }

      /**
       * Positions the file pointer at <code>pos</code>. If <code>pos</code>
       * does not fall in the current buffer, the current buffer is flushed and
       * the block containing <code>pos</code> is loaded.
       */
      @Override
      public void seek(long pos) throws IOException {
          if (pos >= this.hi_ || pos < this.lo_) {
              // seeking outside of current buffer -- flush and read
              this.flushBuffer();
              this.lo_ = pos & BuffMask_; // start at BuffSz boundary
              this.maxHi_ = this.lo_ + (long) this.buff_.length;
              if (this.diskPos_ != this.lo_) {
                  super.seek(this.lo_);
                  this.diskPos_ = this.lo_;
              }
              int n = this.fillBuffer();
              this.hi_ = this.lo_ + (long) n;
          } else if (pos < this.curr_) {
              // Seeking backwards inside the buffer: flush first, because the
              // dirty region is [lo_, curr_) and would shrink with curr_.
              this.flushBuffer();
          }
          this.curr_ = pos;
      }

      /** Returns the logical file pointer (which may be ahead of the disk position). */
      @Override
      public long getFilePointer() {
          return this.curr_;
      }

      /** Returns the file length, including bytes written but not yet flushed. */
      @Override
      public long length() throws IOException {
          // max accounts for the case where we have written past the old file length, but not yet flushed our buffer
          return Math.max(this.curr_, super.length());
      }

      /**
       * Reads one byte from the buffer, reloading it when exhausted.
       *
       * @return the byte as a value in 0..255, or -1 at end of file
       */
      @Override
      public int read() throws IOException {
          if (this.curr_ >= this.hi_) {
              // test for EOF
              if (this.hitEOF_) {
                  return -1;
              }
              // slow path -- read another buffer
              this.seek(this.curr_);
              if (this.curr_ == this.hi_) {
                  return -1;
              }
          }
          byte res = this.buff_[(int) (this.curr_ - this.lo_)];
          this.curr_++;
          return ((int) res) & 0xFF; // convert byte -> int
      }

      /** Equivalent to {@code read(b, 0, b.length)}. */
      @Override
      public int read(byte[] b) throws IOException {
          return this.read(b, 0, b.length);
      }

      /**
       * Copies up to {@code len} bytes from the current buffer into {@code b}.
       * At most the remainder of the current buffer is returned per call, so
       * callers must loop to read larger ranges.
       *
       * @return the number of bytes copied, or -1 at end of file
       */
      @Override
      public int read(byte[] b, int off, int len) throws IOException {
          if (this.curr_ >= this.hi_) {
              // test for EOF
              if (this.hitEOF_) {
                  return -1;
              }
              // slow path -- read another buffer
              this.seek(this.curr_);
              if (this.curr_ == this.hi_) {
                  return -1;
              }
          }
          len = Math.min(len, (int) (this.hi_ - this.curr_));
          int buffOff = (int) (this.curr_ - this.lo_);
          System.arraycopy(this.buff_, buffOff, b, off, len);
          this.curr_ += len;
          return len;
      }

      /** Writes one byte at the file pointer into the buffer and marks it dirty. */
      @Override
      public void write(int b) throws IOException {
          if (this.curr_ >= this.hi_) {
              if (this.hitEOF_ && this.hi_ < this.maxHi_) {
                  // at EOF -- bump "hi"
                  this.hi_++;
              } else {
                  // slow path -- write current buffer; read next one
                  this.seek(this.curr_);
                  if (this.curr_ == this.hi_) {
                      // appending to EOF -- bump "hi"
                      this.hi_++;
                  }
              }
          }
          this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
          this.curr_++;
          this.dirty_ = true;
          syncNeeded_ = true;
      }

      /** Equivalent to {@code write(b, 0, b.length)}. */
      @Override
      public void write(byte[] b) throws IOException {
          this.write(b, 0, b.length);
      }

      /** Writes {@code len} bytes of {@code b} starting at {@code off}, one buffer-sized piece at a time. */
      @Override
      public void write(byte[] b, int off, int len) throws IOException {
          while (len > 0) {
              int n = this.writeAtMost(b, off, len);
              off += n;
              len -= n;
              this.dirty_ = true;
              syncNeeded_ = true;
          }
      }

      /*
       * Write at most "len" bytes from "b" starting at position "off" into the
       * buffer, and return the number of bytes written.
       */
      private int writeAtMost(byte[] b, int off, int len) throws IOException {
          if (this.curr_ >= this.hi_) {
              if (this.hitEOF_ && this.hi_ < this.maxHi_) {
                  // at EOF -- bump "hi"
                  this.hi_ = this.maxHi_;
              } else {
                  // slow path -- write current buffer; read next one
                  this.seek(this.curr_);
                  if (this.curr_ == this.hi_) {
                      // appending to EOF -- bump "hi"
                      this.hi_ = this.maxHi_;
                  }
              }
          }
          len = Math.min(len, (int) (this.hi_ - this.curr_));
          int buffOff = (int) (this.curr_ - this.lo_);
          System.arraycopy(b, off, this.buff_, buffOff, len);
          this.curr_ += len;
          return len;
      }
  }

三.测试

1.FileUtil

用于封装三种方案(LineNumberReader、RandomAccessFile、BufferedRandomAccessFile)的文件读取

Java代码

  1. package com.gqshao.file.util;
  2. import com.google.common.collect.Lists;
  3. import com.google.common.collect.Maps;
  4. import com.gqshao.file.io.BufferedRandomAccessFile;
  5. import org.apache.commons.io.IOUtils;
  6. import org.apache.commons.lang3.StringUtils;
  7. import java.io.*;
  8. import java.util.List;
  9. import java.util.Map;
  10. public class FileUtil {
  11. /**
  12. * 通过BufferedRandomAccessFile读取文件,推荐
  13. *
  14. * @param file 源文件
  15. * @param encoding 文件编码
  16. * @param pos 偏移量
  17. * @param num 读取量
  18. * @return pins文件内容,pos当前偏移量
  19. */
  20. public static Map<String, Object> BufferedRandomAccessFileReadLine(File file, String encoding, long pos, int num) {
  21. Map<String, Object> res = Maps.newHashMap();
  22. List<String> pins = Lists.newArrayList();
  23. res.put("pins", pins);
  24. BufferedRandomAccessFile reader = null;
  25. try {
  26. reader = new BufferedRandomAccessFile(file, "r");
  27. reader.seek(pos);
  28. for (int i = 0; i < num; i++) {
  29. String pin = reader.readLine();
  30. if (StringUtils.isBlank(pin)) {
  31. break;
  32. }
  33. pins.add(new String(pin.getBytes("8859_1"), encoding));
  34. }
  35. res.put("pos", reader.getFilePointer());
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. } finally {
  39. IOUtils.closeQuietly(reader);
  40. }
  41. return res;
  42. }
  43. /**
  44. * 通过RandomAccessFile读取文件,能出来大数据文件,效率低
  45. *
  46. * @param file 源文件
  47. * @param encoding 文件编码
  48. * @param pos 偏移量
  49. * @param num 读取量
  50. * @return pins文件内容,pos当前偏移量
  51. */
  52. public static Map<String, Object> readLine(File file, String encoding, long pos, int num) {
  53. Map<String, Object> res = Maps.newHashMap();
  54. List<String> pins = Lists.newArrayList();
  55. res.put("pins", pins);
  56. RandomAccessFile reader = null;
  57. try {
  58. reader = new RandomAccessFile(file, "r");
  59. reader.seek(pos);
  60. for (int i = 0; i < num; i++) {
  61. String pin = reader.readLine();
  62. if (StringUtils.isBlank(pin)) {
  63. break;
  64. }
  65. pins.add(new String(pin.getBytes("8859_1"), encoding));
  66. }
  67. res.put("pos", reader.getFilePointer());
  68. } catch (Exception e) {
  69. e.printStackTrace();
  70. } finally {
  71. IOUtils.closeQuietly(reader);
  72. }
  73. return res;
  74. }
  75. /**
  76. * 使用LineNumberReader读取文件,1000w行比RandomAccessFile效率高,无法处理1亿条数据
  77. *
  78. * @param file 源文件
  79. * @param encoding 文件编码
  80. * @param index 开始位置
  81. * @param num 读取量
  82. * @return pins文件内容
  83. */
  84. public static List<String> readLine(File file, String encoding, int index, int num) {
  85. List<String> pins = Lists.newArrayList();
  86. LineNumberReader reader = null;
  87. try {
  88. reader = new LineNumberReader(new InputStreamReader(new FileInputStream(file), encoding));
  89. int lines = 0;
  90. while (true) {
  91. String pin = reader.readLine();
  92. if (StringUtils.isBlank(pin)) {
  93. break;
  94. }
  95. if (lines >= index) {
  96. if (StringUtils.isNotBlank(pin)) {
  97. pins.add(pin);
  98. }
  99. }
  100. if (num == pins.size()) {
  101. break;
  102. }
  103. lines++;
  104. }
  105. } catch (Exception e) {
  106. e.printStackTrace();
  107. } finally {
  108. IOUtils.closeQuietly(reader);
  109. }
  110. return pins;
  111. }
  112. }

2.RandomAccessFileTest

测试方法,涉及到的randomFile只是一个掺杂中文的文本文件,可以自己随便写一个

Java代码

  1. package com.gqshao.file;
  2. import com.gqshao.file.util.FileUtil;
  3. import org.apache.commons.collections.CollectionUtils;
  4. import org.apache.commons.collections.MapUtils;
  5. import org.apache.commons.io.IOUtils;
  6. import org.junit.Test;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import java.io.*;
  10. import java.util.List;
  11. import java.util.Map;
  12. public class RandomAccessFileTest {
  13. private static final Logger logger = LoggerFactory.getLogger(RandomAccessFileTest.class);
  14. private static final String ENCODING = "UTF-8";
  15. private static final int NUM = 50000;
  16. private static File file = new File(ClassLoader.getSystemResource("").getPath() + File.separator + "test.txt");
  17. private static File randomFile = new File(ClassLoader.getSystemResource("").getPath() + File.separator + "RandomFile.txt");
  18. /**
  19. * 生成1000w随机文本文件
  20. */
  21. @Test
  22. public void makePin() {
  23. String prefix = "_$#";
  24. OutputStreamWriter out = null;
  25. try {
  26. out = new OutputStreamWriter(new FileOutputStream(file, true), ENCODING);
  27. // 在1500w里随机1000w数据
  28. for (int j = 0; j < 100000000; j++) {
  29. out.write(prefix + (int) (130000000 * Math.random()) + "\n");
  30. }
  31. } catch (Exception e) {
  32. e.printStackTrace();
  33. } finally {
  34. IOUtils.closeQuietly(out);
  35. }
  36. logger.info(file.getAbsolutePath());
  37. }
  38. /**
  39. * 测试RandomAccessFile读取文件
  40. */
  41. @Test
  42. public void testRandomAccessRead() {
  43. long start = System.currentTimeMillis();
  44. //
  45. logger.info(String.valueOf(file.exists()));
  46. long pos = 0L;
  47. while (true) {
  48. Map<String, Object> res = FileUtil.readLine(file, ENCODING, pos, NUM);
  49. // 如果返回结果为空结束循环
  50. if (MapUtils.isEmpty(res)) {
  51. break;
  52. }
  53. Object po = res.get("pins");
  54. List<String> pins = (List<String>) res.get("pins");
  55. if (CollectionUtils.isNotEmpty(pins)) {
  56. // logger.info(Arrays.toString(pins.toArray()));
  57. if (pins.size() < NUM) {
  58. break;
  59. }
  60. } else {
  61. break;
  62. }
  63. pos = (Long) res.get("pos");
  64. }
  65. logger.info(((System.currentTimeMillis() - start) / 1000) + "");
  66. }
  67. /**
  68. * 测试RandomAccessFile读取文件
  69. */
  70. @Test
  71. public void testBufferedRandomAccessRead() {
  72. long start = System.currentTimeMillis();
  73. //
  74. logger.info(String.valueOf(file.exists()));
  75. long pos = 0L;
  76. while (true) {
  77. Map<String, Object> res = FileUtil.BufferedRandomAccessFileReadLine(file, ENCODING, pos, NUM);
  78. // 如果返回结果为空结束循环
  79. if (MapUtils.isEmpty(res)) {
  80. break;
  81. }
  82. List<String> pins = (List<String>) res.get("pins");
  83. if (CollectionUtils.isNotEmpty(pins)) {
  84. // logger.info(Arrays.toString(pins.toArray()));
  85. if (pins.size() < NUM) {
  86. break;
  87. }
  88. } else {
  89. break;
  90. }
  91. pos = (Long) res.get("pos");
  92. }
  93. logger.info(((System.currentTimeMillis() - start) / 1000) + "");
  94. }
  95. /**
  96. * 测试普通读取文件
  97. */
  98. @Test
  99. public void testCommonRead() {
  100. long start = System.currentTimeMillis();
  101. logger.info(String.valueOf(randomFile.exists()));
  102. int index = 0;
  103. while (true) {
  104. List<String> pins = FileUtil.readLine(file, ENCODING, index, NUM);
  105. if (CollectionUtils.isNotEmpty(pins)) {
  106. // logger.info(Arrays.toString(pins.toArray()));
  107. if (pins.size() < NUM) {
  108. break;
  109. }
  110. } else {
  111. break;
  112. }
  113. index += NUM;
  114. }
  115. logger.info(((System.currentTimeMillis() - start) / 1000) + "");
  116. }
  117. }
下一篇
举报
领券
首页
学习
活动
专区
圈层
工具