假设我有一个很大的文件,我希望每一行都读100行并执行一次操作。(我想合并100行并发送rest请求)
在Java 7中,我会做如下的事情。
try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
String line;
int count = 0;
List<String> list = new ArrayList<>();
while ((line = br.readLine()) != null) {
list.add(line);
count++;
if (count % 100 == 0) {
//do the operation on list
list = new ArrayList();
}
}
} catch (IOException e) {
e.printStackTrace();
}这里有什么可以利用Java 8流的东西吗?我知道我们可以这样做,但是它在每条线上运行,而不是100行。所以我认为foreach不是这里的选择。
try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
stream.forEach(System.out::println);
} catch (IOException e) {
e.printStackTrace();
}发布于 2017-07-05 22:53:17
如果您不喜欢上面的方法,可以简单地使用第二种方法,但是不能并行地创建部分流,因为您必须按顺序读取lines。例如:
split(Paths.get("file"), 100).forEach(this::sendRequest);
void sendRequest(List<String> each) {
// then you must send the rest request in parallel here
}Stream<List<String>> split(Path path, int limit) throws IOException {
// skip the remaining lines if its size < limit
return split(Files.lines(path), limit, true);
}
<T> Stream<List<T>> split(Stream<T> source,
int limit, boolean skipRemainingElements) {
//variables just for printing purpose
Spliterator<T> it = source.spliterator();
long size = it.estimateSize();
int c = it.characteristics();// characteristics
return stream(new AbstractSpliterator<List<T>>(size, c) {
private int thresholds = skipRemainingElements ? limit : 1;
@Override
@SuppressWarnings("StatementWithEmptyBody")
public boolean tryAdvance(Consumer<? super List<T>> action) {
List<T> each = new ArrayList<>(limit);
while (each.size() < limit && it.tryAdvance(each::add)) ;
if (each.size() < thresholds) return false;
action.accept(each);
return true;
}
}, false).onClose(source::close);
}发布于 2017-07-05 20:58:55
您可以使用Stream#skip和Stream#limit拆分流,然后每100行并行发送rest请求。例如:
split(Paths.get("file"), 100).parallel().forEach(this::sendRequest);Stream<Stream<String>> split(Path path, int limit) throws IOException {
return LongStream.of(0, lines(path) / limit).parallel()
.map(it -> it * limit)
.mapToObj(offset -> {
try {
return Files.lines(path).skip(offset).limit(limit);
} catch (IOException e) {
throw new RejectedExecutionException(e);
}
});
}
long lines(Path path) throws IOException {
try (LineNumberReader in = open(path)) {
return in.getLineNumber();
}
}
LineNumberReader open(Path path) throws IOException {
return new LineNumberReader(newBufferedReader(path));
}
void sendRequest(Stream<String> each) {
try (BufferedWriter out = null) {// todo: create the output writer
each.forEach(line -> {
try {
out.write(line);
} catch (IOException e) {
// todo: handle error
}
});
} catch (IOException ex) {
//todo: handle error
}
}如果需要更高的性能,则必须在split & lines方法中实现自己的算法。Note LineNumberReader#getLineNumber是int而不是long。对于计算行,我认为有许多开源项目可以并行计算总行数。
和--如果您想获得最高的性能,这只是一个框架。首先,需要将行信息(例如:(totalLines和offset) )并行索引(如合并排序)到内存或磁盘(如果需要的话)。然后可以使用RandomeAccessFile快速跳转到offset。
索引行信息文件格式,如下所示
total_lines|ofsset1|offset2|...|offsetN备注:在行信息文件中没有分隔符|。每个值都必须使用DataOutputStream#writeLong编写为long,因为以这种格式写入Line,您可以按字节(例如:8*M; M=(1..N) )计算offsetN的位置,然后从offsetN读取8个字节即可获得offsetN。
实际上,当创建了大型文件(如果文件太大,需要拆分)时,应该创建索引文件,这样就可以节省进一步使用所需的时间。
https://stackoverflow.com/questions/44935165
复制相似问题