[jira] [Created] (FLINK-22888) Matches results may be wrong when using notNext as the last part of the pattern with Window

Shang Yuanchun (Jira)
Yue Ma created FLINK-22888:
------------------------------

             Summary: Matches results may be wrong when using notNext as the last part of the pattern with Window
                 Key: FLINK-22888
                 URL: https://issues.apache.org/jira/browse/FLINK-22888
             Project: Flink
          Issue Type: Bug
          Components: Library / CEP
    Affects Versions: 1.9.0
            Reporter: Yue Ma


The pattern is like:

Pattern.begin("start").where(records == "a")
            .notNext("notNext").where(records == "b")
            .within(Time.milliseconds(5))

If there is only one event *"a"* within the 5 milliseconds, I think this *"a"* should be emitted as a correct match the next time advanceTime runs.

But in the actual operation of CEP, this "a" is treated as a timed-out partial match instead. The following test demonstrates the expected result:
{code:java}
// Expected: the lone "start" event (id 1) is emitted as a match once its 5 ms window expires.
@Test
public void testNoNextWithWindow() throws Exception {
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

   // (Event, timestamp)
   DataStream<Event> input = env.fromElements(
      Tuple2.of(new Event(1, "start", 1.0), 5L),

      // last element for high final watermark
      Tuple2.of(new Event(5, "final", 5.0), 100L)
   ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {

      @Override
      public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
         return element.f1;
      }

      @Override
      public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
         return new Watermark(lastElement.f1 - 5);
      }

   }).map(new MapFunction<Tuple2<Event, Long>, Event>() {

      @Override
      public Event map(Tuple2<Event, Long> value) throws Exception {
         return value.f0;
      }
   });

   Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
      @Override
      public boolean filter(Event value) throws Exception {
         return value.getName().equals("start");
      }
   }).notNext("middle").where(new SimpleCondition<Event>() {
      @Override
      public boolean filter(Event value) throws Exception {
         return value.getName().equals("middle");
      }
   }).within(Time.milliseconds(5L));

   DataStream<String> result = CEP.pattern(input, pattern).select(
      new PatternSelectFunction<Event, String>() {
         @Override
         public String select(Map<String, List<Event>> pattern) {
            StringBuilder builder = new StringBuilder();
            builder.append(pattern.get("start").get(0).getId());
            return builder.toString();
         }
      }
   );

   List<String> resultList = new ArrayList<>();

   DataStreamUtils.collect(result).forEachRemaining(resultList::add);

   resultList.sort(String::compareTo);

   assertEquals(Arrays.asList("1"), resultList);
}
{code}
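
To make the difference between the expected and the observed behaviour concrete, here is a minimal stand-alone sketch. It does not use or reproduce Flink's NFA; the class NotNextWindowSketch, the Ev and Outcome types, the classify method and the matchOnExpiry flag are all hypothetical names introduced only for illustration. It assumes the window closes once the watermark reaches start timestamp + window length, and it uses the timestamps from the test above (start at 5, final at 100, watermark 100 - 5 = 95).

```java
final class NotNextWindowSketch {

    enum Outcome { MATCH, DISCARDED, TIMED_OUT, PENDING }

    /** Minimal event: a name and an event-time timestamp in milliseconds. */
    static final class Ev {
        final String name;
        final long ts;

        Ev(String name, long ts) {
            this.name = name;
            this.ts = ts;
        }
    }

    /**
     * Hypothetical model (not Flink code) of the partial match opened by
     * `start` for the pattern: start -> notNext(forbidden) within(windowMs).
     *
     * @param next          event directly following `start`, or null if none arrived
     * @param watermark     current event-time watermark
     * @param matchOnExpiry true = behaviour the report expects,
     *                      false = behaviour the report observes
     */
    static Outcome classify(Ev start, Ev next, long watermark,
                            long windowMs, String forbidden, boolean matchOnExpiry) {
        long deadline = start.ts + windowMs;
        if (next != null && next.ts < deadline) {
            // An event arrived inside the window, so notNext can decide right away.
            return next.name.equals(forbidden) ? Outcome.DISCARDED : Outcome.MATCH;
        }
        if (watermark >= deadline) {
            // The window closed and no event followed `start` inside it.
            return matchOnExpiry ? Outcome.MATCH : Outcome.TIMED_OUT;
        }
        // The window is still open and nothing has followed `start` yet.
        return Outcome.PENDING;
    }

    public static void main(String[] args) {
        Ev start = new Ev("start", 5L);
        Ev fin = new Ev("final", 100L);
        // Behaviour the report expects: the lone "start" is a match.
        System.out.println(classify(start, fin, 95L, 5L, "middle", true));
        // Behaviour the report observes: the lone "start" is a timeout.
        System.out.println(classify(start, fin, 95L, 5L, "middle", false));
    }
}
```

With matchOnExpiry set to true the sketch classifies the lone "start" as MATCH, which corresponds to the assertEquals(Arrays.asList("1"), resultList) expectation in the test; with false it yields TIMED_OUT, which corresponds to the behaviour described in this report.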
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)