
Creating Custom Operators in RxJava2?

Tags:

rx-java2

I'm having trouble finding an example of how to make a custom operator with RxJava 2. I've considered a few approaches:

  1. Using Observable.create and then flatMapping onto it from the source Observable. I can get this working, but it doesn't feel quite right. I end up with a static function that takes the source Observable and calls flatMap on it. In the OnSubscribe, I instantiate an object and pass it the emitter; that object manages the Observable / Emitter (the logic isn't trivial, and I want everything as encapsulated as possible).
  2. Creating an ObservableOperator and passing it to Observable.lift. I can't find any examples of this for RxJava 2. I had to step through my own example in a debugger to confirm that my understanding of upstream and downstream was correct. Because I can't find any examples or documentation on this for RxJava 2, I'm a little worried I might accidentally do something I'm not supposed to.
  3. Creating my own Observable type. This seems to be how the built-in operators work, many of which extend AbstractObservableWithUpstream. There is a lot going on here, though, and it seems easy to miss something or do something I shouldn't. I'm not sure whether I'm supposed to take this approach. I walked through it mentally, and it seems like it can get hairy pretty quickly.

I'm going to proceed with option #2, but I thought it worthwhile to ask what the supported way of doing this is in RxJava 2, and whether there are any documentation or examples for it.

spierce7 asked Jan 24 '17 17:01


2 Answers

Writing operators is not recommended for beginners, and many desired flow patterns can be achieved with existing operators.

Have you looked at RxJava's wiki about writing operators for 2.x? I suggest reading it from top to bottom.

  1. Using create() is possible, but most people use it to emit the elements of a List with a for-each loop, not recognizing that Flowable.fromIterable already does that.
  2. We kept this extension point, although RxJava 2 operators don't use lift() themselves. If you want to avoid some of the boilerplate of option 3, you may try this route.
  3. This is how RxJava 2 operators are implemented. AbstractObservableWithUpstream is a small convenience and not necessary for external implementors.
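
To make the upstream/downstream wiring behind option 2 concrete, here is a minimal, dependency-free sketch of what a lift-style extension point does. MiniObserver, MiniOperator, MiniSource and LiftSketch are hypothetical stand-ins written for this illustration, not RxJava classes; they leave out Disposable handling, threading and the protocol rules that the real Observable.lift and ObservableOperator involve.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an observer (no onSubscribe/Disposable).
interface MiniObserver<T> {
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

// Hypothetical stand-in for an operator: it receives the DOWNSTREAM observer
// and returns the observer that will be subscribed to the UPSTREAM source.
interface MiniOperator<D, U> {
    MiniObserver<U> apply(MiniObserver<D> downstream);
}

// Hypothetical stand-in for a cold, synchronous source.
abstract class MiniSource<T> {
    abstract void subscribe(MiniObserver<T> observer);

    // lift() only builds a new source; the operator runs at subscribe time.
    <R> MiniSource<R> lift(final MiniOperator<R, T> operator) {
        final MiniSource<T> upstream = this;
        return new MiniSource<R>() {
            @Override
            void subscribe(MiniObserver<R> downstream) {
                // Wrap the downstream observer, then subscribe the wrapper upstream.
                upstream.subscribe(operator.apply(downstream));
            }
        };
    }

    static <T> MiniSource<T> fromIterable(final Iterable<T> items) {
        return new MiniSource<T>() {
            @Override
            void subscribe(MiniObserver<T> observer) {
                for (T item : items) {
                    observer.onNext(item);
                }
                observer.onComplete();
            }
        };
    }
}

final class LiftSketch {
    // Runs the pipeline and returns every signal the final observer saw.
    static List<String> run() {
        final List<String> log = new ArrayList<>();

        // Upstream emits Integer, downstream receives String.
        MiniOperator<String, Integer> describe = downstream -> new MiniObserver<Integer>() {
            @Override public void onNext(Integer value) { downstream.onNext("item " + value); }
            @Override public void onError(Throwable e) { downstream.onError(e); }
            @Override public void onComplete() { downstream.onComplete(); }
        };

        MiniSource.fromIterable(Arrays.asList(1, 2, 3))
                .lift(describe)
                .subscribe(new MiniObserver<String>() {
                    @Override public void onNext(String value) { log.add("next " + value); }
                    @Override public void onError(Throwable e) { log.add("error " + e); }
                    @Override public void onComplete() { log.add("complete"); }
                });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running LiftSketch.main prints [next item 1, next item 2, next item 3, complete]. The operator is handed the downstream observer and returns the observer that gets subscribed to the upstream, which is the direction of wiring to keep in mind when implementing ObservableOperator.apply.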
akarnokd answered Sep 30 '22 16:09


This may help. I implemented a custom RxJava 2 operator to handle API errors and applied it with the lift operator.

Here is how it is used:

  public final class ApiClient implements ApiClientInterface {
    ...
      @NonNull
      @Override
      public Observable<ActivateResponse> activate(String email, EmailData emailLinkData) {
          return myApiService.activate(email, emailLinkData)
                  .lift(getApiErrorTransformer())
                  .subscribeOn(Schedulers.io());
      }

      private <T> ApiErrorOperator<T> getApiErrorTransformer() {
          return new ApiErrorOperator<>(gson, networkService);
      }

  }

And here is the custom operator:

    public final class ApiErrorOperator<T> implements ObservableOperator<T, T> {
        private static final String TAG = "ApiErrorOperator";
        private final Gson gson;
        private final NetworkService networkService;

        public ApiErrorOperator(@NonNull Gson gson, @NonNull NetworkService networkService) {
            this.gson = gson;
            this.networkService = networkService;
        }

        @Override
        public Observer<? super T> apply(Observer<? super T> observer) throws Exception {
            return new Observer<T>() {
                @Override
                public void onSubscribe(Disposable d) {
                    observer.onSubscribe(d);
                }

                @Override
                public void onNext(T value) {
                    observer.onNext(value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError", e);

                    if (e instanceof HttpException) {
                        // HTTP error: parse the error body and forward it as an ApiException.
                        try {
                            HttpException error = (HttpException) e;
                            Response response = error.response();
                            String errorBody = response.errorBody().string();

                            ErrorResponse errorResponse = gson.fromJson(errorBody.trim(), ErrorResponse.class);
                            ApiException apiException = new ApiException(errorResponse, response);

                            observer.onError(apiException);
                        } catch (IOException ioException) {
                            // Reading the error body failed; forward that failure instead.
                            observer.onError(ioException);
                        }

                    } else if (!networkService.isNetworkAvailable()) {
                        // No connectivity: replace the original error with a NetworkException.
                        observer.onError(new NetworkException(ErrorResponse.builder()
                                .setErrorCode("")
                                .setDescription("No Network Connection Error")
                                .build()));
                    } else {
                        // Anything else is passed downstream unchanged.
                        observer.onError(e);
                    }
                }

                @Override
                public void onComplete() {
                    observer.onComplete();
                }
            };
        }
    }
zkglr answered Sep 30 '22 18:09