For example, there are two streams. One is the advertisements shown to users, whose tuples can be described as (advertiseId, shown timestamp). The other is the click stream: (advertiseId, clicked timestamp). We want to get a joined stream that includes every advertisement clicked by a user within 20 minutes of being shown. My solution is to join these two streams on a SlidingTimeWindow, but the joined stream contains many repeated tuples. How can I get each joined tuple only once in the new stream?
stream1.join(stream2)
        .where(0)
        .equalTo(0)
        .window(SlidingTimeWindows.of(Time.of(30, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))
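The repeated tuples follow from how sliding windows assign elements: every element belongs to size/slide overlapping windows (180 here), and the join is evaluated once per window, so a matching pair is emitted once for every window that contains both elements. The plain-Java sketch below has no Flink dependency; the class and method names are made up for illustration, and it assumes window starts are aligned to multiples of the slide.

```java
final class SlidingWindowOverlap {
    /**
     * Counts the sliding windows [start, start + sizeMs) that contain both timestamps.
     * Assumption: window starts are multiples of slideMs (they may be negative).
     */
    static long sharedWindows(long t1, long t2, long sizeMs, long slideMs) {
        long lo = Math.min(t1, t2);
        long hi = Math.max(t1, t2);
        // Latest window start that still contains the earlier timestamp (start <= lo).
        long lastStart = Math.floorDiv(lo, slideMs) * slideMs;
        // Earliest window start that already contains the later timestamp (start > hi - sizeMs).
        long firstStart = Math.floorDiv(hi - sizeMs, slideMs) * slideMs + slideMs;
        if (firstStart > lastStart) {
            return 0;
        }
        return (lastStart - firstStart) / slideMs + 1;
    }

    public static void main(String[] args) {
        long size = 30 * 60_000L;  // 30 minutes, as in the question
        long slide = 10_000L;      // 10 seconds, as in the question
        // Ad shown at t = 0, clicked 5 minutes later: the pair shares 150 windows.
        System.out.println(sharedWindows(0L, 5 * 60_000L, size, slide));
    }
}
```

With these settings, an ad clicked five minutes after it was shown would appear 150 times in the joined stream, which matches the "many repeated tuples" symptom.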
Solution 1:
Let Flink support joining two streams on separate windows, as Spark Streaming does. In this case, apply SlidingTimeWindows(21 mins, 1 min) to the advertisement stream and TumblingTimeWindows(1 min) to the click stream, then join the two windowed streams.
TumblingTimeWindows avoids duplicate records in the joined stream, and the 21-minute SlidingTimeWindows avoids missing legal clicks. One issue is that some illegal clicks (clicks more than 20 minutes after the advertisement was shown) still end up in the joined stream; this is easily fixed by adding a filter. Note that the example code uses seconds rather than minutes for the window sizes, and a filter threshold of 20000 (presumably milliseconds).
MultiWindowsJoinedStreams<Tuple2<String, Long>, Tuple2<String, Long>> joinedStreams =
            new MultiWindowsJoinedStreams<>(advertisement, click);
    DataStream<Tuple3<String, Long, Long>> joinedStream = joinedStreams.where(keySelector)
            .window(SlidingTimeWindows.of(Time.of(21, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
            .equalTo(keySelector)
            .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
            .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                private static final long serialVersionUID = -3625150954096822268L;
                @Override
                public Tuple3<String, Long, Long> join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                    return new Tuple3<>(first.f0, first.f1, second.f1);
                }
            });
    joinedStream = joinedStream.filter(new FilterFunction<Tuple3<String, Long, Long>>() {
        private static final long serialVersionUID = -4325256210808325338L;
        @Override
        public boolean filter(Tuple3<String, Long, Long> value) throws Exception {
                    return value.f1 < value.f2 && value.f1 + 20000 >= value.f2;
        }
    });
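To see why this combination emits each legal pair exactly once, here is a small plain-Java simulation with no Flink dependency. It rests on an assumption about the semantics: each click's 1-minute tumbling window is joined with the 21-minute advertisement window that ends at the same instant. The `TwoWindowJoinSketch` and `Event` names are hypothetical, and the sizes are in minutes as described in the text rather than the seconds used in the code above.

```java
import java.util.ArrayList;
import java.util.List;

final class TwoWindowJoinSketch {
    static final long MINUTE = 60_000L;

    /** An (advertiseId, timestamp) event, mirroring Tuple2<String, Long>. */
    static final class Event {
        final String id;
        final long ts;
        Event(String id, long ts) { this.id = id; this.ts = ts; }
    }

    /** Returns "id,shownTs,clickedTs" for every legal (ad, click) pair. */
    static List<String> join(List<Event> ads, List<Event> clicks) {
        List<String> out = new ArrayList<>();
        for (Event click : clicks) {
            // End of the single tumbling 1-minute window that holds this click.
            long windowEnd = Math.floorDiv(click.ts, MINUTE) * MINUTE + MINUTE;
            // Start of the 21-minute advertisement window ending at the same instant.
            long adWindowStart = windowEnd - 21 * MINUTE;
            for (Event ad : ads) {
                boolean sameKey = ad.id.equals(click.id);
                boolean inAdWindow = ad.ts >= adWindowStart && ad.ts < windowEnd;
                // Same predicate as the filter above, scaled to 20 minutes.
                boolean legal = ad.ts < click.ts && ad.ts + 20 * MINUTE >= click.ts;
                if (sameKey && inAdWindow && legal) {
                    out.add(ad.id + "," + ad.ts + "," + click.ts);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> ads = List.of(new Event("a", 0L), new Event("b", 0L));
        List<Event> clicks = List.of(
                new Event("a", 20 * MINUTE),             // exactly 20 minutes later: legal
                new Event("b", 20 * MINUTE + 30_000L));  // inside the window, but too late
        System.out.println(join(ads, clicks));
    }
}
```

Because each click lives in exactly one tumbling window, each pair is evaluated at most once. The extra minute on the advertisement window guarantees that an ad shown up to 20 minutes before any click in the minute is still present, and the filter drops the pairs that the extra minute lets through.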
Solution 2:
Flink supports join operations without a window. A join operator that implements the TwoInputStreamOperator interface keeps two time-bounded buffers, one for each stream, and outputs a single joined stream.
DataStream<Tuple2<String, Long>> advertisement = env
            .addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                private static final long serialVersionUID = -6564495005753073342L;
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] splits = value.split(" ");
                    return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
                }
            }).keyBy(keySelector).assignTimestamps(timestampExtractor1);
    DataStream<Tuple2<String, Long>> click = env
            .addSource(new FlinkKafkaConsumer082<String>("click", new SimpleStringSchema(), properties))
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                private static final long serialVersionUID = -6564495005753073342L;
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] splits = value.split(" ");
                    return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
                }
            }).keyBy(keySelector).assignTimestamps(timestampExtractor2);
    NoWindowJoinedStreams<Tuple2<String, Long>, Tuple2<String, Long>> joinedStreams =
            new NoWindowJoinedStreams<>(advertisement, click);
    DataStream<Tuple3<String, Long, Long>> joinedStream = joinedStreams
            .where(keySelector)
            .buffer(Time.of(20, TimeUnit.SECONDS))
            .equalTo(keySelector)
            .buffer(Time.of(5, TimeUnit.SECONDS))
            .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                private static final long serialVersionUID = -5075871109025215769L;
                @Override
                public Tuple3<String, Long, Long> join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                    return new Tuple3<>(first.f0, first.f1, second.f1);
                }
            });
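The buffering idea can be sketched in plain Java without Flink. This is only my reading of how such an operator might behave, not the actual NoWindowJoinedStreams implementation: each side keeps its elements for a fixed retention time, and an arriving element is matched against whatever the other side's buffer still holds. The class and method names are hypothetical, and the sketch assumes events arrive in non-decreasing timestamp order across both inputs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class BufferedJoinSketch {
    /** An (advertiseId, timestamp) event, mirroring Tuple2<String, Long>. */
    static final class Event {
        final String id;
        final long ts;
        Event(String id, long ts) { this.id = id; this.ts = ts; }
    }

    private final Deque<Event> adBuffer = new ArrayDeque<>();
    private final Deque<Event> clickBuffer = new ArrayDeque<>();
    private final long adRetentionMs;
    private final long clickRetentionMs;
    private final List<String> output = new ArrayList<>();

    BufferedJoinSketch(long adRetentionMs, long clickRetentionMs) {
        this.adRetentionMs = adRetentionMs;
        this.clickRetentionMs = clickRetentionMs;
    }

    /** Handles an advertisement: probe the buffered clicks, then buffer the ad. */
    void onAd(Event ad) {
        evict(clickBuffer, ad.ts - clickRetentionMs);
        for (Event click : clickBuffer) {
            if (click.id.equals(ad.id)) emit(ad, click);
        }
        adBuffer.addLast(ad);
    }

    /** Handles a click: probe the buffered ads, then buffer the click. */
    void onClick(Event click) {
        evict(adBuffer, click.ts - adRetentionMs);
        for (Event ad : adBuffer) {
            if (ad.id.equals(click.id)) emit(ad, click);
        }
        clickBuffer.addLast(click);
    }

    /** Drops buffered events older than minTs (buffers are in timestamp order). */
    private static void evict(Deque<Event> buffer, long minTs) {
        while (!buffer.isEmpty() && buffer.peekFirst().ts < minTs) {
            buffer.removeFirst();
        }
    }

    private void emit(Event ad, Event click) {
        output.add(ad.id + "," + ad.ts + "," + click.ts);
    }

    List<String> output() { return output; }

    public static void main(String[] args) {
        // Retention times mirror the 20-second and 5-second buffers above.
        BufferedJoinSketch join = new BufferedJoinSketch(20_000L, 5_000L);
        join.onAd(new Event("a", 0L));
        join.onClick(new Event("a", 15_000L)); // ad still buffered: joined
        join.onClick(new Event("a", 21_000L)); // ad already evicted: dropped
        System.out.println(join.output());
    }
}
```

A pair is emitted only at the moment its later element arrives, so no pair can be produced twice, which is what removes the duplicates without any window bookkeeping.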
I implemented two new join operators based on the Flink streaming API's TwoInputTransformation. Please check Flink-stream-join. I will add more tests to this repository.
In your code, you defined an overlapping sliding window (the slide is smaller than the window size). If you don't want duplicates, you can define a non-overlapping window by specifying only the window size (the default slide is equal to the window size).
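One thing worth checking before switching to non-overlapping windows: each element then falls into exactly one window, so a show and a click that land on opposite sides of a window boundary never meet, even if they are only seconds apart. A minimal plain-Java sketch, assuming windows aligned to multiples of the window size (the class and method names are illustrative only):

```java
final class TumblingWindowBoundary {
    /** Start of the single tumbling window that contains ts. */
    static long windowStart(long ts, long sizeMs) {
        return Math.floorDiv(ts, sizeMs) * sizeMs;
    }

    /** True if both timestamps fall into the same tumbling window. */
    static boolean sameWindow(long t1, long t2, long sizeMs) {
        return windowStart(t1, sizeMs) == windowStart(t2, sizeMs);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long size = 30 * minute;
        // Shown at 0:05, clicked at 0:10: same window, joined once.
        System.out.println(sameWindow(5 * minute, 10 * minute, size));
        // Shown at 0:29, clicked at 0:31: different windows, never joined.
        System.out.println(sameWindow(29 * minute, 31 * minute, size));
    }
}
```

So a non-overlapping window removes the duplicates, at the cost of missing clicks that cross a window boundary.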