tinny cat created FLINK-19167:
--------------------------------- Summary: Proccess Function Example could not work Key: FLINK-19167 URL: https://issues.apache.org/jira/browse/FLINK-19167 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.11.1 Reporter: tinny cat Section "*Porccess Function Example*" of [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html] current is: {code:java} // Some comments here @Override public void processElement( Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception { // retrieve the current count CountWithTimestamp current = state.value(); if (current == null) { current = new CountWithTimestamp(); current.key = value.f0; } // update the state's count current.count++; // set the state's timestamp to the record's assigned event time timestamp current.lastModified = ctx.timestamp(); // write the state back state.update(current); // schedule the next timer 60 seconds from the current event time ctx.timerService().registerEventTimeTimer(current.lastModified + 60000); } @Override public void onTimer( long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception { // get the state for the key that scheduled the timer CountWithTimestamp result = state.value(); // check if this is an outdated timer or the latest timer // this will be never happened if (timestamp == result.lastModified + 60000) { // emit the state on timeout out.collect(new Tuple2<String, Long>(result.key, result.count)); } } {code} however, it should be: {code:java} @Override public void processElement( Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception { // retrieve the current count CountWithTimestamp current = state.value(); if (current == null) { current = new CountWithTimestamp(); current.key = value.f0; } // update the state's count current.count++; // set the state's timestamp to the record's assigned event time timestamp // it should be the previous watermark current.lastModified = ctx.timerService().currentWatermark(); // write the state back state.update(current); // schedule the next timer 60 seconds from the current event time ctx.timerService().registerEventTimeTimer(current.lastModified + 60000); } @Override public void onTimer( long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception { // get the state for the key that scheduled the timer CountWithTimestamp result = state.value(); // check if this is an outdated timer or the latest timer if (timestamp == result.lastModified + 60000) { // emit the state on timeout out.collect(new Tuple2<String, Long>(result.key, result.count)); } } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |