我有一个liveData设置,它发射一些整数。但我想控制排放,这样每5秒就会发生一次。但是在前5秒过去之后,节流器就可以被移除了。
在这里,我观察到这样一位伯爵:
count1.observe(this, new Observer() {
@Override public void onChanged(@Nullable Integer i) {
//Do something with "integer"
}
});
但我希望对onChanged()回调进行节流,使其在头5秒内最多调用2次。在前5秒过去之后,我根本不需要节流器了。我不知道如何做到这一点,因为我没有看到任何选择的livedata来处理流。我尝试过转换,但它只有map和switchMap。如何才能做到这一点?enter code here
发布于 2020-06-19 09:43:22
下面是MediatorLiveData
实现,它使用Handler
(在Kotlin中,因为Java有很多样板)。
如果源实时数据更新太频繁,则结果发布将被延迟:
import android.os.Handler
import android.os.Looper
import androidx.lifecycle.LiveData
import androidx.lifecycle.MediatorLiveData
/**
* LiveData throttling value emissions so they don't happen more often than [delayMs].
*/
class ThrottledLiveData<T>(source: LiveData<T>, delayMs: Long) : MediatorLiveData<T>() {
val handler = Handler(Looper.getMainLooper())
var delayMs = delayMs
private set
private var isValueDelayed = false
private var delayedValue: T? = null
private var delayRunnable: Runnable? = null
set(value) {
field?.let { handler.removeCallbacks(it) }
value?.let { handler.postDelayed(it, delayMs) }
field = value
}
private val objDelayRunnable = Runnable { if (consumeDelayedValue()) startDelay() }
init {
addSource(source) { newValue ->
if (delayRunnable == null) {
value = newValue
startDelay()
} else {
isValueDelayed = true
delayedValue = newValue
}
}
}
/** Start throttling or modify the delay. If [newDelay] is `0` (default) reuse previous delay value. */
fun startThrottling(newDelay: Long = 0L) {
require(newDelay >= 0L)
when {
newDelay > 0 -> delayMs = newDelay
delayMs < 0 -> delayMs *= -1
delayMs > 0 -> return
else -> throw IllegalArgumentException("newDelay cannot be zero if old delayMs is zero")
}
}
/** Stop throttling, if [immediate] emit any pending value now. */
fun stopThrottling(immediate: Boolean = false) {
if (delayMs <= 0) return
delayMs *= -1
if (immediate) consumeDelayedValue()
}
override fun onInactive() {
super.onInactive()
consumeDelayedValue()
}
// start counting the delay or clear it if conditions are not met
private fun startDelay() {
delayRunnable = if (delayMs > 0 && hasActiveObservers()) objDelayRunnable else null
}
private fun consumeDelayedValue(): Boolean {
delayRunnable = null
return if (isValueDelayed) {
value = delayedValue
delayedValue = null
isValueDelayed = false
true
} else false
}
}
使用它时,将源实时数据作为第一个参数传递并进行观察:
val throttledCount = ThrottledLiveData(count1, 2500L) // maximum of one update per 2.5 sec
throttledCount.observe(this, Observer { i: Int ->
//Do something with "integer"
})
如果您想在5秒后停止节流,只需发布一个延迟的可运行程序,它将禁用它:
val disableThrottle = Runnable { throttledCount.stopThrottling() }
throttledCount.handler.postDelayed(disableThrottle, 5000L)
发布于 2020-06-18 19:28:37
在您的情况下,您应该使用rxjava的Interval操作符。
区间算子创建了一个可观测的序列,它发射一个由给定时间间隔间隔的整数序列。当我们想要一次又一次地做一次又一次的任务时,就会使用它。
例子:
val disposable =
Observable.interval(0, 2, TimeUnit.SECONDS)
.flatMap {
return@flatMap Observable.create<String> { emitter ->
Log.d("IntervalExample", "Create")
emitter.onNext("MindOrks")
emitter.onComplete()
}
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.d("IntervalExample", it)
}
compositeDisposable.add(disposable)
在这里,任务将在间隔2秒后一次又一次地完成。
有一件事需要注意:它将永远持续下去。
怎样才能阻止这一切?
有两种方法可以停止。以下是阻止它的两种方法。
使用compositeDisposable.dispose()的
take(n)
运算符,如下所示Observable.interval(0,2,TimeUnit.SECONDS).take(5).flatMap { return@flatMap Observable.create { emitter -> Log.d("IntervalExample","Create") emitter.onNext("MindOrks") emitter.onComplete() }} .observeOn(AndroidSchedulers.mainThread()) .subscribe { Log.d("IntervalExample",it) }
由于我们在take(5)
中将5作为参数传递,任务将在间隔2秒内执行5次。
https://stackoverflow.com/questions/62457503
复制相似问题