RxBus throws an error when sending messages in rapid succession

Posted 2016-11-01 11:10

When I send a large number of messages through RxBus in rapid succession, the app crashes with this error:

08-26 09:37:13.458 10637-10637/com.dituwuyou E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.dituwuyou, PID: 10637
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112)
    at android.os.Handler.handleCallback(Handler.java:739)
    at android.os.Handler.dispatchMessage(Handler.java:95)
    at android.os.Looper.loop(Looper.java:148)
    at android.app.ActivityThread.main(ActivityThread.java:5417)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
Caused by: rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
    at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
    at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
    at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152)
    at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
    at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
    at android.os.Handler.handleCallback(Handler.java:739)
    at android.os.Handler.dispatchMessage(Handler.java:95)
    at android.os.Looper.loop(Looper.java:148)
    at android.app.ActivityThread.main(ActivityThread.java:5417)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests
    at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:308)
    at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
    at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
    at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
    at com.dituwuyou.widget.rxjava.RxBus.send(RxBus.java:45)
    at com.dituwuyou.joint.CoorSocketService.messageReceived(CoorSocketService.java:51)
    at com.dituwuyou.fayeclient.FayeClient.parseFayeMessage(FayeClient.java:535)
    at com.dituwuyou.fayeclient.FayeClient.onMessage(FayeClient.java:390)
    at com.dituwuyou.fayeclient.HybiParser.emitFrame(HybiParser.java:304)
    at com.dituwuyou.fayeclient.HybiParser.start(HybiParser.java:130)
    at com.dituwuyou.fayeclient.WebSocketClient$1.run(WebSocketClient.java:119)
    at java.lang.Thread.run(Thread.java:818)

My RxBus is defined like this:

import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * Created by xg on 2016/6/24.
 * Message passing (replaces Handler and EventBus)
 */
public class RxBus {
    private static volatile RxBus mInstance;
    private final Subject bus;

    public RxBus() {
        bus = new SerializedSubject<>(PublishSubject.create());
    }

    /**
     * Singleton RxBus
     *
     * @return
     */
    public static RxBus getRxBusSingleton() {
        RxBus rxBus2 = mInstance;
        if (mInstance == null) {
            synchronized (RxBus.class) {
                rxBus2 = mInstance;
                if (mInstance == null) {
                    rxBus2 = new RxBus();
                    mInstance = rxBus2;
                }
            }
        }
        return rxBus2;
    }

    /**
     * Send a message
     *
     * @param object
     */
    public void send(Object object) {
        bus.onNext(object);
    }

    /**
     * Receive messages
     *
     * @return
     */
    public Observable toObserverable() {
        return bus;
    }
}

This is line 45:
bus.onNext(object);
It looks like a backpressure problem.
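To make the suspected failure mode concrete, here is a minimal plain-Java model (not RxJava itself) of a fixed-capacity hand-off buffer between a fast sender and a slower receiver. The class name BoundedInbox, the capacity, and the two overflow policies are assumptions made up for illustration. In RxJava 1.x the comparable choices are the onBackpressureBuffer() and onBackpressureDrop() operators, applied before observeOn().

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical model of a bounded hand-off buffer; not part of RxJava. */
class BoundedInbox<T> {
    /** What to do when a value arrives while the buffer is full. */
    enum Overflow { FAIL, DROP_NEWEST }

    private final Queue<T> queue = new ArrayDeque<>();
    private final int capacity;
    private final Overflow policy;
    private int dropped;

    BoundedInbox(int capacity, Overflow policy) {
        this.capacity = capacity;
        this.policy = policy;
    }

    /** Sender side: returns true if the value was queued, false if it was dropped. */
    synchronized boolean send(T value) {
        if (queue.size() < capacity) {
            queue.add(value);
            return true;
        }
        if (policy == Overflow.FAIL) {
            // Comparable in spirit to MissingBackpressureException: the receiver has no room.
            throw new IllegalStateException("could not emit value: receiver has no free capacity");
        }
        dropped++; // DROP_NEWEST: discard the incoming value and keep going
        return false;
    }

    /** Receiver side: returns the oldest queued value, or null if the buffer is empty. */
    synchronized T poll() {
        return queue.poll();
    }

    synchronized int droppedCount() {
        return dropped;
    }
}
```

With the FAIL policy, a burst larger than the capacity throws on the sending thread, which mirrors the stack trace above, where the exception surfaces inside RxBus.send. With DROP_NEWEST the sender never fails, at the cost of losing messages. An unbounded buffer would lose nothing but can grow without limit if the receiver never catches up.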

2 answers

Which version of RxJava are you using? In version 1.1.6 there is no rx.subjects.PublishSubject$PublishSubjectProducer class.

1. I don't understand why you need the extra rxBus2 variable.
2. You need to handle the onError case.
3. Subscribe first, then send.
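Points 2 and 3 can be illustrated with a small plain-Java stand-in for a publish-style bus. This is a sketch under stated assumptions, not the RxJava API: TinyBus and its methods are hypothetical names. It only shows that a listener registered after a send never sees that event, and that an error callback keeps a listener failure from crashing the sender.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for a publish-style bus; not the RxJava API. */
class TinyBus {
    /** Pairs a value callback with an error callback. */
    private static final class Listener {
        final Consumer<Object> onNext;
        final Consumer<Throwable> onError;

        Listener(Consumer<Object> onNext, Consumer<Throwable> onError) {
            this.onNext = onNext;
            this.onError = onError;
        }
    }

    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    /** Registers a listener; it only sees events sent after this call. */
    void subscribe(Consumer<Object> onNext, Consumer<Throwable> onError) {
        listeners.add(new Listener(onNext, onError));
    }

    /** Delivers an event to the listeners registered at this moment. */
    void send(Object event) {
        for (Listener listener : listeners) {
            try {
                listener.onNext.accept(event);
            } catch (RuntimeException e) {
                // Route the failure to the listener's error callback instead of the sender.
                listener.onError.accept(e);
            }
        }
    }
}
```

One difference from RxJava is worth keeping in mind: there, onError ends the subscription, whereas this model keeps the listener registered. In the RxJava 1.x code from the question, the equivalent of the error callback is the two-argument subscribe(onNext, onError) overload. Without it, the error is rethrown as the OnErrorNotImplementedException seen in the stack trace.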
