jinguishi created FLINK-13383:
---------------------------------
Summary: Customize the problem in the trigger
Key: FLINK-13383
URL:
https://issues.apache.org/jira/browse/FLINK-13383 Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.8.0
Environment: The development environment is idea, the flink version is 1.8
Reporter: jinguishi
Fix For: 1.8.0
Attachments: WX20190723-174236.png, WechatIMG2.png
Using a Tumbling time window, I created a time-based and counter trigger. The parameters in the onElement method TriggerContext, ctx.getCurrentWatermark(), get negative values,
There are screenshots in the attachment。
code show as below
{code:java}
public class CountTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private CountTrigger(int count) {
this.threshold = count;
}
private int count = 0;
private int threshold;
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
long watermark = ctx.getCurrentWatermark();
ctx.registerEventTimeTimer(window.maxTimestamp());
if (count > threshold) {
count = 0;
return TriggerResult.FIRE;
} else {
count++;
}
System.out.println("onElement: " + element);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
return TriggerResult.FIRE;
}
@Override
public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}
@Override
public String toString() {
return "CountTrigger";
}
public static <W extends Window> CountTrigger of(int threshold) {
return new CountTrigger(threshold);
}
}
{code}
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)