
Android RxJava2 Flowable with CompositeDisposable

I have a problem with Flowables and adding them to a CompositeDisposable. I want to switch from an Observable to a Flowable because the operation might emit 1000 or more values. I'm fairly inexperienced with RxJava2, so please forgive me if the question is stupid :)

So far I used the observable like this:

 public Observable<String> uploadPictureRx(String path)
    {
        return Observable.create(new ObservableOnSubscribe<String>()
        {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception
            {
                Uri file = Uri.fromFile(new File(path));
                String segment = file.getLastPathSegment();
                UploadTask uploadTask = reference.child("SomeChild").child(segment).putFile(file);


                uploadTask.addOnFailureListener(new OnFailureListener()
                {
                    @Override
                    public void onFailure(@NonNull Exception exception)
                    {
                        e.onError(exception);
                    }
                }).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
                {
                    @Override
                    public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
                    {                           
                        //noinspection VisibleForTests
                        downloadUrl = taskSnapshot.getDownloadUrl();
                        String url = downloadUrl.getPath();
                        e.onNext(url);
                        e.onComplete();
                    }
                }).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
                {
                    @Override
                    public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
                    {
                        //noinspection VisibleForTests
                        long bytes = taskSnapshot.getBytesTransferred();
                        String bytesS = String.valueOf(bytes);
                        e.onNext(bytesS);
                    }
                });
            }
        });
    }

and called the method like this:

private void uploadPicToFireBaseStorage(String path)
    {
        compositeDisposable.add(storageService.uploadPictureRx(path)
                .subscribeOn(Schedulers.io())
                .observeOn(mainScheduler)
                .subscribeWith(new DisposableObserver<String>()
                {

                    @Override
                    public void onNext(String s)
                    {
                        String ss = s;
                        System.out.println(ss);
                    }

                    @Override
                    public void onError(Throwable e)
                    {
                        e.printStackTrace();
                    }

                    @Override
                    public void onComplete()
                    {
                        view.displayToast("Picture Upload completed");
                    }
                })
        );
    }

This works fine! However, when I try to do the same with a Flowable instead of an Observable, it won't compile:

public Flowable<String> uploadPictureRx(String path)
    {
        return Flowable.create(new FlowableOnSubscribe<String>()
        {

            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception
            {
                Uri file = Uri.fromFile(new File(path));
                String segment = file.getLastPathSegment();
                UploadTask uploadTask = reference.child("somechild").child(segment).putFile(file);    

                uploadTask.addOnFailureListener(new OnFailureListener()
                {
                    @Override
                    public void onFailure(@NonNull Exception exception)
                    {
                        e.onError(exception);
                    }
                }).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
                {
                    @Override
                    public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
                    {                           
                        //noinspection VisibleForTests
                        downloadUrl = taskSnapshot.getDownloadUrl();
                        String url = downloadUrl.getPath();
                        e.onNext(url);
                        e.onComplete();
                    }
                }).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
                {
                    @Override
                    public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
                    {
                        //noinspection VisibleForTests
                        long bytes = taskSnapshot.getBytesTransferred();
                        String bytesS = String.valueOf(bytes);
                        e.onNext(bytesS);
                    }
                });
            }
        }, BackpressureStrategy.BUFFER);

    }

The error is: Inferred type 'E' for type parameter 'E' is not within its bound; should implement 'org.reactivestreams.Subscriber

My guess is that Flowable does not implement Disposable and that's why it won't compile. I have no clue whether that's true; it's just my best guess so far. Or do I have to change subscribeWith() to subscribe()? I don't know what the impact of that change would be.

Anyway, suggestions on how to make this work and get this Flowable into my CompositeDisposable are really appreciated.

Thanks guys!

Edit:

I tried changing the DisposableObserver into a Subscriber, but this results in the following error: Compiler Error

Al Cabone asked Jan 30 '26 02:01



1 Answer

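A Flowable is consumed by an org.reactivestreams.Subscriber rather than an Observer, because it supports backpressure: the subscriber receives a Subscription and calls Subscription.request(n) to tell the source how many items it is ready to receive. DisposableObserver implements Observer, not Subscriber, so it does not satisfy the type bound on Flowable.subscribeWith(), which is what the compiler error reports. Use a subscriber that also implements Disposable, such as ResourceSubscriber (or DisposableSubscriber), so the result can still be added to the CompositeDisposable.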
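To make the request() mechanism concrete, here is a minimal sketch. It deliberately does not use RxJava: it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams Subscriber and Subscription types. The class name RequestDemo, the BatchSubscriber helper, and the batch size are illustrative assumptions and not part of the question's code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class RequestDemo {

    /** Hypothetical subscriber that pulls items in fixed-size batches. */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int batch;
        private Flow.Subscription subscription;
        private int remainingInBatch;

        BatchSubscriber(int batch) {
            this.batch = batch;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            remainingInBatch = batch;
            s.request(batch); // nothing is delivered until demand is signalled
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--remainingInBatch == 0) {
                remainingInBatch = batch;
                subscription.request(batch); // ask for the next batch
            }
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..total and returns what the batch subscriber received. */
    static List<Integer> consume(int total, int batch) {
        BatchSubscriber subscriber = new BatchSubscriber(batch);
        // Leaving the try block closes the publisher, which signals onComplete
        // after the already submitted items have been delivered.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= total; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        }
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(consume(5, 2)); // prints [1, 2, 3, 4, 5]
    }
}
```

In RxJava 2, ResourceSubscriber requests Long.MAX_VALUE in its default onStart(), so a subscriber that does not override onStart() receives every item without calling request() itself.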

Change your code:

private void uploadPicToFireBaseStorage(String path)
{
    compositeDisposable.add(storageService.uploadPictureRx(path)
            .subscribeOn(Schedulers.io())
            .observeOn(mainScheduler)
            .subscribeWith(new DisposableObserver<String>()
            {

                @Override
                public void onNext(String s)
                {
                    String ss = s;
                    System.out.println(ss);
                }

                @Override
                public void onError(Throwable e)
                {
                    e.printStackTrace();
                }

                @Override
                public void onComplete()
                {
                    view.displayToast("Picture Upload completed");
                }
            })
    );
}

into

private void uploadPicToFireBaseStorage(String path)
{
    compositeDisposable.add(storageService.uploadPictureRx(path)
            .subscribeOn(Schedulers.io())
            .observeOn(mainScheduler)
            .subscribeWith(new ResourceSubscriber<String>()
            {
                @Override
                public void onNext(String s)
                {
                    String ss = s;
                    System.out.println(ss);
                }

                @Override
                public void onError(Throwable e)
                {
                    e.printStackTrace();
                }

                @Override
                public void onComplete()
                {
                    view.displayToast("Picture Upload completed");
                }
            })
    );
}
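Why the second version compiles can be shown with a small stand-alone sketch. It assumes nothing from RxJava: Source, Bag, Subscriber, Disposable and RecordingSubscriber are hypothetical stand-ins for Flowable, CompositeDisposable, org.reactivestreams.Subscriber, io.reactivex.disposables.Disposable and ResourceSubscriber, reduced to the parts that matter for the type check.

```java
import java.util.ArrayList;
import java.util.List;

public class BoundDemo {

    // Hypothetical stand-ins for the real RxJava / Reactive Streams interfaces.
    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    interface Subscriber<T> {
        void onNext(T item);
    }

    /** Stand-in for Flowable: subscribeWith only accepts Subscriber implementations. */
    static final class Source<T> {
        private final List<T> items;

        Source(List<T> items) {
            this.items = items;
        }

        <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
            for (T item : items) {
                subscriber.onNext(item);
            }
            return subscriber; // returned as E, so other interfaces on E survive
        }
    }

    /** Stand-in for CompositeDisposable: only accepts Disposable. */
    static final class Bag {
        private final List<Disposable> held = new ArrayList<>();

        void add(Disposable d) {
            held.add(d);
        }

        void clear() {
            held.forEach(Disposable::dispose);
            held.clear();
        }
    }

    /** Plays the role of ResourceSubscriber: a Subscriber that is also a Disposable. */
    static final class RecordingSubscriber<T> implements Subscriber<T>, Disposable {
        final List<T> seen = new ArrayList<>();
        private boolean disposed;

        @Override
        public void onNext(T item) {
            if (!disposed) {
                seen.add(item);
            }
        }

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

    static RecordingSubscriber<String> subscribeAndClear(List<String> items) {
        Bag bag = new Bag();
        Source<String> source = new Source<>(items);
        // Compiles because RecordingSubscriber satisfies the Subscriber bound;
        // a type that only implemented some other observer interface would be
        // rejected here with a "not within its bound" error.
        RecordingSubscriber<String> sub =
                source.subscribeWith(new RecordingSubscriber<String>());
        bag.add(sub);  // accepted because the same object is also a Disposable
        bag.clear();   // disposes everything that was added
        return sub;
    }

    public static void main(String[] args) {
        RecordingSubscriber<String> sub = subscribeAndClear(List.of("a", "b"));
        System.out.println(sub.seen + " disposed=" + sub.isDisposed());
    }
}
```

The point of the design is that subscribeWith returns the exact type passed in, so one object can satisfy the subscriber bound and still be handed to the disposable container.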
Phoenix Wang answered Jan 31 '26 19:01
