join example in batch mode using DataStream API


Etienne Chauchot
Hi all,

In case it is useful to people:

I was testing the new unified batch/streaming DataStream API and found that join does
not work in batch mode: https://issues.apache.org/jira/browse/FLINK-22587
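
For reference, batch execution mode for the DataStream API is enabled on the regular
StreamExecutionEnvironment. A minimal sketch (assuming the sources are bounded):

   import org.apache.flink.api.common.RuntimeExecutionMode;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   // run the DataStream program with batch semantics (available since Flink 1.12)
   env.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);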

As a workaround, I coded an inner join manually using a *KeyedCoProcessFunction* and
keyed *state*. Below is an example of such a manual join (implementing part of
TPC-DS query3 with Avro GenericRecords). It may not be the best code, but it could
serve as an example for anyone interested.

Best,

Etienne


// Join1: WHERE date_dim.d_date_sk = store_sales.ss_sold_date_sk
Schema schemaJoinDateSk = AvroUtils
  .getSchemaMerged(dateDimAvroSchema, storeSalesAvroSchema, "recordsJoinDateSk");
final DataStream<GenericRecord> recordsJoinDateSk = dateDim
   .keyBy((KeySelector<GenericRecord, Integer>) value -> (Integer) value.get("d_date_sk"))
   .connect(storeSales.keyBy(
     (KeySelector<GenericRecord, Integer>) value -> (Integer) value.get("ss_sold_date_sk")))
   .process(new JoinRecords(dateDimAvroSchema, storeSalesAvroSchema, schemaJoinDateSk))
   .returns(new GenericRecordAvroTypeInfo(schemaJoinDateSk));
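
Note that the explicit .returns(new GenericRecordAvroTypeInfo(schemaJoinDateSk))
provides the type information Flink needs to serialize the Avro GenericRecords,
since the merged schema cannot be inferred from the GenericRecord type itself.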


Where *JoinRecords* is a KeyedCoProcessFunction that performs the manual inner join using keyed MapState:


   public JoinRecords(Schema schemaLhs, Schema schemaRhs, Schema outputSchema) {
     this.schemaLhs = schemaLhs;
     this.schemaRhs = schemaRhs;
     this.outputSchema = outputSchema;
     this.schemaLhsString = schemaLhs.toString();
     this.schemaRhsString = schemaRhs.toString();
     this.schemaString = outputSchema.toString();
   }

   @Override
   public void open(Configuration parameters) throws Exception {
     state1 = getRuntimeContext().getMapState(
       new MapStateDescriptor<>("records_dataStream_1", Integer.class, GenericRecord.class));
     state2 = getRuntimeContext().getMapState(
       new MapStateDescriptor<>("records_dataStream_2", Integer.class, GenericRecord.class));
   }

   private GenericRecord joinRecords(GenericRecord first, GenericRecord second)
     throws Exception {
     // after deserialization
     if (schemaLhs == null) {
       schemaLhs = new Schema.Parser().parse(schemaLhsString);
     }
     if (schemaRhs == null) {
       schemaRhs = new Schema.Parser().parse(schemaRhsString);
     }
     if (outputSchema == null) {
       outputSchema = new Schema.Parser().parse(schemaString);
     }

     GenericRecord outputRecord = new GenericRecordBuilder(outputSchema).build();
     for (Schema.Field f : outputSchema.getFields()) {
       if (schemaLhs.getField(f.name()) != null) {
         outputRecord.put(f.name(), first.get(f.name()));
       } else if (schemaRhs.getField(f.name()) != null) {
         outputRecord.put(f.name(), second.get(f.name()));
       }
     }
     return outputRecord;
   }

   private GenericRecord stateJoin(GenericRecord currentRecord, int currentDatastream, Context context)
     throws Exception {
     final Integer currentKey = context.getCurrentKey();
     MapState<Integer, GenericRecord> myState = currentDatastream == 1 ? state1 : state2;
     MapState<Integer, GenericRecord> otherState = currentDatastream == 1 ? state2 : state1;
     // join with the other datastream by looking into the state of the other datastream
     final GenericRecord otherRecord = otherState.get(currentKey);
     if (otherRecord == null) {
       // did not find a record to join with, store the record for a later join
       myState.put(currentKey, currentRecord);
       return null;
     } else {
       // found a record to join with (same key), join using the correct avro schema
       return currentDatastream == 1
         ? joinRecords(currentRecord, otherRecord)
         : joinRecords(otherRecord, currentRecord);
     }
   }

   @Override
   public void processElement1(GenericRecord currentRecord, Context context, Collector<GenericRecord> collector)
     throws Exception {
     final GenericRecord jointRecord = stateJoin(currentRecord, 1, context);
     if (jointRecord != null) {
       collector.collect(jointRecord);
     }
   }

   @Override
   public void processElement2(GenericRecord currentRecord, Context context, Collector<GenericRecord> collector)
     throws Exception {
     final GenericRecord jointRecord = stateJoin(currentRecord, 2, context);
     if (jointRecord != null) {
       collector.collect(jointRecord);
     }
   }
}
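
Two remarks on the implementation: the schemas are also kept as strings and re-parsed
lazily in joinRecords(), so that usable Schema objects are available after the function
has been serialized and shipped to the task managers; and the snippet above is labelled
Join1 because the same pattern can be repeated for the remaining joins of the query.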