[jira] [Created] (FLINK-20814) The CEP code is not running properly


Shang Yuanchun (Jira)
little-tomato created FLINK-20814:
-------------------------------------

             Summary: The CEP code is not running properly
                 Key: FLINK-20814
                 URL: https://issues.apache.org/jira/browse/FLINK-20814
             Project: Flink
          Issue Type: Bug
          Components: Library / CEP
    Affects Versions: 1.12.0
         Environment: flink1.12.0
jdk1.8

            Reporter: little-tomato


The CEP code below runs properly on Flink 1.11.2, but it does not work properly on Flink 1.12.0.
Can somebody help me?

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.RichPatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// DataStream : source
DataStream<TemperatureEvent> input = env.fromElements(
        new TemperatureEvent(1, "Device01", 22.0),
        new TemperatureEvent(1, "Device01", 27.1), new TemperatureEvent(2, "Device01", 28.1),
        new TemperatureEvent(1, "Device01", 22.2), new TemperatureEvent(3, "Device01", 22.1),
        new TemperatureEvent(1, "Device02", 22.3), new TemperatureEvent(4, "Device02", 22.1),
        new TemperatureEvent(1, "Device02", 22.4), new TemperatureEvent(5, "Device02", 22.7),
        new TemperatureEvent(1, "Device02", 27.0), new TemperatureEvent(6, "Device02", 30.0));

// Pattern: a single event with temperature >= 26.0 reported by "Device02"
Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start")
        .subtype(TemperatureEvent.class)
        .where(new SimpleCondition<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent subEvent) {
                return subEvent.getTemperature() >= 26.0;
            }
        }).where(new SimpleCondition<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent subEvent) {
                return subEvent.getMachineName().equals("Device02");
            }
        }).within(Time.seconds(10));

// Turn every match into an Alert
DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
        .select(
                new RichPatternSelectFunction<TemperatureEvent, Alert>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        System.out.println(getRuntimeContext().getUserCodeClassLoader());
                    }

                    @Override
                    public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                        return new Alert("Temperature Rise Detected: " + event.get("start") + " on machine name: " + event.get("start"));
                    }
                });

patternStream.print();

env.execute("CEP on Temperature Sensor");

Expected output (as produced on Flink 1.11.2):
Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]]]
Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]]]
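
The report does not include the TemperatureEvent and Alert classes. For anyone trying to reproduce this, a minimal sketch that is consistent with the constructor calls and the toString() format in the expected output might look like the following. The name of the first constructor argument (id here) and all field names are assumptions, since the original classes are not shown. In a real project each class would normally be public and live in its own file.

```java
// Hypothetical reconstruction of the event type used in the report.
// Only the constructor shape (int, String, double), the two getters and
// the toString() format are taken from the report; the rest is assumed.
class TemperatureEvent {
    private int id;              // assumed name; meaning not stated in the report
    private String machineName;
    private double temperature;

    // No-arg constructor so the class can be treated as a POJO
    public TemperatureEvent() {}

    public TemperatureEvent(int id, String machineName, double temperature) {
        this.id = id;
        this.machineName = machineName;
        this.temperature = temperature;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public String getMachineName() { return machineName; }
    public void setMachineName(String machineName) { this.machineName = machineName; }

    public double getTemperature() { return temperature; }
    public void setTemperature(double temperature) { this.temperature = temperature; }

    @Override
    public String toString() {
        return "TemperatureEvent [getTemperature()=" + temperature
                + ", getMachineName=" + machineName + "]";
    }
}

// Hypothetical reconstruction of the alert type; only the
// single-String constructor and the toString() format come from the report.
class Alert {
    private String message;

    public Alert() {}

    public Alert(String message) {
        this.message = message;
    }

    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }

    @Override
    public String toString() {
        return "Alert [message=" + message + "]";
    }
}
```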
