我用 RxBus 连续发送大量消息时报错,日志如下:
08-26 09:37:13.458 10637-10637/com.dituwuyou E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.dituwuyou, PID: 10637 java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling. at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112) at android.os.Handler.handleCallback(Handler.java:739) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:148) at android.app.ActivityThread.main(ActivityThread.java:5417) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616) Caused by: rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386) at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383) at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44) at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152) at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219) at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107) at android.os.Handler.handleCallback(Handler.java:739) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:148) at android.app.ActivityThread.main(ActivityThread.java:5417) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616) Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests at 
rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:308) at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220) at rx.subjects.PublishSubject.onNext(PublishSubject.java:73) at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92) at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67) at com.dituwuyou.widget.rxjava.RxBus.send(RxBus.java:45) at com.dituwuyou.joint.CoorSocketService.messageReceived(CoorSocketService.java:51) at com.dituwuyou.fayeclient.FayeClient.parseFayeMessage(FayeClient.java:535) at com.dituwuyou.fayeclient.FayeClient.onMessage(FayeClient.java:390) at com.dituwuyou.fayeclient.HybiParser.emitFrame(HybiParser.java:304) at com.dituwuyou.fayeclient.HybiParser.start(HybiParser.java:130) at com.dituwuyou.fayeclient.WebSocketClient$1.run(WebSocketClient.java:119) at java.lang.Thread.run(Thread.java:818)
我的 RxBus 是这样定义的:
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * Created by xg on 2016/6/24.
 * Message passing between components (replaces Handler / EventBus).
 *
 * <p>Events are pushed into a {@link PublishSubject} wrapped in a
 * {@link SerializedSubject}, so {@link #send(Object)} may be called from
 * several threads.
 */
public class RxBus {
    private static volatile RxBus mInstance;

    /** Hot subject every event is pushed through. */
    private final Subject<Object, Object> bus;

    /**
     * Creates an independent bus. Kept public for backward compatibility;
     * use {@link #getRxBusSingleton()} for the shared instance.
     */
    public RxBus() {
        bus = new SerializedSubject<>(PublishSubject.<Object>create());
    }

    /**
     * Returns the shared RxBus, creating it lazily on first use
     * (double-checked locking on the volatile {@code mInstance} field).
     *
     * @return the process-wide RxBus instance
     */
    public static RxBus getRxBusSingleton() {
        // Read the volatile field once into a local and test the local.
        RxBus instance = mInstance;
        if (instance == null) {
            synchronized (RxBus.class) {
                instance = mInstance;
                if (instance == null) {
                    instance = new RxBus();
                    mInstance = instance;
                }
            }
        }
        return instance;
    }

    /**
     * Sends an event to every current subscriber.
     *
     * @param object the event to publish
     */
    public void send(Object object) {
        bus.onNext(object);
    }

    /**
     * Returns the stream of events sent through this bus.
     *
     * <p>The subject is wrapped in {@code onBackpressureBuffer()} so that a
     * burst of {@link #send(Object)} calls is queued for a slower subscriber
     * (for example one behind {@code observeOn}) instead of failing with
     * {@code MissingBackpressureException}. The buffer is unbounded: a
     * subscriber that never catches up will keep accumulating events in
     * memory, so switch to a bounded or dropping strategy if that can happen.
     *
     * @return an Observable of all events published after subscription
     */
    public Observable toObserverable() {
        return bus.onBackpressureBuffer();
    }
}
这是 RxBus.java 的第 45 行(即堆栈中 RxBus.send 所在的位置):
bus.onNext(object);
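应该是出现背压(backpressure)问题了:从堆栈看,下游经过 observeOn 的订阅者来不及消费,PublishSubject 在 onNext 时抛出了 MissingBackpressureException。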
你用的是什么版本的 RxJava?在 1.1.6 版本中已经没有 rx.subjects.PublishSubject$PublishSubjectProducer 这个类了。
一周热门 更多>