I'm using the MongoDB Reactive Streams Java API which I implemented following this example, but I'm encountering a serious problem: sometimes, when I try to query a collection, the await methods doesn't work, and it hangs until the timeout is reached.
The onSubscribe methods gets called correctly, but then neither onNext, nor onError nor onComplete get called.
There doesn't seem to be a specific circumstance causing this issue.
This is my code
MongoDatabase database = MongoDBConnector.getClient().getDatabase("myDb");
MongoCollection<Document> collection = database.getCollection("myCollection");
FindPublisher<Document> finder = collection.find(Filters.exists("myField"));
SettingSubscriber tagSub = new SettingSubscriber(finder);
//SettingsSubscriber is a subclass of ObservableSubscriber which calls publisher.subscribe(this)
tagSub.await(); //this is where it hangs
return tagSub.getWrappedData();
I wrote a simple implementation of what I assumed the SettingSubscriber looked like and tried to recreate the problem using a groovy script. I couldn't - my code runs without hanging, prints each output record and exits. Code for reference below:
@Grab(group = 'org.mongodb', module = 'mongodb-driver-reactivestreams', version = '4.3.3')
@Grab(group = 'org.slf4j', module = 'slf4j-api', version = '1.7.32')
@Grab(group = 'ch.qos.logback', module = 'logback-classic', version = '1.2.6')
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CountDownLatch;
MongoClientSettings.Builder clientSettingsBuilder = MongoClientSettings.builder()
.applyToClusterSettings { clusterSettingsBuilder ->
clusterSettingsBuilder.hosts( Arrays.asList(new ServerAddress("localhost", 27017)))
};
MongoClient mongoClient = MongoClients.create(clientSettingsBuilder.build());
MongoDatabase database = mongoClient.getDatabase("myDb");
MongoCollection<Document> collection = database.getCollection("myCollection");
FindPublisher<Document> finder = collection.find(Filters.exists("myField"));
SettingSubscriber tagSub = new SettingSubscriber(finder);
tagSub.await();
class SettingSubscriber implements Subscriber<Document> {
private final CountDownLatch latch = new CountDownLatch(1);
private Subscription subscription;
private List<Document> data = new ArrayList<>();
public SettingSubscriber(FindPublisher<Document> finder) {
finder.subscribe(this);
}
@Override
public void onSubscribe(final Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(final Document document) {
System.out.println("Received: " + document);
data.add(document);
subscription.request(1);
}
@Override
public void onError(final Throwable throwable) {
throwable.printStackTrace();
latch.countDown();
}
@Override
public void onComplete() {
System.out.println("Completed");
latch.countDown();
}
public List<Document> getWrappedData() {
return data;
}
public void await() throws Throwable {
await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
public void await(final long timeout, final TimeUnit unit) throws Throwable {
if (!latch.await(timeout, unit)) {
System.out.println("Publish timed out");
}
}
}
Can you compare this implementation of the SettingSubscriber with yours to see if something is missed?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With