Hi all,
In case it is useful to people: I was testing the new convergent batch/streaming DataStream API, and join does not work in batch mode (https://issues.apache.org/jira/browse/FLINK-22587). I had to code an inner join manually using a *KeyedCoProcessFunction* and keyed *state*. Below is an example of such a manual join (implementing part of TPC-DS query 3 with Avro GenericRecords). It may not be the best code, but it could serve as an example for people who are interested.

Best,
Etienne

  // Join1: WHERE date_dim.d_date_sk = store_sales.ss_sold_date_sk
  Schema schemaJoinDateSk =
      AvroUtils.getSchemaMerged(dateDimAvroSchema, storeSalesAvroSchema, "recordsJoinDateSk");
  final DataStream<GenericRecord> recordsJoinDateSk =
      dateDim
          .keyBy((KeySelector<GenericRecord, Integer>) value -> (Integer) value.get("d_date_sk"))
          .connect(
              storeSales.keyBy(
                  (KeySelector<GenericRecord, Integer>) value -> (Integer) value.get("ss_sold_date_sk")))
          .process(new JoinRecords(dateDimAvroSchema, storeSalesAvroSchema, schemaJoinDateSk))
          .returns(new GenericRecordAvroTypeInfo(schemaJoinDateSk));

Where *JoinRecords* (class declaration and field definitions omitted) does a manual inner join with keyed state. Note that each side keeps at most one pending record per key: a later unmatched record overwrites an earlier one, and a record that finds a match is not stored. The result is therefore a complete inner join only when keys are unique on both sides, or when, for each key, the record from the unique side (here date_dim) is processed before its matching records.

  public JoinRecords(Schema schemaLhs, Schema schemaRhs, Schema outputSchema) {
    this.schemaLhs = schemaLhs;
    this.schemaRhs = schemaRhs;
    this.outputSchema = outputSchema;
    this.schemaLhsString = schemaLhs.toString();
    this.schemaRhsString = schemaRhs.toString();
    this.schemaString = outputSchema.toString();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    state1 = getRuntimeContext().getMapState(
        new MapStateDescriptor<>("records_dataStream_1", Integer.class, GenericRecord.class));
    state2 = getRuntimeContext().getMapState(
        new MapStateDescriptor<>("records_dataStream_2", Integer.class, GenericRecord.class));
  }

  private GenericRecord joinRecords(GenericRecord first, GenericRecord second) throws Exception {
    // after deserialization
    if (schemaLhs == null) {
      schemaLhs = new Schema.Parser().parse(schemaLhsString);
    }
    if (schemaRhs == null) {
      schemaRhs = new Schema.Parser().parse(schemaRhsString);
    }
    if (outputSchema == null) {
      outputSchema = new Schema.Parser().parse(schemaString);
    }
    GenericRecord outputRecord = new GenericRecordBuilder(outputSchema).build();
    for (Schema.Field f : outputSchema.getFields()) {
      if (schemaLhs.getField(f.name()) != null) {
        outputRecord.put(f.name(), first.get(f.name()));
      } else if (schemaRhs.getField(f.name()) != null) {
        outputRecord.put(f.name(), second.get(f.name()));
      }
    }
    return outputRecord;
  }

  private GenericRecord stateJoin(GenericRecord currentRecord, int currentDatastream, Context context)
      throws Exception {
    final Integer currentKey = context.getCurrentKey();
    MapState<Integer, GenericRecord> myState = currentDatastream == 1 ? state1 : state2;
    MapState<Integer, GenericRecord> otherState = currentDatastream == 1 ? state2 : state1;
    // join with the other datastream by looking into the state of the other datastream
    final GenericRecord otherRecord = otherState.get(currentKey);
    if (otherRecord == null) {
      // did not find a record to join with, store record for later join
      myState.put(currentKey, currentRecord);
      return null;
    } else {
      // found a record to join with (same key), join it using the correct Avro schema order
      return currentDatastream == 1
          ? joinRecords(currentRecord, otherRecord)
          : joinRecords(otherRecord, currentRecord);
    }
  }

  @Override
  public void processElement1(GenericRecord currentRecord, Context context,
      Collector<GenericRecord> collector) throws Exception {
    final GenericRecord jointRecord = stateJoin(currentRecord, 1, context);
    if (jointRecord != null) {
      collector.collect(jointRecord);
    }
  }

  @Override
  public void processElement2(GenericRecord currentRecord, Context context,
      Collector<GenericRecord> collector) throws Exception {
    final GenericRecord jointRecord = stateJoin(currentRecord, 2, context);
    if (jointRecord != null) {
      collector.collect(jointRecord);
    }
  }
}
Free forum by Nabble | Edit this page |