我有两个发布服务器A和B,它们是不平衡的,因为在A中,A将发出3个值,然后完成,B只发出1个值,然后完成(A实际上可以发出一个可变的数字,B将保持1,如果这有帮助的话):
A => 1, 2, 3
B => X
B也异步运行,很可能只在A已经发出其第二个值之后才发出一个值(参见上面的图表)。(B也可能只发出任何时间,包括在A已经完成之后)。
我想公布A值的元组和B值的组合:
(1, X) (2, X) (3, X)
combineLatest
不适合这项工作,因为它将跳过A的第一个值,只发出(2, X)
和(3, X)
。另一方面,zip
不适用于我,因为B只发出一个值。
我正在寻找一种优雅的方式来实现这一点。谢谢!
编辑和接近解决方案
有点哲学,但我认为,如果您想要走zip
或combineLatest
路线,这是一个根本性的问题。您肯定需要某种类型的存储,以便更快的发布者缓冲事件,而等待较慢的服务器开始释放值。
一种解决方案可能是创建一个发布者,从A收集事件,直到B发出,然后发出所有收集的事件,并继续发出A提供的内容。这实际上是有可能的
let bufferedSubject1 = Publishers.Concatenate(
prefix: Publishers.PrefixUntilOutput(upstream: subject1, other: subject2).collect().flatMap(\.publisher),
suffix: subject1)
PrefixUntilOutput
将收集所有信息,直到B发出(subject2
),然后切换到只定期传递输出。
但是如果你跑
let cancel = bufferedSubject1.combineLatest(subject2)
.sink(receiveCompletion: { c in
print(c)
}, receiveValue: { v in
print(v)
})
您仍然忽略了(1,X)
中的第一个值--这似乎有点像一个争用条件:bufferedSubject1
是先发出所有值还是subject2首先为combineLatest
提供一个值?
我认为有趣的是,如果没有任何异步调用,行为似乎是未定义的。如果您运行下面的示例,有时™️将得到发出的所有值。有时你会错过(1,X)
。由于这里没有异步调用和dispatchQueue切换,我甚至认为这是一个错误。
您可以通过提供一个delay
,甚至是bufferedSubject1
和combineLatest
之间的一个receive(on: DispatchQueue.main)
来“修正”竞争条件,这样在我们继续管道之前,我们可以将控制返回给DispatchQueue,让subject2发出到combineLatest
。
但是,我不认为优雅和仍然在寻找使用zip
语义的解决方案,但不需要创建相同值的无限集合(按照我的看法,这不能很好地处理顺序处理和无限需求)。
示例:
var subject1 = PassthroughSubject<Int, Never>()
var subject2 = PassthroughSubject<String, Never>()
let bufferedSubject1 = Publishers.Concatenate(prefix: Publishers.PrefixUntilOutput(upstream: subject1, other: subject2).collect().flatMap(\.publisher),
suffix: subject1)
let bufferedSubject2 = Publishers.Concatenate(prefix: Publishers.PrefixUntilOutput(upstream: subject2, other: subject1).collect().flatMap(\.publisher),
suffix: subject2)
let cancel = bufferedSubject1.combineLatest(subject2)
.sink(receiveCompletion: { c in
print(c)
}, receiveValue: { v in
print(v)
})
subject1.send(1)
subject1.send(2)
subject2.send("X")
subject2.send(completion: .finished)
subject1.send(3)
subject1.send(completion: .finished)
发布于 2021-05-16 18:55:49
好吧,这是一个有趣的挑战,虽然看起来很简单,但我找不到一个简单优雅的方法。
这里有一种工作方法(虽然不太优雅),它似乎不受使用PrefixUntilOutput
/Concatenate
组合的竞争条件的影响。
这样做的目的是使用combineLatest
,但是第一个发行者一发出就会发出,而另一个值是nil
,这样我们就不会丢失初始值。这里有一个方便的操作符,我称之为combineLatestOptional
extension Publisher {
func combineLatestOptional<Other: Publisher>(_ other: Other)
-> AnyPublisher<(Output?, Other.Output?), Failure>
where Other.Failure == Failure {
self.map { Optional.some($0) }.prepend(nil)
.combineLatest(
other.map { Optional.some($0) }.prepend(nil)
)
.dropFirst() // drop the first (nil, nil)
.eraseToAnyPublisher()
}
}
在上述情况下,管道中的第二步使用Scan
将值收集到累加器中,直到其他发行者发出第一个值为止。我用State<L, R>
类型表示这个状态的累加器有4种状态:
fileprivate enum State<L, R> {
case initial // before any one publisher emitted
case left([L]) // left emitted; right hasn't emitted
case right([R]) // right emitted; left hasn't emitted
case final([L], [R]) // final steady-state
}
最后一个操作符combineLatestLossless
实现如下:
extension Publisher {
func combineLatestLossless<Other: Publisher>(_ other: Other)
-> AnyPublisher<(Output, Other.Output), Failure>
where Failure == Other.Failure {
self.combineLatestOptional(other)
.scan(State<Output, Other.Output>.initial, { state, tuple in
switch (state, tuple.0, tuple.1) {
case (.initial, let l?, nil): // left emits first value
return .left([l]) // -> collect left values
case (.initial, nil, let r?): // right emits first value
return .right([r]) // -> collect right values
case (.left(let ls), let l?, nil): // left emits another
return .left(ls + [l]) // -> append to left values
case (.right(let rs), nil, let r?): // right emits another
return .right(rs + [r]) // -> append to right values
case (.left(let ls), _, let r?): // right emits after left
return .final(ls, [r]) // -> go to steady-state
case (.right(let rs), let l?, _): // left emits after right
return .final([l], rs) // -> go to steady-state
case (.final, let l?, let r?): // final steady-state
return .final([l], [r]) // -> pass the values as-is
default:
fatalError("shouldn't happen")
}
})
.flatMap { status -> AnyPublisher<(Output, Other.Output), Failure> in
if case .final(let ls, let rs) = status {
return ls.flatMap { l in rs.map { r in (l, r) }}
.publisher
.setFailureType(to: Failure.self)
.eraseToAnyPublisher()
} else {
return Empty().eraseToAnyPublisher()
}
}
.eraseToAnyPublisher()
}
}
最后一个flatMap
根据所有累积值创建一个Publishers.Sequence
发布服务器。在最后的稳态中,每个数组只有一个值.
用法很简单:
let c = pub1.combineLatestLossless(pub2)
.sink { print($0) }
发布于 2021-05-15 21:50:33
另一方面,zip不适用于我,因为B只发出一个值。
对,所以把它修好这样就不对了。在B处启动管道,使用平面映射将其信号转换为该信号序列的发布者,重复。用A.
示例:
import UIKit
import Combine
func delay(_ delay:Double, closure:@escaping ()->()) {
let when = DispatchTime.now() + delay
DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController: UIViewController {
var storage = Set<AnyCancellable>()
let s1 = PassthroughSubject<Int,Never>()
let s2 = PassthroughSubject<String,Never>()
override func viewDidLoad() {
super.viewDidLoad()
let p1 = s1
let p2 = s2.flatMap { (val:String) -> AnyPublisher<String,Never> in
let seq = Array(repeating: val, count: 100)
return seq.publisher.eraseToAnyPublisher()
}
p1.zip(p2)
.sink{print($0)}
.store(in: &storage)
delay(1) {
self.s1.send(1)
}
delay(2) {
self.s1.send(2)
}
delay(3) {
self.s1.send(3)
}
delay(2.5) {
self.s2.send("X")
}
}
}
结果:
(1, "X")
(2, "X")
(3, "X")
发布于 2021-05-15 18:00:47
编辑
在遇到这个帖子之后,我想知道您的示例中的问题是否与PassthroughSubject
无关
如果下游没有对其提出任何要求,PassthroughSubject将降低它们的值。
事实上,使用:
var subject1 = Timer.publish(every: 1, on: .main, in: .default, options: nil)
.autoconnect()
.measureInterval(using: RunLoop.main, options: nil)
.scan(DateInterval()) { res, interval in
.init(start: res.start, duration: res.duration + interval.magnitude)
}
.map(\.duration)
.map { Int($0) }
.eraseToAnyPublisher()
var subject2 = PassthroughSubject<String, Never>()
let bufferedSubject1 = Publishers.Concatenate(prefix: Publishers.PrefixUntilOutput(upstream: subject1, other: subject2).collect().flatMap(\.publisher),
suffix: subject1)
let cancel = bufferedSubject1.combineLatest(subject2)
.sink(receiveCompletion: { c in
print(c)
}, receiveValue: { v in
print(v)
})
subject2.send("X")
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
subject2.send("Y")
}
我得到了这个输出:
(1, "X")
(2, "X")
(3, "X")
(3, "Y")
(4, "Y")
(5, "Y")
(6, "Y")
这似乎是我们想要的行为。
我不知道这是否是一个优雅的解决方案,但您可以尝试使用Publishers.CollectByTime
:
import PlaygroundSupport
import Combine
PlaygroundPage.current.needsIndefiniteExecution = true
let queue = DispatchQueue(label: "com.foo.bar")
let cancellable = letters
.combineLatest(indices
.collect(.byTimeOrCount(queue, .seconds(1), .max))
.flatMap { indices in indices.publisher })
.sink { letter, index in print("(\(index), \(letter))") }
indices.send(1)
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
indices.send(2)
indices.send(3)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
letters.send("X")
}
DispatchQueue.main.asyncAfter(deadline: .now() + 3.3) {
indices.send(4)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 3.5) {
letters.send("Y")
}
DispatchQueue.main.asyncAfter(deadline: .now() + 3.7) {
indices.send(5)
indices.send(6)
}
产出:
(X, 1)
(X, 2)
(X, 3)
(Y, 3)
(Y, 4)
(Y, 5)
(Y, 6)
https://stackoverflow.com/questions/67549504
复制相似问题