I'm trying to connect to an MQTT broker. I want to retry in case I fail to connect. I get a callback on success or failure of connection.
After reading multiple examples of retryWhen and handling asynchronous callbacks, I put together this code. It works fine if I succeed to connect. Also, it retries 3 times if I call e.onError(throwable) synchronously from the Flowable. But it crashes my android app if I call e.onError(throwable) from the callback's onFailure() method.
Here's the code:
RxJava chain
createConnectionFlowable(client, options)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retryWhen(createRetryFunction())
.subscribe(createConsumer());
create a Flowable
private Flowable<String> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
return Flowable.create(new FlowableOnSubscribe<String>() {
public void subscribe(final FlowableEmitter<String> e) throws Exception {
client.connect(options).setActionCallback(new IMqttActionListener() {
public void onSuccess(IMqttToken iMqttToken) { e.onComplete(); }
public void onFailure(IMqttToken iMqttToken, Throwable throwable) { e.onError(throwable); }
});
}
}, BackpressureStrategy.BUFFER);
}
Create a retry function
private Function<Flowable<Throwable>, Publisher<?>> createRetryFunction() {
return new Function<Flowable<Throwable>, Publisher<?>>() {
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
return throwableFlowable.zipWith(
Flowable.range(1, 3),
new BiFunction<Throwable, Integer, Integer>() {
public Integer apply(Throwable throwable, Integer integer) throws Exception { return integer; }
}
)
.flatMap(new Function<Integer, Publisher<?>>() {
public Publisher<?> apply(Integer integer) throws Exception {
return Flowable.timer(integer, TimeUnit.SECONDS);
}
});
}
};
}
The Consumer: do all the good stuff here
private Consumer<String> createConsumer() {
return new Consumer<String>() {
public void accept(String s) throws Exception {
Log.d(TAG, "accept: do important stuff here" + s);
}
};
}
Error logs
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply() called with: throwable = [Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)], integer = [1]
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply: delay retry by seconds:1
12-20 11:51:09.589 16769-16830/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.600 16769-16831/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.lang.Thread.run(Thread.java:818)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.isConnected(IoBridge.java:234)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.connectErrno(IoBridge.java:171)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.connect(IoBridge.java:122)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:183)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:452)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.Socket.connect(Socket.java:884)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: ... 2 more
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: android.system.ErrnoException: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.isConnected(IoBridge.java:223)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: ... 8 more
12-20 11:51:09.606 16769-16769/com.work.app E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.work.app, PID: 16769
Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms:
Questions
e.onError(throwable) synchronously from the Flowable.subscribe() method?References
subscribe using Consumer<String> you do not define an error handler for the stream. This means the error will get passed to the default error handler through RxJavaPlugins.getErrorHandler().handleError(...). On android this handler seems to cause a fatal error. To fix this use a Observer<String> instead of Consumer<String>client.connect(...) exhibit some form of weird behavior causing the problem. Since the log shows error - 1 sec wait - error, error I guess the callbacks remain active so the second failure gets sent to RxJava twice.waitForCompletion() method when you are talking about synchronous it would support my assumptions in 2. Since no callbacks are registered each throwable would only get reported once, fixing the behavior.I'm not sure why the emitter would remain functional after it terminates (onError/onComplete) but since the spec mandates that those methods are only called once it might be unspecified behavior causing this problem.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With