我的事件发送类有代码:
private val socketListeners: ArrayList<SocketContentListener> = ArrayList()
//add listener here
override fun subscribe(socketListener: SocketContentListener) {
socketListeners.add(socketListener)
}
private fun getSocketConnectListener()
: SocketContentListener {
/**
* Post received messages to listeners via Handler
* because handler helps to set all messages in order on main thread.
*/
return object : SocketContentListener {
override fun onUdpServerListenerCreated(inetAddress: InetAddress?, port: Int) {
val subscribers = ArrayList<SocketContentListener>(socketListeners)
for (listener in subscribers) {
Handler(Looper.getMainLooper()).post({ listener.onUdpServerListenerCreated(inetAddress, port) })
}
}
}
我试着观察到:
val udpObservable = Observable.create<Int> { emitter ->
val listener = object : SocketListener() {
override fun onUdpServerListenerCreated(inetAddress: InetAddress, port: Int) {
emitter.onNext(port)
emitter.onComplete()
}
}
//add listener here
socketSource.subscribe(listener)
emitter.setCancellable { socketSource.unSubscribe(listener) }
}.subscribeOn(Schedulers.io())
.doOnNext { Log.d("123-thread", "current is: " + Thread.currentThread().name) }
.onErrorReturn { throw ConnectionException(it) }
.subscribe()
但是在测试期间,我看到的不是预期的RxCachedThreadScheduler-1 thread
工作
D/123-thread: current is:-> main
你能帮我吗?请。我的错误在哪里?如何为rx链实现所需的RxCachedThreadScheduler线程?
发布于 2018-02-25 07:51:04
用于创建可观察性的代码将在调度程序上执行,任何地方都不存在隐式上下文更改。
使您的事件从主线程上的侦听器到达。然后将它们发送到同一个线程上的发射器。
因此,订阅使用调度器io Thread,而发送程序则使用Main Thread
因此,解决方案是在可观察的创建之后添加observerOn(Schedulers.newThread())
。Like this
val udpObservable = Observable.create<Int> { emitter ->
val listener = object : SocketListener() {
override fun onUdpServerListenerCreated(inetAddress: InetAddress, port: Int) {
emitter.onNext(port)
emitter.onComplete()
}
}
//add listener here
socketSource.subscribe(listener)
emitter.setCancellable { socketSource.unSubscribe(listener) }
}.subscribeOn(Schedulers.io())
//need add this for work with emmit data on background
.observerOn(Schedulers.newThread())
.doOnNext { Log.d("123-thread", "current is: " + Thread.currentThread().name) }
.onErrorReturn { throw ConnectionException(it) }
.subscribe()
https://stackoverflow.com/questions/48972417
复制