I am trying to implement a reduce on a WindowedStream, like so:
.keyBy(t -> t.key)
.timeWindow(Time.of(15, MINUTES), Time.of(1, MINUTES))
.reduce(new ReduceFunction<TwitterSentiments>() {
    @Override
    public TwitterSentiments reduce(TwitterSentiments t2, TwitterSentiments t1) throws Exception {
        t2.positive += t1.positive;
        t2.neutral += t1.neutral;
        t2.negative += t1.negative;
        return t2;
    }
});
The problem I am having is that when I call stream.print(), I get many values (it looks like one per TwitterSentiments object) instead of a single aggregate object.
I have also tried using an AggregateFunction like this, with the same result:
.aggregate(new AggregateFunction<TwitterSentiments, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
    @Override
    public Tuple3<Long, Long, Long> createAccumulator() {
        return new Tuple3<Long, Long, Long>(0L, 0L, 0L);
    }
    @Override
    public Tuple3<Long, Long, Long> add(TwitterSentiments ts, Tuple3<Long, Long, Long> accumulator) {
        return new Tuple3<Long, Long, Long>(
            accumulator.f0 + ts.positive.longValue(),
            accumulator.f1 + ts.neutral.longValue(),
            accumulator.f2 + ts.negative.longValue()
        );
    }
    @Override
    public Tuple3<Long, Long, Long> getResult(Tuple3<Long, Long, Long> accumulator) {
        return accumulator;
    }
    @Override
    public Tuple3<Long, Long, Long> merge(Tuple3<Long, Long, Long> accumulator1, Tuple3<Long, Long, Long> accumulator2) {
        // Each field is combined with the same field of the other accumulator.
        return new Tuple3<Long, Long, Long>(
            accumulator1.f0 + accumulator2.f0,
            accumulator1.f1 + accumulator2.f1,
            accumulator1.f2 + accumulator2.f2);
    }
});
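The accumulator arithmetic itself can be checked outside Flink. The sketch below is a plain-Java model, not Flink code: `Sentiment` is a hypothetical stand-in for `TwitterSentiments`, and a `long[]` plays the role of the `Tuple3`. It aggregates two batches separately and then merges them, which should give the same totals as aggregating every element at once. That only holds if `merge` adds each field to the same field of the other accumulator.

```java
import java.util.List;

// Plain-Java model of the AggregateFunction's accumulator logic; no Flink dependency.
// Sentiment is a hypothetical stand-in for TwitterSentiments.
public class SentimentAccumulator {
    static final class Sentiment {
        final long positive, neutral, negative;

        Sentiment(long positive, long neutral, long negative) {
            this.positive = positive;
            this.neutral = neutral;
            this.negative = negative;
        }
    }

    // Like createAccumulator(): all three counters start at zero.
    static long[] createAccumulator() {
        return new long[] {0L, 0L, 0L};
    }

    // Like add(): fold one element into a new accumulator.
    static long[] add(Sentiment s, long[] acc) {
        return new long[] {acc[0] + s.positive, acc[1] + s.neutral, acc[2] + s.negative};
    }

    // Like merge(): every field is combined with the same field of the other accumulator.
    static long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1], a[2] + b[2]};
    }

    // Aggregates one batch from a fresh accumulator.
    static long[] aggregate(List<Sentiment> batch) {
        long[] acc = createAccumulator();
        for (Sentiment s : batch) {
            acc = add(s, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Sentiment> part1 = List.of(new Sentiment(1, 0, 2), new Sentiment(0, 3, 0));
        List<Sentiment> part2 = List.of(new Sentiment(4, 0, 5));
        // Merging two partial accumulators must equal aggregating all three elements at once.
        long[] merged = merge(aggregate(part1), aggregate(part2));
        System.out.println(merged[0] + " " + merged[1] + " " + merged[2]); // prints "5 3 7"
    }
}
```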
Why would stream.print() still output many records after these aggregations?
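Because of the keyBy, the reduce (or aggregate) produces one result per key for every window, not one result overall. With a 15-minute window that slides every minute, a new window fires every minute, so you get a separate record for each key each minute. If you don't need a result per key, you can use timeWindowAll to produce a single result per window. However, timeWindowAll does not run in parallel. If you want to compute the result in a more scalable way, you can do this: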
.keyBy(t -> t.key)
.timeWindow(<time specification>)
.reduce(<reduce function>)
.timeWindowAll(<same time specification>)
.reduce(<same reduce function>)
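One way to see why the keyed stage emits many records, and what the second stage adds, is to model both stages in plain Java. This is a simulation, not the Flink API: `Event`, `stage1` and `stage2` are hypothetical names, the events are made up, and it uses tumbling 15-minute windows to keep the window assignment simple.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Flink code) of keyBy -> tumbling window -> reduce, followed by
// an all-keys window over the partial results. Times are in minutes; the events are made up.
public class TwoStageAggregation {
    static final class Event {
        final String key;
        final long minute;
        final long positive;

        Event(String key, long minute, long positive) {
            this.key = key;
            this.minute = minute;
            this.positive = positive;
        }
    }

    static List<Event> sampleEvents() {
        return List.of(
                new Event("a", 0, 1), new Event("b", 3, 2), new Event("c", 7, 1),
                new Event("a", 16, 4), new Event("b", 20, 1));
    }

    // Start of the tumbling window of the given size that contains `minute`.
    static long windowStart(long minute, long size) {
        return Math.floorDiv(minute, size) * size;
    }

    // Stage 1 (parallel in Flink): one partial sum per key per window.
    static Map<Long, Map<String, Long>> stage1(List<Event> events, long size) {
        Map<Long, Map<String, Long>> partials = new TreeMap<>();
        for (Event e : events) {
            partials.computeIfAbsent(windowStart(e.minute, size), w -> new TreeMap<>())
                    .merge(e.key, e.positive, Long::sum);
        }
        return partials;
    }

    // Number of records stage 1 would emit: one per (window, key) pair.
    static int countRecords(Map<Long, Map<String, Long>> partials) {
        int n = 0;
        for (Map<String, Long> perKey : partials.values()) {
            n += perKey.size();
        }
        return n;
    }

    // Stage 2 (a single non-parallel task in Flink): combine the partials of each window.
    static Map<Long, Long> stage2(Map<Long, Map<String, Long>> partials) {
        Map<Long, Long> totals = new TreeMap<>();
        partials.forEach((window, perKey) ->
                perKey.values().forEach(v -> totals.merge(window, v, Long::sum)));
        return totals;
    }

    public static void main(String[] args) {
        Map<Long, Map<String, Long>> partials = stage1(sampleEvents(), 15);
        System.out.println("stage 1 records: " + countRecords(partials)); // 5
        System.out.println("stage 2 records: " + stage2(partials)); // {0=4, 15=5}
    }
}
```

Stage 1 emits one record per key per window (five here), which is what print() shows after the keyed reduce; stage 2 collapses those into one record per window.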
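You might expect Flink's runtime to be smart enough to do this parallel pre-aggregation for you (provided you are using a ReduceFunction or AggregateFunction), but it isn't. Note that repeating the same time specification in the second stage is only correct for tumbling windows. With a sliding window like the one in the question, each first-stage result already covers the full 15 minutes, so the second stage should be a tumbling window whose size equals the slide (1 minute here); reusing the sliding specification would count the same records several times.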
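With a sliding window, the choice of second stage matters. The plain-Java model below (again not Flink code, with made-up events and integer time units) uses a 15-unit window that slides by 1. It stamps each first-stage result with its window end minus one, which is how Flink timestamps event-time window results, and compares two candidate second stages against a direct global computation: a tumbling window whose size equals the slide, and a repeat of the sliding specification. In the question's setup the first would correspond to something like `.timeWindowAll(Time.of(1, MINUTES))`, assuming event time.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Flink code) of a keyed sliding window followed by an all-keys stage.
// Time is in abstract integer units; SIZE/SLIDE and the event data are illustrative assumptions.
public class SlidingTwoStage {
    static final long SIZE = 15, SLIDE = 1;

    // Hypothetical events: {key, timestamp, count}.
    static final long[][] EVENTS = { {0, 2, 1}, {1, 4, 2}, {2, 9, 1}, {0, 17, 3} };

    // Stage 1: keyed sliding windows, indexed by window end, then by key.
    static Map<Long, Map<Long, Long>> stage1() {
        Map<Long, Map<Long, Long>> byEnd = new TreeMap<>();
        for (long[] e : EVENTS) {
            long ts = e[1];
            // Every window [start, start + SIZE) with start a multiple of SLIDE that contains ts.
            long lastStart = Math.floorDiv(ts, SLIDE) * SLIDE;
            for (long start = lastStart; start > ts - SIZE; start -= SLIDE) {
                byEnd.computeIfAbsent(start + SIZE, k -> new TreeMap<>()).merge(e[0], e[2], Long::sum);
            }
        }
        return byEnd;
    }

    // Stage 2 (tumbling, size = SLIDE): each partial is stamped end - 1, like maxTimestamp().
    static Map<Long, Long> tumblingStage2(Map<Long, Map<Long, Long>> partials) {
        Map<Long, Long> out = new TreeMap<>();
        partials.forEach((end, perKey) -> {
            long ts = end - 1;
            long windowEnd = Math.floorDiv(ts, SLIDE) * SLIDE + SLIDE;
            perKey.values().forEach(v -> out.merge(windowEnd, v, Long::sum));
        });
        return out;
    }

    // Stage 2 (same sliding spec again): each partial lands in SIZE / SLIDE overlapping windows.
    static Map<Long, Long> slidingStage2(Map<Long, Map<Long, Long>> partials) {
        Map<Long, Long> out = new TreeMap<>();
        partials.forEach((end, perKey) -> {
            long ts = end - 1;
            long sum = perKey.values().stream().mapToLong(Long::longValue).sum();
            long lastStart = Math.floorDiv(ts, SLIDE) * SLIDE;
            for (long start = lastStart; start > ts - SIZE; start -= SLIDE) {
                out.merge(start + SIZE, sum, Long::sum);
            }
        });
        return out;
    }

    // Reference: one global sliding-window sum per window end, computed from the raw events.
    static Map<Long, Long> direct() {
        Map<Long, Long> out = new TreeMap<>();
        for (long[] e : EVENTS) {
            long ts = e[1];
            long lastStart = Math.floorDiv(ts, SLIDE) * SLIDE;
            for (long start = lastStart; start > ts - SIZE; start -= SLIDE) {
                out.merge(start + SIZE, e[2], Long::sum);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, Map<Long, Long>> partials = stage1();
        System.out.println("tumbling stage 2 matches: " + tumblingStage2(partials).equals(direct())); // true
        System.out.println("sliding stage 2 matches: " + slidingStage2(partials).equals(direct())); // false
    }
}
```

Under these assumptions only the tumbling second stage reproduces the direct result; the sliding one adds up overlapping partial results, so the same events are counted more than once.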