I am using Flink 1.4.0.
I am consuming data from a Kafka topic into a DataStream. The data is transformed into a POJO, say Employee, and I end up with something like:
DataStream<Employee> employeeStream = ...;
Now I need to enrich some fields of the instances in this stream using data from a large CSV file. I found a nice way to load the CSV and create another DataStream:
DataStream<Enrichments> enrichmentsStream = ...;
Both POJOs share one field (id), which can be used for a join. If these were DataSets, I could apply a leftOuterJoin(), but they are not. I don't care about windowing: I want every Employee to be enriched with information from Enrichments if its id is present in the CSV. How do I do this? Would a join operation that ignores windowing work? Would it be resource hungry? Would it look like this?
employeeStream
.join(enrichmentsStream)
.where(new SelectKeyForEmployee())
.equalTo(new SelectKeyForEnrichments())
.window(?????)
.apply(new JoinEnrichFunction());
Also, since the window has to be shared by the two streams, how do I define their windowing prior to applying the JOIN function and what would be the implementation of the JoinEnrichFunction()?
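It turns out that a join operation is unnecessary in this case. DataStream joins are evaluated per window, so they only make sense between streams that share the same windowing, and a lookup against static CSV data does not fit that model.
A map function is enough to meet the enrichment objective described here: load the CSV into an in-memory map keyed by id, then look up each Employee as it passes through. Records without a match are passed on unchanged, which gives the left-outer-join behavior. The following snippet illustrates the idea: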
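import java.io.Serializable;
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

public class MainClass {
    public static void main(String[] args) {
        ...
        // Some custom way of loading the csv data into a Map<String, Employee> keyed by id
        MetadataLoader loader = new MetadataLoader("pathToData.csv");
        Map<String, Employee> metadataHashMap = loader.getMetadataMap(employeeEnrichmentData);
        ...
        // Enrichment: look up every incoming Employee in the preloaded map
        SingleOutputStreamOperator<Employee> enrichedStream = rawStream
            .map(new MapMetadataToEmployees(metadataHashMap))
            .name("Enrich: with Employee Metadata");
        // Some sink operation
        ...
    }
}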
final class MapMetadataToEmployees implements MapFunction<Employee, Employee>, Serializable {
    // Flink serializes the function to ship it to the workers, so the map
    // implementation must itself be Serializable (e.g. a HashMap).
    private Map<String, Employee> metaDataMap;

    public MapMetadataToEmployees(Map<String, Employee> metaDataMap) {
        this.metaDataMap = metaDataMap;
    }

    @Override
    public Employee map(Employee employee) {
        // Enrich only when the id is present in the csv data
        if (metaDataMap.containsKey(employee.getId())) {
            Employee employeeWithMetaData = metaDataMap.get(employee.getId());
            employee.setSalary(employeeWithMetaData.getSalary());
            employee.setRank(employeeWithMetaData.getRank());
            employee.setBusinessTitle(employeeWithMetaData.getBusinessTitle());
        }
        // Employees without a match are returned unchanged
        return employee;
    }
}
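The "custom way of loading the csv data" is left open above. As a rough sketch only, the loading and lookup steps could look like the following in plain Java, without any Flink dependency. The class name EnrichmentSketch, the column order (id, salary, rank, businessTitle), the String field types, and the simplified Employee stand-in are all assumptions made for illustration, not part of the original code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EnrichmentSketch {

    // Simplified stand-in for the Employee POJO; field names and types are assumptions.
    static final class Employee {
        final String id;
        String salary;
        String rank;
        String businessTitle;

        Employee(String id) {
            this.id = id;
        }
    }

    // Hypothetical loader: turns lines of the assumed form
    // "id,salary,rank,businessTitle" into a map keyed by id.
    // A naive split is used, so quoted fields containing commas are not handled.
    static Map<String, Employee> loadMetadata(List<String> csvLines) {
        Map<String, Employee> byId = new HashMap<>();
        for (String line : csvLines) {
            String[] cols = line.split(",", -1);
            if (cols.length < 4) {
                continue; // skip malformed rows
            }
            Employee e = new Employee(cols[0].trim());
            e.salary = cols[1].trim();
            e.rank = cols[2].trim();
            e.businessTitle = cols[3].trim();
            byId.put(e.id, e);
        }
        return byId;
    }

    // Same idea as the body of map(): enrich on a match, pass through otherwise.
    static Employee enrich(Employee employee, Map<String, Employee> metadata) {
        Employee match = metadata.get(employee.id);
        if (match != null) {
            employee.salary = match.salary;
            employee.rank = match.rank;
            employee.businessTitle = match.businessTitle;
        }
        return employee;
    }

    public static void main(String[] args) {
        // In practice the lines could come from
        // Files.readAllLines(Paths.get("pathToData.csv")).
        List<String> lines = Arrays.asList(
            "e1,1000,3,Engineer",
            "e2,2000,5,Manager");
        Map<String, Employee> metadata = loadMetadata(lines);

        Employee known = enrich(new Employee("e1"), metadata);
        Employee unknown = enrich(new Employee("e9"), metadata);
        System.out.println(known.id + " -> " + known.businessTitle);
        System.out.println(unknown.id + " -> " + unknown.businessTitle);
    }
}
```

Because the whole map is held in memory and copied to every parallel instance of the map function, this approach presumably suits a CSV that fits comfortably in each worker's heap.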