
RxJava buffer/window until an element satisfies a condition

Tags:

rx-java

I am trying to read lines from a reader and group them into blocks that belong together.

Source text:

bla1
bla2
### block separator ###
bla3
bla4
### block separator ###
...

I need to get the two blocks (bla1, bla2) and (bla3, bla4).

Code:

import org.apache.commons.lang3.StringUtils;
import rx.Observable;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.Reader;
import java.util.Iterator;

public class BlockBuilder {

  public static void main(String[] args) {

    try {
      FileReader fileReader = new FileReader("/path/to/some/file");
      LineIterable lineIterable = new LineIterable(fileReader);

      Observable.from(lineIterable)
          .buffer(100)
          // Needed instead of time/count: buffer until a line matches a condition,
          // something like .buffer(line -> line.equals("### block separator ###"))
          .forEach(gatheredLines -> {
            String gatheredBlock = StringUtils.join(gatheredLines, '\n');
            System.out.println(gatheredBlock);
            System.out.println("###### ###### ###### ######");
          });
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }

  private static class LineIterable implements Iterable<String> {
    private final Iterator<String> iterator;
    public LineIterable(Reader reader) {
      iterator = new BufferedReader(reader).lines().iterator();
    }
    @Override
    public Iterator<String> iterator() {
      return iterator;
    }
  } 
}

It doesn't matter whether buffer or window is used, and I may be wrong to think of those two at all.

I thought this should be possible with the bufferClosingSelector for buffer or the closingSelector for window. Both are functions that create an Observable whose emissions trigger the closing of the current buffer or window, but I can't see how to get hold of the current line there.

snieke asked Mar 20 '26 00:03

1 Answer

You can publish your source so that it is shared, then use the shared sequence both as the data to buffer and as the buffer boundary:

Observable<String> source = Observable.just(
        "a", "b", "#", 
        "c", "d", "e", "#", 
        "f", "g");

source.publish(p -> 
        p.filter(v -> !"#".equals(v))
        .buffer(() -> p.filter(v -> "#".equals(v))))
.subscribe(System.out::println);
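
As a cross-check for the grouping described above, here is a minimal plain-Java sketch that does not use RxJava at all. It only illustrates the intended result: separator lines close the current block and are dropped. The class name `BlockSplitter` and the method `splitIntoBlocks` are hypothetical names chosen for this illustration, and keeping the trailing block only when it is non-empty is an assumption of the sketch, not necessarily what `buffer` does on completion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of RxJava: groups lines into blocks split on a separator line.
class BlockSplitter {

    static List<List<String>> splitIntoBlocks(Iterable<String> lines, String separator) {
        List<List<String>> blocks = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String line : lines) {
            if (separator.equals(line)) {
                // A separator closes the current block and is itself dropped.
                blocks.add(current);
                current = new ArrayList<>();
            } else {
                current.add(line);
            }
        }
        // Keep a trailing block only if it has content (an assumption of this sketch).
        if (!current.isEmpty()) {
            blocks.add(current);
        }
        return blocks;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "b", "#", "c", "d", "e", "#", "f", "g");
        // prints [[a, b], [c, d, e], [f, g]]
        System.out.println(splitIntoBlocks(lines, "#"));
    }
}
```

Comparing this output with what the reactive pipeline prints for the same input can help confirm that the boundary filter and the data filter use the same separator test.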

akarnokd answered Mar 25 '26 00:03


