首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >在Swift组合中不丢弃值和不平衡发布的优雅combineLatest方法

在Swift组合中不丢弃值和不平衡发布的优雅combineLatest方法
EN

Stack Overflow用户
提问于 2021-05-15 17:28:09
回答 4查看 2.7K关注 0票数 2

我有两个发布服务器A和B,它们是不平衡的,因为在A中,A将发出3个值,然后完成,B只发出1个值,然后完成(A实际上可以发出一个可变的数字,B将保持1,如果这有帮助的话):

代码语言:javascript
运行
复制
A => 1,   2,   3
B =>         X

B也异步运行,很可能只在A已经发出其第二个值之后才发出一个值(参见上面的图表)。(B也可能只发出任何时间,包括在A已经完成之后)。

我想公布A值的元组和B值的组合:

代码语言:javascript
运行
复制
(1, X) (2, X) (3, X)

combineLatest不适合这项工作,因为它将跳过A的第一个值,只发出(2, X)(3, X)。另一方面,zip不适用于我,因为B只发出一个值。

我正在寻找一种优雅的方式来实现这一点。谢谢!

编辑和接近解决方案

有点哲学,但我认为,如果您想要走zipcombineLatest路线,这是一个根本性的问题。您肯定需要某种类型的存储,以便更快的发布者缓冲事件,而等待较慢的服务器开始释放值。

一种解决方案可能是创建一个发布者,从A收集事件,直到B发出,然后发出所有收集的事件,并继续发出A提供的内容。这实际上是有可能的

代码语言:javascript
运行
复制
let bufferedSubject1 = Publishers.Concatenate(
   prefix: Publishers.PrefixUntilOutput(upstream: subject1, other: subject2).collect().flatMap(\.publisher),
   suffix: subject1)

PrefixUntilOutput将收集所有信息,直到B发出(subject2),然后切换到只定期传递输出。

但是如果你跑

代码语言:javascript
运行
复制
let cancel = bufferedSubject1.combineLatest(subject2)    
    .sink(receiveCompletion: { c in
        print(c)
    }, receiveValue: { v in
        print(v)
    })

您仍然忽略了(1,X)中的第一个值--这似乎有点像一个争用条件:bufferedSubject1是先发出所有值还是subject2首先为combineLatest提供一个值?

我认为有趣的是,如果没有任何异步调用,行为似乎是未定义的。如果您运行下面的示例,有时™️将得到发出的所有值。有时你会错过(1,X)。由于这里没有异步调用和dispatchQueue切换,我甚至认为这是一个错误。

您可以通过提供一个delay,甚至是bufferedSubject1combineLatest之间的一个receive(on: DispatchQueue.main)来“修正”竞争条件,这样在我们继续管道之前,我们可以将控制返回给DispatchQueue,让subject2发出到combineLatest

但是,我不认为优雅和仍然在寻找使用zip语义的解决方案,但不需要创建相同值的无限集合(按照我的看法,这不能很好地处理顺序处理和无限需求)。

示例:

代码语言:javascript
运行
复制
var subject1 = PassthroughSubject<Int, Never>()
var subject2 = PassthroughSubject<String, Never>()

let bufferedSubject1 = Publishers.Concatenate(prefix: Publishers.PrefixUntilOutput(upstream: subject1, other: subject2).collect().flatMap(\.publisher),
                                      suffix: subject1)

let bufferedSubject2 = Publishers.Concatenate(prefix: Publishers.PrefixUntilOutput(upstream: subject2, other: subject1).collect().flatMap(\.publisher),
                                      suffix: subject2)

let cancel = bufferedSubject1.combineLatest(subject2)
    .sink(receiveCompletion: { c in
        print(c)
    }, receiveValue: { v in
        print(v)
    })

subject1.send(1)
subject1.send(2)
subject2.send("X")
subject2.send(completion: .finished)
subject1.send(3)
subject1.send(completion: .finished)
EN

回答 4

Stack Overflow用户

发布于 2021-05-16 18:55:49

好吧,这是一个有趣的挑战,虽然看起来很简单,但我找不到一个简单优雅的方法。

这里有一种工作方法(虽然不太优雅),它似乎不受使用PrefixUntilOutput/Concatenate组合的竞争条件的影响。

这样做的目的是使用combineLatest,但是第一个发行者一发出就会发出,而另一个值是nil,这样我们就不会丢失初始值。这里有一个方便的操作符,我称之为combineLatestOptional

代码语言:javascript
运行
复制
extension Publisher {
    func combineLatestOptional<Other: Publisher>(_ other: Other) 
        -> AnyPublisher<(Output?, Other.Output?), Failure> 
        where Other.Failure == Failure {
        
        self.map { Optional.some($0) }.prepend(nil)
            .combineLatest(
                other.map { Optional.some($0) }.prepend(nil)
            )
            .dropFirst() // drop the first (nil, nil)
            .eraseToAnyPublisher()
    }
}

在上述情况下,管道中的第二步使用Scan将值收集到累加器中,直到其他发行者发出第一个值为止。我用State<L, R>类型表示这个状态的累加器有4种状态:

代码语言:javascript
运行
复制
fileprivate enum State<L, R> {
    case initial         // before any one publisher emitted
    case left([L])       // left emitted; right hasn't emitted
    case right([R])      // right emitted; left hasn't emitted
    case final([L], [R]) // final steady-state
}

最后一个操作符combineLatestLossless实现如下:

代码语言:javascript
运行
复制
extension Publisher {
   func combineLatestLossless<Other: Publisher>(_ other: Other) 
          -> AnyPublisher<(Output, Other.Output), Failure> 
          where Failure == Other.Failure {

      self.combineLatestOptional(other)
          .scan(State<Output, Other.Output>.initial, { state, tuple in
             switch (state, tuple.0, tuple.1) {
             case (.initial, let l?, nil):       // left emits first value
                return .left([l])                // -> collect left values

             case (.initial, nil, let r?):       // right emits first value
                return .right([r])               // -> collect right values

             case (.left(let ls), let l?, nil):  // left emits another
                return .left(ls + [l])           // -> append to left values

             case (.right(let rs), nil, let r?): // right emits another
                return .right(rs + [r])          // -> append to right values

             case (.left(let ls), _, let r?):    // right emits after left
                return .final(ls, [r])           // -> go to steady-state

             case (.right(let rs), let l?, _):   // left emits after right
                return .final([l], rs)           // -> go to steady-state

             case (.final, let l?, let r?):      // final steady-state
                return .final([l], [r])          // -> pass the values as-is
             default:
                fatalError("shouldn't happen")
             }
         })
         .flatMap { status -> AnyPublisher<(Output, Other.Output), Failure> in
            if case .final(let ls, let rs) = status {
               return ls.flatMap { l in rs.map { r in (l, r) }}
                   .publisher
                   .setFailureType(to: Failure.self)
                   .eraseToAnyPublisher()
            } else {
               return Empty().eraseToAnyPublisher()
            }
         }
         .eraseToAnyPublisher()
   }
}

最后一个flatMap根据所有累积值创建一个Publishers.Sequence发布服务器。在最后的稳态中,每个数组只有一个值.

用法很简单:

代码语言:javascript
运行
复制
let c = pub1.combineLatestLossless(pub2)
            .sink { print($0) }
票数 2
EN

Stack Overflow用户

发布于 2021-05-15 21:50:33

另一方面,zip不适用于我,因为B只发出一个值。

对,所以把它修好这样就不对了。在B处启动管道,使用平面映射将其信号转换为该信号序列的发布者,重复。用A.

示例:

代码语言:javascript
运行
复制
import UIKit
import Combine

func delay(_ delay:Double, closure:@escaping ()->()) {
    let when = DispatchTime.now() + delay
    DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}

class ViewController: UIViewController {
    var storage = Set<AnyCancellable>()
    let s1 = PassthroughSubject<Int,Never>()
    let s2 = PassthroughSubject<String,Never>()
    override func viewDidLoad() {
        super.viewDidLoad()
        
        let p1 = s1
        let p2 = s2.flatMap { (val:String) -> AnyPublisher<String,Never> in
            let seq = Array(repeating: val, count: 100)
            return seq.publisher.eraseToAnyPublisher()
        }
        p1.zip(p2)
            .sink{print($0)}
            .store(in: &storage)
        
        delay(1) {
            self.s1.send(1)
        }
        delay(2) {
            self.s1.send(2)
        }
        delay(3) {
            self.s1.send(3)
        }
        delay(2.5) {
            self.s2.send("X")
        }
    }
}

结果:

代码语言:javascript
运行
复制
(1, "X")
(2, "X")
(3, "X")
票数 1
EN

Stack Overflow用户

发布于 2021-05-15 18:00:47

编辑

在遇到这个帖子之后,我想知道您的示例中的问题是否与PassthroughSubject无关

如果下游没有对其提出任何要求,PassthroughSubject将降低它们的值。

事实上,使用:

代码语言:javascript
运行
复制
var subject1 = Timer.publish(every: 1, on: .main, in: .default, options: nil)
    .autoconnect()
    .measureInterval(using: RunLoop.main, options: nil)
    .scan(DateInterval()) { res, interval in
        .init(start: res.start, duration: res.duration + interval.magnitude)
    }
    .map(\.duration)
    .map { Int($0) }
    .eraseToAnyPublisher()
var subject2 = PassthroughSubject<String, Never>()

let bufferedSubject1 = Publishers.Concatenate(prefix: Publishers.PrefixUntilOutput(upstream: subject1, other: subject2).collect().flatMap(\.publisher),
                                              suffix: subject1)

let cancel = bufferedSubject1.combineLatest(subject2)
    .sink(receiveCompletion: { c in
        print(c)
    }, receiveValue: { v in
        print(v)
    })

subject2.send("X")

DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    subject2.send("Y")
}

我得到了这个输出:

代码语言:javascript
运行
复制
(1, "X")
(2, "X")
(3, "X")
(3, "Y")
(4, "Y")
(5, "Y")
(6, "Y")

这似乎是我们想要的行为。

我不知道这是否是一个优雅的解决方案,但您可以尝试使用Publishers.CollectByTime

代码语言:javascript
运行
复制
import PlaygroundSupport
import Combine

PlaygroundPage.current.needsIndefiniteExecution = true

let queue = DispatchQueue(label: "com.foo.bar")
let cancellable = letters
    .combineLatest(indices
                    .collect(.byTimeOrCount(queue, .seconds(1), .max))
                    .flatMap { indices in indices.publisher })
    .sink { letter, index in print("(\(index), \(letter))") }

indices.send(1)
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
    indices.send(2)
    indices.send(3)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
    letters.send("X")
}

DispatchQueue.main.asyncAfter(deadline: .now() + 3.3) {
    indices.send(4)
}

DispatchQueue.main.asyncAfter(deadline: .now() + 3.5) {
    letters.send("Y")
}

DispatchQueue.main.asyncAfter(deadline: .now() + 3.7) {
    indices.send(5)
    indices.send(6)
}

产出:

代码语言:javascript
运行
复制
(X, 1)
(X, 2)
(X, 3)
(Y, 3)
(Y, 4)
(Y, 5)
(Y, 6)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67549504

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档