mzz created FLINK-18741:
--------------------------- Summary: ProcessWindowFunction's process function exception Key: FLINK-18741 URL: https://issues.apache.org/jira/browse/FLINK-18741 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.10.0 Reporter: mzz I use ProcessWindowFunction to calculate page views (PV), but when I override process, the value of the user-defined state cannot be returned. Code: {code:java} tem.keyBy(x => (x._1, x._2, x._4, x._5, x._6, x._7, x._8)) .timeWindow(Time.seconds(15 * 60)) //15 min window .process(new ProcessWindowFunction[(String, String, String, String, String, String, String, String, String), CkResult, (String, String, String, String, String, String, String), TimeWindow] { var clickCount: ValueState[Long] = _ * var requestCount: ValueState[Long] = _ * var returnCount: ValueState[Long] = _ var videoCount: ValueState[Long] = _ var noVideoCount: ValueState[Long] = _ override def open(parameters: Configuration): Unit = { clickCount = getRuntimeContext.getState(new ValueStateDescriptor("clickCount", classOf[Long])) * requestCount = getRuntimeContext.getState(new ValueStateDescriptor("requestCount", classOf[Long]))* returnCount = getRuntimeContext.getState(new ValueStateDescriptor("returnCount", classOf[Long])) videoCount = getRuntimeContext.getState(new ValueStateDescriptor("videoCount", classOf[Long])) noVideoCount = getRuntimeContext.getState(new ValueStateDescriptor("noVideoCount", classOf[Long])) } override def process(key: (String, String, String, String, String, String, String), context: Context, elements: Iterable[(String, String, String, String, String, String, String, String, String)], out: Collector[CkResult]) = { try { var clickNum: Long = clickCount.value val dateNow = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")).toLong var requestNum: Long = requestCount.value var returnNum: Long = returnCount.value var videoNum: Long = videoCount.value var noVideoNum: Long = noVideoCount.value if (requestNum == null) { 
requestNum = 0 } val ecpm = key._7.toDouble.formatted("%.2f").toFloat val created_at = getSecondTimestampTwo(new Date) * elements.foreach(e => { if ("adreq".equals(e._3)) { requestNum += 1 println(key._1, requestNum) } }) requestCount.update(requestNum) println(requestNum, key._1)* out.collect(CkResult(dateNow, (created_at - getZero_time) / (60 * 15), key._2, key._3, key._4, key._5, key._3 + "_" + key._4 + "_" + key._5, key._6, key._1, requestCount.value, returnCount.value, fill_rate, noVideoCount.value + videoCount.value, expose_rate, clickCount.value, click_rate, ecpm, (noVideoCount.value * ecpm + videoCount.value * ecpm / 1000.toFloat).formatted("%.2f").toFloat, created_at)) } catch { case e: Exception => println(key, e) } } }) {code} {code:java} elements.foreach(e => { if ("adreq".equals(e._3)) { requestNum += 1 println(key._1, requestNum) // The values printed here like : //(key,1) //(key,2) //(key,3) } }) //But print outside the for loop always like : //(key,0) println(requestNum, key._1) {code} Can anyone help me, please? Thanks. -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |