首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >nodejs暂停sax流

nodejs暂停sax流
EN

Stack Overflow用户
提问于 2017-06-27 22:57:51
回答 2查看 982关注 0票数 2

我正在使用sax解析超大的XML文件。我正在从我的XML文件创建一个readStream,并将其通过管道传输到sax,如下所示:

this.sourceStream = fs.createReadStream(file);
this.sourceStream
    .pipe(this.saxStream);

我正在听一些这样的事件:

this.saxStream.on("error", (err) => {
    logger.error(`Error during XML Parsing`, err);
});
this.saxStream.on("opentag", (node) => {
    // doing some stuff
});
this.saxStream.on("text", (t) => {
    // doing some stuff
});
this.saxStream.on("closetag", () => {
    if( this.current_element.parent === null ) {
        this.sourceStream.pause();
        this.process_company_information(this.current_company, (err) => {
            if( err ) {
                logger.error("An error appeared while parsing company", err);
            }
            this.sourceStream.resume();
        });
    }
    else {
        this.current_element = this.current_element.parent;
    }
});
this.saxStream.on("end", () => {
    logger.info("Finished reading through stream");
});

在特定的结束标记进入sax流之后,流需要暂停,当前元素需要处理,然后流可以继续。正如您在我的代码中看到的那样,我尝试暂停sourceStream,但是我发现如果readStream是通过管道传输的,那么暂停它将不起作用。

因此,我的一般问题是,如何让sax解析器暂停,直到处理完当前解析的元素?

我读过关于解开管道和暂停,然后再次管道和恢复,这真的是这样做的方式,它也是可靠的吗?

为了更好地说明,这里有一些日志:

debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: New root tag found
debug: Done with root tag, can continue stream
debug: Done with root tag, can continue stream
debug: Done with root tag, can continue stream
debug: Done with root tag, can continue stream

我真正想要的是这样的日志:

debug: New root tag found
debug: Done with root tag, can continue stream
debug: New root tag found
debug: Done with root tag, can continue stream
debug: New root tag found
debug: Done with root tag, can continue stream
debug: New root tag found
debug: Done with root tag, can continue stream
debug: New root tag found

在其当前状态下,sax比处理器快得多,因此不暂停流将导致内存问题。

EN

回答 2

Stack Overflow用户

发布于 2021-01-31 20:48:33

目前没有积极维护sax (https://github.com/isaacs/sax-js/issues/238)。我建议您迁移到另一个解析器。例如,发送到saxes https://github.com/lddubeau/saxes

您可以使用带有GeneratorIterable (https://nodejs.org/api/stream.html#stream_consuming_readable_streams_with_async_iterators)的for-await-of构造,而不是暂停/恢复流。

安装dep:yarn add emittery saxesnpm install emittery saxes

然后做一些像这样的事情:

import {createReadStream} from 'fs';
import {SaxesParser, SaxesTagPlain} from 'saxes';
import Emittery from 'emittery';

export interface SaxesEvent {
  type: 'opentag' | 'text' | 'closetag' | 'end';
  tag?: SaxesTagPlain;
  text?: string;
}

/**
  * Generator method.
  * Parses one chunk of the iterable input (Readable stream in the string data reading mode).
  * @see https://nodejs.org/api/stream.html#stream_event_data
  * @param iterable Iterable or Readable stream in the string data reading mode.
  * @returns Array of SaxesParser events
  * @throws Error if a SaxesParser error event was emitted.
  */
async function *parseChunk(iterable: Iterable<string> | Readable): AsyncGenerator<SaxesEvent[], void, undefined> {
  const saxesParser = new SaxesParser<{}>();
  let error;
  saxesParser.on('error', _error => {
    error = _error;
  });

  // As a performance optimization, we gather all events instead of passing
  // them one by one, which would cause each event to go through the event queue
  let events: SaxesEvent[] = [];
  saxesParser.on('opentag', tag => {
    events.push({
      type: 'opentag',
      tag
    });
  });

  saxesParser.on('text', text => {
    events.push({
      type: 'text',
      text
    });
  });

  saxesParser.on('closetag', tag => {
    events.push({
      type: 'closetag',
      tag
    });
  });

  for await (const chunk of iterable) {
    saxesParser.write(chunk as string);
    if (error) {
      throw error;
    }

    yield events;
    events = [];
  }

  yield [{
    type: 'end'
  }];
}

const eventEmitter = new Emittery();
eventEmitter.on('text', async (text) => {
  console.log('Start');
  await new Promise<void>(async (resolve) => {
    await new Promise<void>((resolve1) => {
      console.log('First Level Promise End');
      resolve1();
    });
    console.log('Second Level Promise End');
    resolve();
  });
});

const readable = createReadStream('./some-file.xml');
// Enable string reading mode
readable.setEncoding('utf8');
// Read stream chunks
for await (const saxesEvents of parseChunk(iterable) ?? []) {
  // Process batch of events
  for (const saxesEvent of saxesEvents ?? []) {
    // Emit ordered events and process them in the event handlers strictly one-by-one
    // See https://github.com/sindresorhus/emittery#emitserialeventname-data
    await eventEmitter.emitSerial(event.type, event.tag || event.text);
  }
}

另外,还可以查看此解决方案https://github.com/lddubeau/saxes/issues/32的主要讨论

票数 1
EN

Stack Overflow用户

发布于 2017-06-29 15:55:00

对于将来遇到类似问题的任何人,这里是我尝试过的其他方法,我最终是如何让它工作的,尽管这是一个有点变通的方法。

我试图在尝试pause-streampass-stream之间插入一个暂停流,因为它们应该在暂停时进行缓冲。由于某种原因,这又一次根本没有改变行为。

最后,我决定从根本上解决这个问题,而不是创建一个ReadingStream并将其输送到sax中,而是使用line-by-line从XML中读取行,并写入sax解析器。现在可以正确地暂停这个行读取过程,并最终帮助我实现了所需的行为

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44783597

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档