import com.ericsson.config.Configuration; import com.ericsson.pojos.DataPojo; import com.ericsson.pojos.out.TimeDrift; import com.ericsson.timedrift.TimeShiftWindowMapFunction; import com.ericsson.utils.DataPojoSerializer; import com.ericsson.utils.PojoSerializer; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedDataStream; import org.apache.flink.streaming.api.datastream.WindowedDataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.helper.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import java.util.concurrent.TimeUnit; /** * Time drift host and time based. */ public class TimeDriftKafkaExampleTst { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream in = env.readTextFile("./datasets/first10000.json"); DataStream events = in.map(new DeserialiseMap()).returns(DataPojo.class); KeyedDataStream groupedEvents = events.groupBy(t -> { return t.getHost(); }); WindowedDataStream drift = groupedEvents //.window(Count.of(50)).every(Count.of(10)) .window(Time.of(1, TimeUnit.MINUTES)).every(Time.of(20, TimeUnit.SECONDS)) .foldWindow(new TimeDrift(),new TimeShiftFold()); //.mapWindow(new TimeShiftWindowMapFunction()); DataStream out = drift.flatten(); out.print(); env.execute(); } private static class DeserialiseMap implements MapFunction { private static final DataPojoSerializer deserializer = new DataPojoSerializer(); @Override public DataPojo map(String value) throws Exception { return deserializer.deserialize(value.getBytes()); } } private static class TimeShiftFold implements FoldFunction { @Override public TimeDrift fold(TimeDrift accumulator, DataPojo value) throws Exception { long ts = value.getTimestampInMillis(); long highest = accumulator.highest; if(accumulator.host!= null && !accumulator.host.equalsIgnoreCase(value.getHost())){ System.err.println(accumulator.host + " != " + value.getHost()); } accumulator.host = value.getHost(); accumulator.totalNbEvents += 1; if(highest>ts){ accumulator.totalOoOrderEvents += 1; accumulator.accumulatedDelta += highest - ts; return accumulator; } // else case accumulator.highest = ts; return accumulator; } } }