I am getting started with Flink and am looking at one of the official tutorials.
To my understanding, the goal of this exercise is to join the two streams on the time attribute.
Task:
The result of this exercise is a data stream of Tuple2 records, one for each distinct rideId. You should ignore the END events, and only join the event for the START of each ride with its corresponding fare data.
The resulting stream should be printed to standard out.
Question: How is the EnrichmentFunction able to join the two streams, i.e. how does it know which fare to join with which ride? I expected it to buffer multiple fares/rides until a matching partner arrives for an incoming fare/ride.
As I understand it, it just saves every ride/fare it sees and combines it with the next ride/fare that comes along. Why is this a proper join?
Provided Solution:
/*
 * Copyright 2017 data Artisans GmbH
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.dataartisans.flinktraining.solutions.datastream_java.state;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiFare;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiFareSource;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.ExerciseBase;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;
/**
 * Java reference implementation for the "Stateful Enrichment" exercise of the Flink training
 * (http://training.data-artisans.com).
 *
 * The goal for this exercise is to enrich TaxiRides with fare information.
 *
 * Parameters:
 * -rides path-to-input-file
 * -fares path-to-input-file
 *
 */
public class RidesAndFaresSolution extends ExerciseBase {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        final String ridesFile = params.get("rides", pathToRideData);
        final String faresFile = params.get("fares", pathToFareData);
        final int delay = 60;                   // at most 60 seconds of delay
        final int servingSpeedFactor = 1800;    // 30 minutes worth of events are served every second
        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(ExerciseBase.parallelism);
        DataStream<TaxiRide> rides = env
                .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
                .filter((TaxiRide ride) -> ride.isStart)
                .keyBy("rideId");
        DataStream<TaxiFare> fares = env
                .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
                .keyBy("rideId");
        DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
                .connect(fares)
                .flatMap(new EnrichmentFunction());
        printOrTest(enrichedRides);
        env.execute("Join Rides with Fares (java RichCoFlatMap)");
    }
    public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
        // keyed, managed state
        private ValueState<TaxiRide> rideState;
        private ValueState<TaxiFare> fareState;
        @Override
        public void open(Configuration config) {
            rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
            fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
        }
        @Override
        public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiFare fare = fareState.value();
            if (fare != null) {
                fareState.clear();
                out.collect(new Tuple2<>(ride, fare));
            } else {
                rideState.update(ride);
            }
        }
        @Override
        public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiRide ride = rideState.value();
            if (ride != null) {
                rideState.clear();
                out.collect(new Tuple2<>(ride, fare));
            } else {
                fareState.update(fare);
            }
        }
    }
}
In the context of this particular training exercise on stateful enrichment, there are three events for each value of rideId -- a TaxiRide start event, a TaxiRide end event, and a TaxiFare. The objective of this exercise is to connect each TaxiRide start event with the one TaxiFare event having the same rideId -- or in other words, to join the ride stream and fare stream on rideId, while knowing that there will be only one of each.
This exercise demonstrates how keyed state works in Flink. Keyed state is effectively a sharded key-value store. When we have an item of ValueState, such as ValueState<TaxiRide> rideState, Flink will store a separate record in its state backend for each distinct value of the key (the rideId).
Each time flatMap1 and flatMap2 are called there is a key (a rideId) implicitly in context, and when we call rideState.update(ride) or rideState.value() we are not accessing a single variable, but rather setting and getting an entry in a key-value store, using the rideId as the key.
In this exercise, both streams are keyed by the rideId, so there is potentially one element of rideState and one element of fareState for each distinct rideId. Hence the solution that's been provided is buffering lots of rides and fares, but only one for each rideId (which is enough, given that the rides and fares are perfectly paired in this dataset).
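To make the per-key scoping concrete, here is a minimal plain-Java sketch that imitates the behavior described above, with no Flink involved. It is only an illustration under some assumptions: the class `KeyedJoinSketch` and the methods `onRide` and `onFare` are hypothetical names, a `HashMap` stands in for Flink's state backend, and plain strings stand in for TaxiRide and TaxiFare. In Flink the key is implicit in the context; here it is passed explicitly so the lookup is visible.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of the keyed-state join; this is not Flink code.
public class KeyedJoinSketch {
    // Stand-ins for ValueState<TaxiRide> and ValueState<TaxiFare>:
    // each map holds at most one entry per rideId.
    private final Map<Long, String> rideState = new HashMap<>();
    private final Map<Long, String> fareState = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Mirrors flatMap1: a ride arrives for the given rideId.
    public void onRide(long rideId, String ride) {
        // Like fareState.value() followed by fareState.clear(), for this key only.
        String fare = fareState.remove(rideId);
        if (fare != null) {
            output.add(ride + "+" + fare);
        } else {
            rideState.put(rideId, ride); // wait for the matching fare
        }
    }

    // Mirrors flatMap2: a fare arrives for the given rideId.
    public void onFare(long rideId, String fare) {
        String ride = rideState.remove(rideId);
        if (ride != null) {
            output.add(ride + "+" + fare);
        } else {
            fareState.put(rideId, fare); // wait for the matching ride
        }
    }

    public List<String> getOutput() {
        return output;
    }

    public static void main(String[] args) {
        KeyedJoinSketch join = new KeyedJoinSketch();
        join.onRide(1, "ride1");
        join.onRide(2, "ride2"); // does not overwrite ride1: different key
        join.onFare(2, "fare2");
        join.onFare(3, "fare3");
        join.onFare(1, "fare1");
        join.onRide(3, "ride3");
        System.out.println(join.getOutput());
        // prints [ride2+fare2, ride1+fare1, ride3+fare3]
    }
}
```

Note that ride1 and ride2 are both buffered at the same time, and each is paired only with the fare carrying the same rideId, regardless of arrival order.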
So, you asked:
How is the EnrichmentFunction able to join the two streams aka. how does it know which fare to join with which ride?
And the answer is: it joins the fare having the same rideId.
The exercise you've asked about shows how to implement a simple enrichment join in order to get across the ideas of keyed state and connected streams. But more complex joins are certainly possible with Flink. See the docs on joins using the DataStream API, joins with Flink's Table API, and joins with Flink SQL.