I'm using sax to parse huge XML files. I create a read stream from my XML file and pipe it into sax like this:
this.sourceStream = fs.createReadStream(file);
this.sourceStream
.pipe(this.saxStream);
I'm listening to a few events like this:
this.saxStream.on("error", (err) => {
  logger.error(`Error during XML Parsing`, err);
});
this.saxStream.on("opentag", (node) => {
  // doing some stuff
});
this.saxStream.on("text", (t) => {
  // doing some stuff
});
this.saxStream.on("closetag", () => {
  if (this.current_element.parent === null) {
    this.sourceStream.pause();
    this.process_company_information(this.current_company, (err) => {
      if (err) {
        logger.error("An error appeared while parsing company", err);
      }
      this.sourceStream.resume();
    });
  } else {
    this.current_element = this.current_element.parent;
  }
});
this.saxStream.on("end", () => {
  logger.info("Finished reading through stream");
});
After a specific closing tag comes through the sax stream, the stream needs to pause, the current element needs to be processed, and then the stream can continue. As you can see in my code, I tried pausing the sourceStream, but I found that pausing a read stream has no effect once it has been piped.
So my general question is: how do I make the sax parser pause until the currently parsed element has been processed?
I have read about unpiping and pausing, then piping and resuming again. Is that really the way to do it, and is it reliable?
To illustrate, here are some logs:
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: Done with root tag, can continue stream
debug: Done with root tag, can continue stream
debug: Done with root tag, can continue stream
debug: Done with root tag, can continue stream
What I actually want are logs like this:
debug: New root tag found
debug: Done with root tag, can continue stream
debug: New root tag found
debug: Done with root tag, can continue stream
debug: New root tag found
debug: Done with root tag, can continue stream
debug: New root tag found
debug: Done with root tag, can continue stream
debug: New root tag found
In its current state, sax is much faster than the processor, so not pausing the stream leads to memory problems.
Posted on 2021-01-31 20:48:33
sax is not actively maintained at the moment (https://github.com/isaacs/sax-js/issues/238). I suggest migrating to another parser, for example saxes (https://github.com/lddubeau/saxes).
Instead of pausing/resuming the stream, you can use a for-await-of construct with a generator and an async iterable (https://nodejs.org/api/stream.html#stream_consuming_readable_streams_with_async_iterators).
Install the dependencies with yarn add emittery saxes or npm install emittery saxes, then do something like this:
import {createReadStream} from 'fs';
import {Readable} from 'stream';
import {SaxesParser, SaxesTagPlain} from 'saxes';
import Emittery from 'emittery';

export interface SaxesEvent {
  type: 'opentag' | 'text' | 'closetag' | 'end';
  tag?: SaxesTagPlain;
  text?: string;
}

/**
 * Generator method.
 * Parses one chunk of the iterable input (Readable stream in the string data reading mode).
 * @see https://nodejs.org/api/stream.html#stream_event_data
 * @param iterable Iterable or Readable stream in the string data reading mode.
 * @returns Array of SaxesParser events
 * @throws Error if a SaxesParser error event was emitted.
 */
async function *parseChunk(iterable: Iterable<string> | Readable): AsyncGenerator<SaxesEvent[], void, undefined> {
  const saxesParser = new SaxesParser<{}>();
  let error: Error | undefined;
  saxesParser.on('error', _error => {
    error = _error;
  });

  // As a performance optimization, we gather all events instead of passing
  // them one by one, which would cause each event to go through the event queue
  let events: SaxesEvent[] = [];
  saxesParser.on('opentag', tag => {
    events.push({
      type: 'opentag',
      tag
    });
  });
  saxesParser.on('text', text => {
    events.push({
      type: 'text',
      text
    });
  });
  saxesParser.on('closetag', tag => {
    events.push({
      type: 'closetag',
      tag
    });
  });

  for await (const chunk of iterable) {
    saxesParser.write(chunk as string);
    if (error) {
      throw error;
    }
    yield events;
    events = [];
  }
  yield [{
    type: 'end'
  }];
}

const eventEmitter = new Emittery();
eventEmitter.on('text', async (text) => {
  console.log('Start');
  await new Promise<void>(async (resolve) => {
    await new Promise<void>((resolve1) => {
      console.log('First Level Promise End');
      resolve1();
    });
    console.log('Second Level Promise End');
    resolve();
  });
});

const readable = createReadStream('./some-file.xml');
// Enable string reading mode
readable.setEncoding('utf8');

// Read stream chunks
for await (const saxesEvents of parseChunk(readable)) {
  // Process batch of events
  for (const saxesEvent of saxesEvents) {
    // Emit ordered events and process them in the event handlers strictly one-by-one
    // See https://github.com/sindresorhus/emittery#emitserialeventname-data
    await eventEmitter.emitSerial(saxesEvent.type, saxesEvent.tag || saxesEvent.text);
  }
}
Also, check out the main discussion around this solution at https://github.com/lddubeau/saxes/issues/32.
Posted on 2017-06-29 15:55:00
For anyone who runs into a similar problem in the future, here is what else I tried and how I finally got it working, even though it is a bit of a workaround.
I tried inserting a pausing stream in between, trying both pause-stream and pass-stream, since they are supposed to buffer while paused. For some reason, this again did not change the behavior at all.
In the end I decided to tackle the problem at the root: instead of creating a read stream and piping it into sax, I use line-by-line to read lines from the XML and write them to the sax parser. This line reading can be paused correctly and finally gave me the desired behavior.
https://stackoverflow.com/questions/44783597