Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Apache Flink reduce results in many values instead of one

I am trying to implement a reduce on a WindowedStream, like so:

                .keyBy(t -> t.key)
            .timeWindow(Time.of(15, MINUTES), Time.of(1, MINUTES))
            .reduce(new ReduceFunction<TwitterSentiments>() {
                @Override
                public TwitterSentiments reduce(TwitterSentiments t2, TwitterSentiments t1) throws Exception {
                    t2.positive += t1.positive;
                    t2.neutral += t1.neutral;
                    t2.negative += t1.negative;

                    return t2;
                }
            });

The problem I am having is that when I call stream.print(), I get many values (looks like one per TwitterSentiments object, instead of a single aggregate object.

I have also tried using an AggregationFunction like this, with the same issue:

                .aggregate(new AggregateFunction<TwitterSentiments, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
                @Override
                public Tuple3<Long, Long, Long> createAccumulator() {
                    return new Tuple3<Long, Long, Long>(0L,0L,0L);
                }

                @Override
                public Tuple3<Long, Long, Long> add(TwitterSentiments ts, Tuple3<Long, Long, Long> accumulator) {
                    return new Tuple3<Long, Long, Long>(
                            accumulator.f0 + ts.positive.longValue(),
                            accumulator.f1 + ts.neutral.longValue(),
                            accumulator.f2 + ts.negative.longValue()
                    );
                }

                @Override
                public Tuple3<Long, Long, Long> getResult(Tuple3<Long, Long, Long> accumulator) {
                    return accumulator;
                }

                @Override
                public Tuple3<Long, Long, Long> merge(Tuple3<Long, Long, Long> accumulator1, Tuple3<Long, Long, Long> accumulator2) {
                    return new Tuple3<Long, Long, Long>(
                            accumulator1.f0 + accumulator2.f0,
                            accumulator1.f1 + accumulator2.f1,
                            accumulator1.f2 + accumulator2.f1);
                }
            });

What are the reasons why stream.print() would still output many records after these aggregations?

like image 542
Chris Katzmann Avatar asked Dec 18 '25 08:12

Chris Katzmann


1 Answers

If you don't need a result per key, you can use timeWindowAll to produce a single result. However, timeWindowAll does not run in parallel. If you want to compute the result in a more scalable way, you can do this:

    .keyBy(t -> t.key)
    .timeWindow(<time specification>)
    .reduce(<reduce function>)
    .timeWindowAll(<same time specification>)
    .reduce(<same reduce function>)

You might expect Flink's runtime to be smart enough to do this parallel pre-aggregation for you (provided you are using a ReduceFunction or AggregateFunction), but it's not.

like image 64
David Anderson Avatar answered Dec 19 '25 23:12

David Anderson



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!