首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将Stream<Stream<T>>转换为Stream<T>

将Stream<Stream<T>>转换为Stream<T>
EN

Stack Overflow用户
提问于 2019-06-26 10:17:05
回答 4查看 2.5K关注 0票数 5

我正在使用雷克斯达特包来处理dart中的流。我被困在处理一个特殊的问题上。

请看一下这个虚拟代码:

代码语言:javascript
运行
复制
final userId = BehaviorSubject<String>();

Stream<T> getStream(String uid) {
  // a sample code that returns a stream
  return BehaviorSubject<T>().stream;
}

final Observable<Stream<T>> oops = userId.map((uid) => getStream(uid));

现在,我希望将oops变量转换为只获得Observable<T>

我发现很难解释清楚。但让我试试。我有一个流A,我把A流的每个输出映射到另一个B流。现在我有了Stream<Stream<B>> --一种循环流。我只想听听这个模式产生的最新价值。我怎样才能做到这一点?

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2019-06-26 10:51:23

有一个Stream<Stream<Something>>有点少见,所以它并不是有很多明确的支持。

原因之一是,有几种(至少两种)方法可以将一条物流组合成一条事物流。

  1. 您可以依次侦听每个流,在开始下一个流之前等待它完成,然后按顺序发出事件。
  2. 或者,当每个新流可用时,您就会监听它,然后尽快从任何流中发出事件。

前者易于用async/await编写。

代码语言:javascript
运行
复制
Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}

后者更复杂,因为它需要一次侦听多个流,并组合它们的事件。(如果只有StreamController.addStream一次允许多个流,那么它就容易得多)。您可以将StreamGroup类从package:async中用于以下内容:

代码语言:javascript
运行
复制
import "package:async/async" show StreamGroup;

Stream<T> mergeStreams<T>(Stream<Stream<T>> source) {
  var sg = StreamGroup<T>();
  source.forEach(sg.add).whenComplete(sg.close);
  // This doesn't handle errors in [source].
  // Maybe insert 
  //   .catchError((e, s) {
  //      sg.add(Future<T>.error(e, s).asStream())
  // before `.whenComplete` if you worry about errors in [source].          
  return sg.stream;
}
票数 3
EN

Stack Overflow用户

发布于 2019-06-27 05:16:46

我将列出几种将Stream<Stream<T>>简化为单个Stream<T>的方法。

1.使用纯省道

正如@伊恩所回答的那样,这是一个纯粹的省道解决方案:

代码语言:javascript
运行
复制
Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Stream<int> s = flattenStreams(Stream.fromIterable(list).map(getStream));
    s.listen(print);
}

产出:1 2 3 4 1 2 3 4 1 2 3 4

2.使用Observable.flatMap

flatMap方法使输出流变得平坦,并将其附加到正在进行的流:

代码语言:javascript
运行
复制
import 'package:rxdart/rxdart.dart';

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Observable<int> s = Observable.fromIterable(list).flatMap(getStream);
    s.listen(print);
}

产出:1 2 3 4 1 2 3 4 1 2 3 4

3.使用Observable.switchLatest

将发出流(也称为“高阶流”)的流转换为单个可观测的流,该流发射那些最近发出的流所发出的项。

这就是我一直在寻找的解决方案!我只需要内部流发出的最新输出。

代码语言:javascript
运行
复制
import 'package:rxdart/rxdart.dart';

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Observable<int> s = Observable.switchLatest(
            Observable.fromIterable(list).map(getStream));
    s.listen(print);
}

产出:1 1 1 2 3 4

票数 5
EN

Stack Overflow用户

发布于 2019-06-26 10:37:18

如果您想要一个Stream>返回一个流,您基本上需要平缓这个流。平面映射函数是您在这里使用的

代码语言:javascript
运行
复制
public static void main(String args[]) {
    StreamTest st = new StreamTest<String>();
    List<String> l = Arrays.asList("a","b", "c");
    Stream s = l.stream().map(i -> st.getStream(i)).flatMap(i->i);
}

Stream<T> static getStream(T uid) {
    // a sample code that returns a stream
    return Stream.of(uid);
}

如果需要第一个对象,那么使用findFirst()方法

代码语言:javascript
运行
复制
public static void main(String args[]) {
    StreamTest st = new StreamTest<String>();
    List<String> l = Arrays.asList("a","b", "c");
    String str = l.stream().map(i -> st.getStream(i)).flatMap(i->i).findFirst().get();
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56770400

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档