I'm having trouble finding an example of how to make a custom operator with RxJava 2. I've considered a few approaches:
1. Using Observable.create, and then flatMapping on it from the source observable. I can get this working, but it doesn't quite feel right. I end up creating a static function to which I pass the source Observable, and then flatMap on the source. In the OnSubscribe, I then instantiate an object that I pass the emitter to, which handles and manages the Observable / Emitter (as it's not trivial, and I want everything as encapsulated as possible).
2. Implementing an ObservableOperator and providing it to Observable.lift. I can't find any examples of this for RxJava 2. I had to debug into my own example to make sure my understanding of upstream and downstream was correct. Because I can't find any examples or documentation on this for RxJava 2, I'm a little worried I might accidentally do something I'm not supposed to.
3. Creating my own Observable type. This seems to be how the underlying operators work, many of which extend AbstractObservableWithUpstream. There is a lot going on here though, and it seems easy to miss something or do something I shouldn't. I'm not sure if I'm supposed to take an approach like this or not. I stepped myself through the mental process, and it seems like it can get hairy pretty quickly.
I'm going to proceed with option #2, but thought it worthwhile to ask what the supported method for doing this is in RxJava 2, and to find out whether there is any documentation or examples for it.
Writing operators is not recommended for beginners and many desired flow patterns can be achieved via existing operators.
Have you looked at RxJava's wiki about writing operators for 2.x? I suggest reading it from top to bottom.
1. Using create() is possible, but most people use it to emit the elements of a List with a for-each loop, not recognizing that Flowable.fromIterable already does that.
2. The built-in RxJava 2 operators don't use lift() themselves. If you want to avoid some of the boilerplate of option 3, then you may try this route.
3. AbstractObservableWithUpstream is a small convenience and not necessary for external implementors.
This may help you. I implemented an RxJava 2 operator to handle API errors, using the lift operator.
See the example.
public final class ApiClient implements ApiClientInterface {
    ...
    @NonNull
    @Override
    public Observable<ActivateResponse> activate(String email, EmailData emailLinkData) {
        // lift() wraps every subscriber of the service call in the error-mapping operator.
        return myApiService.activate(email, emailLinkData)
                .lift(getApiErrorTransformer())
                .subscribeOn(Schedulers.io());
    }

    private <T> ApiErrorOperator<T> getApiErrorTransformer() {
        return new ApiErrorOperator<>(gson, networkService);
    }
}
And here is the custom operator:
public final class ApiErrorOperator<T> implements ObservableOperator<T, T> {
    private static final String TAG = "ApiErrorOperator";
    private final Gson gson;
    private final NetworkService networkService;

    public ApiErrorOperator(@NonNull Gson gson, @NonNull NetworkService networkService) {
        this.gson = gson;
        this.networkService = networkService;
    }

    // `observer` is the downstream Observer; the returned Observer is subscribed to the upstream source.
    @Override
    public Observer<? super T> apply(Observer<? super T> observer) throws Exception {
        return new Observer<T>() {
            @Override
            public void onSubscribe(Disposable d) {
                // Forward the upstream Disposable so downstream can cancel.
                observer.onSubscribe(d);
            }

            @Override
            public void onNext(T value) {
                observer.onNext(value);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError", e);
                if (e instanceof HttpException) {
                    // HTTP error: parse the error body into an ApiException.
                    try {
                        HttpException error = (HttpException) e;
                        Response<?> response = error.response();
                        String errorBody = response.errorBody().string();
                        ErrorResponse errorResponse = gson.fromJson(errorBody.trim(), ErrorResponse.class);
                        ApiException exception = new ApiException(errorResponse, response);
                        observer.onError(exception);
                    } catch (IOException exception) {
                        observer.onError(exception);
                    }
                } else if (!networkService.isNetworkAvailable()) {
                    // No connectivity: report a dedicated NetworkException.
                    observer.onError(new NetworkException(ErrorResponse.builder()
                            .setErrorCode("")
                            .setDescription("No Network Connection Error")
                            .build()));
                } else {
                    // Anything else is passed through unchanged.
                    observer.onError(e);
                }
            }

            @Override
            public void onComplete() {
                observer.onComplete();
            }
        };
    }
}
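For anyone who wants to try the lift() route without the Android and Retrofit dependencies, here is a minimal sketch of the same pattern that runs on its own. It assumes RxJava 2.x (the io.reactivex package) is on the classpath. SkipBlankOperator and LiftDemo are hypothetical names made up for this illustration and are not part of RxJava.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.util.List;

// Hypothetical operator for illustration: drops blank strings.
final class SkipBlankOperator implements ObservableOperator<String, String> {
    @Override
    public Observer<? super String> apply(final Observer<? super String> downstream) {
        // The Observer returned here is subscribed to the upstream source;
        // `downstream` is whoever subscribed to the lifted Observable.
        return new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // Forward the Disposable first so downstream is able to cancel.
                downstream.onSubscribe(d);
            }

            @Override
            public void onNext(String value) {
                // Only non-blank values are passed along.
                if (!value.trim().isEmpty()) {
                    downstream.onNext(value);
                }
            }

            @Override
            public void onError(Throwable e) {
                downstream.onError(e);
            }

            @Override
            public void onComplete() {
                downstream.onComplete();
            }
        };
    }
}

// Hypothetical driver class showing how the operator is applied.
final class LiftDemo {
    static List<String> run() {
        return Observable.just("a", " ", "b", "")
                .lift(new SkipBlankOperator())
                .toList()       // Single<List<String>>
                .blockingGet(); // block only for the sake of the demo
    }
}
```

Calling LiftDemo.run() should give back a list holding only "a" and "b". The point of the sketch is the wiring: apply() receives the downstream Observer and returns the Observer that is handed to the upstream, which is the part I had to debug to convince myself of.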