> 下面是被转发的邮件: > > 发件人: 刘 文 <[hidden email]> > 主题: DataStream EventTime last data cannot be output? > 日期: 2019年3月6日 GMT+8 下午10:51:14 > 收件人: [hidden email] > > DataStream EventTime last data cannot be output? > > > While verifying EventTime plus watermark processing, I found that the data sent to the socket is either not output in time or not output at all. > 1). The verification found that the end of the last window is only triggered when the timestamp of the currently sent data makes getCurrentWatermark() > TimeWindow + maxOutOfOrderness > 2). But the latest record cannot be processed in time, or cannot be processed at all > 3). How can I deal with this problem? > > > > The following is the Flink program, Flink 1.7.2 > --------------------------------------------------------------------------- > > > > package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime > > import java.util.{Date, Properties} > > import com.alibaba.fastjson.JSON > import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil > import org.apache.flink.api.common.serialization.SimpleStringSchema > import org.apache.flink.configuration.Configuration > import org.apache.flink.streaming.api.TimeCharacteristic > import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment > import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction > import org.apache.flink.streaming.api.watermark.Watermark > import org.apache.flink.streaming.api.windowing.time.Time > import org.apache.flink.streaming.api.windowing.windows.TimeWindow > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer > import org.apache.flink.util.Collector > > > object SockWordCountRun { > > > > def main(args: Array[String]): Unit = { > > > // get the execution environment > // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment > > > val configuration : Configuration 
= ConfigurationUtil.getConfiguration(true) > > val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration) > > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > > > > import org.apache.flink.streaming.api.scala._ > val dataStream = env.socketTextStream("localhost", 1234, '\n') > > // .setParallelism(3) > > > dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] { > > val maxOutOfOrderness = 2 * 1000L // 2 seconds > var currentMaxTimestamp: Long = _ > var currentTimestamp: Long = _ > > override def getCurrentWatermark: Watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness) > > override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = { > val jsonObject = JSON.parseObject(element) > > val timestamp = jsonObject.getLongValue("extract_data_time") > currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp) > currentTimestamp = timestamp > > /* println("===========watermark begin===========") > println() > println(new Date(currentMaxTimestamp - 20 * 1000)) > println(jsonObject) > println("===========watermark end===========") > println()*/ > timestamp > } > > }) > .timeWindowAll(Time.seconds(3)) > > .process(new ProcessAllWindowFunction[String,String,TimeWindow]() { > override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = { > > > println() > println("开始提交window") > println(new Date()) > for(e <- elements) out.collect(e) > println("结束提交window") > println(new Date()) > println() > } > }) > > .print() > //.setParallelism(3) > > > > > > println("==================================以下为执行计划==================================") > println("执行地址(firefox效果更好):https://flink.apache.org/visualizer <https://flink.apache.org/visualizer>") > //执行计划 > println(env.getStreamGraph.getStreamingPlanAsJSON) > println("==================================以上为执行计划 JSON串==================================\n") > > > 
env.execute("Socket 水印作业") > > > > > > > println("结束") > > } > > > // Data type for words with count > case class WordWithCount(word: String, count: Long){ > //override def toString: String = Thread.currentThread().getName + word + " : " + count > } > > > def getConfiguration(isDebug:Boolean = false):Configuration = { > > val configuration : Configuration = new Configuration() > > if(isDebug){ > val timeout = "100000 s" > val timeoutHeartbeatPause = "1000000 s" > configuration.setString("akka.ask.timeout",timeout) > configuration.setString("akka.lookup.timeout",timeout) > configuration.setString("akka.tcp.timeout",timeout) > configuration.setString("akka.transport.heartbeat.interval",timeout) > configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause) > configuration.setString("akka.watch.heartbeat.pause",timeout) > configuration.setInteger("heartbeat.interval",10000000) > configuration.setInteger("heartbeat.timeout",50000000) > } > > > configuration > } > > > } > > > > > > best thinktothings > |
Free forum by Nabble | Edit this page |