Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Flink Left-Outer-JOIN: Enriching a Stream with Data from a csv file

I am using Flink 1.4.0.

I am consuming data from a Kafka Topic into a DataStream. The data is transformed into a POJO, say Employee, and I end up with a something like:

DataStream<Employee> employeeStream = ...;

Now, I need to enrich some fields in the instances of this stream using data from a big csv file. I found a nice way to load the csv and create a another DataStream:

DataStream<Enrichements> enrichmentsStream = ...;

Now, both POJOs share one field (id) which can be used for a JOIN operation. If these were DataSets, I would have been able to apply a leftOuterJoin(), but they are not. I don't care about windowing, as I want any Employee to be enriched with information from Enrichments, if its id is present in the csv. How do I do this? Would a join operation that ignores windowing work? Would it be resource hungry? Would it look like this?:

    employeeStream 
        .join(enrichmentsStream )
        .where(new SelectKeyForEmployee())
        .equalTo(new SelectKeyForEnrichments())
        .window(?????)
        .apply(new JoinEnrichFunction());

Also, since the window has to be shared by the two streams, how do I define their windowing prior to applying the JOIN function and what would be the implementation of the JoinEnrichFunction()?

like image 676
Christos Hadjinikolis Avatar asked Oct 24 '25 03:10

Christos Hadjinikolis


1 Answers

It turns out that a join operation is redundant in this case. Stream joins are not intuitive and they make sense only when applied between streams that share the same windowing mechanisms.

In this case, a map function is more than enough to satisfy the enrichment objective detailed here. The following snippet of code should be clarifying enough:

public MainClass {
  public void main(String[] args) {
    ...
    // Some custom way of loading the csv data into a Map<POJO> format
    MetadataLoader loader = new MetadataLoader("pathToData.csv");
    Map<Employee> metadataHashMap = loader.getMetadataMap(employeeEnrichmentData);
    ...

    // Enrichment
    SingleOutputStreamOperator<Employee>> enrichedStream = rawStream
            .map(new MapMetadataToEmployees(metadataHashMap))
            .name("Enrich: with Employee Metadata");

    // Some sink opeartion
    ...

  }
}  

final class MapMetadataToEmployees implements MapFunction<Employee, Employee>, Serializable {

  private Map<Employee> metaDataMap;

  public MapMetadataToEmployees(Map<String, Employee> metaDataMap) {
      this.metaDataMap = metaDataMap;
  }

  @Override
  public Employee map(Employee employee) {

      if (metaDataMap.containsKey(employee.getId())) {

          Employee employeeWithMetaData = metaDataMap.get(employee.getId());

          employee.setSalary(employeeWithMetaData.getSalary);
          employee.setRank(employeeWithMetaData.getRank());
          employee.setBusinessTitle(employeeWithMetaData.getBusinessTitle());
      }

      return employee;
  }
}      
like image 114
Christos Hadjinikolis Avatar answered Oct 27 '25 04:10

Christos Hadjinikolis



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!