little-tomato created FLINK-20814:
-------------------------------------

             Summary: The CEP code is not running properly
                 Key: FLINK-20814
                 URL: https://issues.apache.org/jira/browse/FLINK-20814
             Project: Flink
          Issue Type: Bug
          Components: Library / CEP
    Affects Versions: 1.12.0
         Environment: Flink 1.12.0, JDK 1.8
            Reporter: little-tomato


The CEP code below runs properly on Flink 1.11.2, but it does not work properly on Flink 1.12.0. Can somebody help me?

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // DataStream: source
    DataStream<TemperatureEvent> input = env.fromElements(
            new TemperatureEvent(1, "Device01", 22.0),
            new TemperatureEvent(1, "Device01", 27.1),
            new TemperatureEvent(2, "Device01", 28.1),
            new TemperatureEvent(1, "Device01", 22.2),
            new TemperatureEvent(3, "Device01", 22.1),
            new TemperatureEvent(1, "Device02", 22.3),
            new TemperatureEvent(4, "Device02", 22.1),
            new TemperatureEvent(1, "Device02", 22.4),
            new TemperatureEvent(5, "Device02", 22.7),
            new TemperatureEvent(1, "Device02", 27.0),
            new TemperatureEvent(6, "Device02", 30.0));

    Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start")
            .subtype(TemperatureEvent.class)
            .where(new SimpleCondition<TemperatureEvent>() {
                @Override
                public boolean filter(TemperatureEvent subEvent) {
                    // Keep only readings at or above 26.0 degrees.
                    return subEvent.getTemperature() >= 26.0;
                }
            })
            .where(new SimpleCondition<TemperatureEvent>() {
                @Override
                public boolean filter(TemperatureEvent subEvent) {
                    // Keep only readings from Device02.
                    return subEvent.getMachineName().equals("Device02");
                }
            })
            .within(Time.seconds(10));

    DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
            .select(new RichPatternSelectFunction<TemperatureEvent, Alert>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void open(Configuration parameters) throws Exception {
                    System.out.println(getRuntimeContext().getUserCodeClassLoader());
                }

                @Override
                public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                    return new Alert("Temperature Rise Detected: " + event.get("start")
                            + " on machine name: " + event.get("start"));
                }
            });

    patternStream.print();

    env.execute("CEP on Temperature Sensor");

Expected output (as produced on Flink 1.11.2):

    Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]]]
    Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]]]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
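
The report does not include the TemperatureEvent class, so the following is only a sketch for checking which inputs the pattern should select. It uses plain Java with no Flink dependency. The class name ExpectedMatches, the field name id (the meaning of the first constructor argument is not shown in the report), and the toString format (inferred from the expected output) are all assumptions. It relies on the documented CEP behavior that consecutive where(...) calls on one pattern are combined with AND.

```java
import java.util.ArrayList;
import java.util.List;

final class ExpectedMatches {

    // Hypothetical stand-in for the reporter's TemperatureEvent class.
    static final class TemperatureEvent {
        final int id; // assumed meaning of the first constructor argument
        final String machineName;
        final double temperature;

        TemperatureEvent(int id, String machineName, double temperature) {
            this.id = id;
            this.machineName = machineName;
            this.temperature = temperature;
        }

        double getTemperature() {
            return temperature;
        }

        String getMachineName() {
            return machineName;
        }

        @Override
        public String toString() {
            // Format inferred from the expected output in the report.
            return "TemperatureEvent [getTemperature()=" + temperature
                    + ", getMachineName=" + machineName + "]";
        }
    }

    // Both conditions from the pattern, combined with AND.
    static boolean matches(TemperatureEvent e) {
        return e.getTemperature() >= 26.0 && "Device02".equals(e.getMachineName());
    }

    // The eleven input events from the report, in order.
    static List<TemperatureEvent> input() {
        List<TemperatureEvent> events = new ArrayList<>();
        events.add(new TemperatureEvent(1, "Device01", 22.0));
        events.add(new TemperatureEvent(1, "Device01", 27.1));
        events.add(new TemperatureEvent(2, "Device01", 28.1));
        events.add(new TemperatureEvent(1, "Device01", 22.2));
        events.add(new TemperatureEvent(3, "Device01", 22.1));
        events.add(new TemperatureEvent(1, "Device02", 22.3));
        events.add(new TemperatureEvent(4, "Device02", 22.1));
        events.add(new TemperatureEvent(1, "Device02", 22.4));
        events.add(new TemperatureEvent(5, "Device02", 22.7));
        events.add(new TemperatureEvent(1, "Device02", 27.0));
        events.add(new TemperatureEvent(6, "Device02", 30.0));
        return events;
    }

    // Events that satisfy both conditions, in input order.
    static List<TemperatureEvent> expectedMatches() {
        List<TemperatureEvent> result = new ArrayList<>();
        for (TemperatureEvent e : input()) {
            if (matches(e)) {
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        expectedMatches().forEach(System.out::println);
    }
}
```

Only the last two Device02 readings (27.0 and 30.0) satisfy both conditions, which agrees with the two alerts seen on Flink 1.11.2. The Device01 readings of 27.1 and 28.1 pass the temperature check but fail the machine-name check.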