Fwd: DataStream EventTime last data cannot be output?

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Fwd: DataStream EventTime last data cannot be output?

刘 文


> 下面是被转发的邮件:
>
> 发件人: 刘 文 <[hidden email]>
> 主题: DataStream EventTime last data cannot be output?
> 日期: 2019年3月6日 GMT+8 下午10:51:14
> 收件人: [hidden email]
>
> DataStream EventTime last data cannot be output ?
>
>
> In the verification of EventTime plus watermark processing, I found that the data sent to the socket cannot be output in time or output.
> ). The verification found that only the timestamp of the current send data of getCurrentWatermark() > TimeWindow + maxOutOfOrderness will trigger the end of the last window
> ). But the latest record can not be processed in time, or can not be processed
> ). How can I deal with this problem?
>
>
>
> The following is the Flink program ,Flink 1.7.2
> ---------------------------------------------------------------------------
>
>
>
> package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime
>
> import java.util.{Date, Properties}
>
> import com.alibaba.fastjson.JSON
> import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.util.Collector
>
>
> object SockWordCountRun {
>
>
>
>   def main(args: Array[String]): Unit = {
>
>
>     // get the execution environment
>    // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>
>
>     val configuration : Configuration = ConfigurationUtil.getConfiguration(true)
>
>     val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
>
>
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>
>
>     import org.apache.flink.streaming.api.scala._
>     val dataStream = env.socketTextStream("localhost", 1234, '\n')
>
>      // .setParallelism(3)
>
>
>     dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
>
>         val maxOutOfOrderness =  2 * 1000L // 3.5 seconds
>         var currentMaxTimestamp: Long = _
>         var currentTimestamp: Long = _
>
>         override def getCurrentWatermark: Watermark =  new Watermark(currentMaxTimestamp - maxOutOfOrderness)
>
>         override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
>           val jsonObject = JSON.parseObject(element)
>
>           val timestamp = jsonObject.getLongValue("extract_data_time")
>           currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
>           currentTimestamp = timestamp
>
>         /*  println("===========watermark begin===========")
>           println()
>           println(new Date(currentMaxTimestamp - 20 * 1000))
>           println(jsonObject)
>           println("===========watermark end===========")
>           println()*/
>           timestamp
>         }
>
>       })
>       .timeWindowAll(Time.seconds(3))
>
>       .process(new ProcessAllWindowFunction[String,String,TimeWindow]() {
>       override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
>
>
>         println()
>         println("开始提交window")
>         println(new Date())
>         for(e <- elements) out.collect(e)
>         println("结束提交window")
>         println(new Date())
>         println()
>       }
>     })
>
>       .print()
>       //.setParallelism(3)
>
>
>
>
>
>     println("==================================以下为执行计划==================================")
>     println("执行地址(firefox效果更好):https://flink.apache.org/visualizer <https://flink.apache.org/visualizer>")
>     //执行计划
>     println(env.getStreamGraph.getStreamingPlanAsJSON)
>     println("==================================以上为执行计划 JSON串==================================\n")
>
>
>     env.execute("Socket 水印作业")
>
>
>
>
>
>
>     println("结束")
>
>   }
>
>
>   // Data type for words with count
>   case class WordWithCount(word: String, count: Long){
>     //override def toString: String = Thread.currentThread().getName + word + " : " + count
>   }
>
>
>   def getConfiguration(isDebug:Boolean = false):Configuration = {
>
>     val configuration : Configuration = new Configuration()
>
>     if(isDebug){
>       val timeout = "100000 s"
>       val timeoutHeartbeatPause = "1000000 s"
>       configuration.setString("akka.ask.timeout",timeout)
>       configuration.setString("akka.lookup.timeout",timeout)
>       configuration.setString("akka.tcp.timeout",timeout)
>       configuration.setString("akka.transport.heartbeat.interval",timeout)
>       configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
>       configuration.setString("akka.watch.heartbeat.pause",timeout)
>       configuration.setInteger("heartbeat.interval",10000000)
>       configuration.setInteger("heartbeat.timeout",50000000)
>     }
>
>
>     configuration
>   }
>
>
> }
>
>
>
>
>
> best   thinktothings
>