[jira] [Created] (FLINK-13383) Customize the problem in the trigger


Shang Yuanchun (Jira)
jinguishi created FLINK-13383:
---------------------------------

             Summary: Customize the problem in the trigger
                 Key: FLINK-13383
                 URL: https://issues.apache.org/jira/browse/FLINK-13383
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.8.0
         Environment: The development environment is IntelliJ IDEA; the Flink version is 1.8
            Reporter: jinguishi
             Fix For: 1.8.0
         Attachments: WX20190723-174236.png, WechatIMG2.png

Using a tumbling time window, I created a custom trigger that combines a time condition with an element counter. Inside the onElement method, calling ctx.getCurrentWatermark() on the TriggerContext returns a negative value.
Screenshots are in the attachments.

The code is shown below:


{code:java}
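import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class CountTrigger extends Trigger<Object, TimeWindow> {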

    private static final long serialVersionUID = 1L;

    private CountTrigger(int count) {
        this.threshold = count;
    }

    private int count = 0;
    private int threshold;

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {

        long watermark = ctx.getCurrentWatermark();
        ctx.registerEventTimeTimer(window.maxTimestamp());
        if (count > threshold) {
            count = 0;
            return TriggerResult.FIRE;
        } else {
            count++;
        }
        System.out.println("onElement: " + element);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public String toString() {
        return "CountTrigger";
    }

    public static <W extends Window> CountTrigger of(int threshold) {
        return new CountTrigger(threshold);
    }
}

{code}
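
One possible reading of the negative value, offered as an assumption rather than a confirmed diagnosis: Flink's timer service starts with the current watermark set to Long.MIN_VALUE and keeps it there until the first watermark arrives, so a trigger that reads the watermark before then sees a very large negative number. The standalone sketch below has no Flink dependency, and the class, constant, and method names are hypothetical. It only illustrates how such a sentinel value can be told apart from a real watermark.

```java
final class WatermarkSentinelDemo {

    // Assumption: mirrors the value reported before any watermark has been received.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Returns true only when the value is a real watermark, not the initial sentinel.
    static boolean hasWatermark(long currentWatermark) {
        return currentWatermark != NO_WATERMARK;
    }

    // Builds a readable description for logging inside a trigger callback.
    static String describe(long currentWatermark) {
        return hasWatermark(currentWatermark)
                ? "watermark at " + currentWatermark
                : "no watermark received yet";
    }

    public static void main(String[] args) {
        // The sentinel is negative, which matches the symptom in the report.
        System.out.println(describe(Long.MIN_VALUE));
        // An ordinary epoch-millisecond timestamp is treated as a real watermark.
        System.out.println(describe(1563874956000L));
    }
}
```

In a trigger, a check like hasWatermark(ctx.getCurrentWatermark()) could guard any logic that compares the watermark with window.maxTimestamp(). Whether this explains the screenshots depends on whether the job assigns timestamps and watermarks at all, which the report does not state.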
