Yue Ma created FLINK-22888:
------------------------------ Summary: Matches results may be wrong when using notNext as the last part of the pattern with Window Key: FLINK-22888 URL: https://issues.apache.org/jira/browse/FLINK-22888 Project: Flink Issue Type: Bug Components: Library / CEP Affects Versions: 1.9.0 Reporter: Yue Ma the pattern is like Pattern.begin("start").where(records == "a") .notNext("notNext").where(records == "b") .withIn(5milliseconds). If there is only one event *"a"* in 5 milliseconds. I think this *“a”* should be output as the correct result of the match next time in advanceTime. But in the actual operation of CEP. This “a” will be treated as matching timeout data {code:java} // code placeholder @Test public void testNoNextWithWindow() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // (Event, timestamp) DataStream<Event> input = env.fromElements( Tuple2.of(new Event(1, "start", 1.0), 5L), // last element for high final watermark Tuple2.of(new Event(5, "final", 5.0), 100L) ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() { @Override public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) { return element.f1; } @Override public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) { return new Watermark(lastElement.f1 - 5); } }).map(new MapFunction<Tuple2<Event, Long>, Event>() { @Override public Event map(Tuple2<Event, Long> value) throws Exception { return value.f0; } }); Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("start"); } }).notNext("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("middle"); } }).within(Time.milliseconds(5L)); DataStream<String> result = CEP.pattern(input, pattern).select( new PatternSelectFunction<Event, String>() { @Override public String select(Map<String, List<Event>> pattern) { StringBuilder builder = new StringBuilder(); builder.append(pattern.get("start").get(0).getId()); return builder.toString(); } } ); List<String> resultList = new ArrayList<>(); DataStreamUtils.collect(result).forEachRemaining(resultList::add); resultList.sort(String::compareTo); assertEquals(Arrays.asList("1"), resultList); } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |