[jira] [Created] (FLINK-18741) ProcessWindowFunction's process function exception

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-18741) ProcessWindowFunction's process function exception

Shang Yuanchun (Jira)
mzz created FLINK-18741:
---------------------------

             Summary: ProcessWindowFunction's  process function exception
                 Key: FLINK-18741
                 URL: https://issues.apache.org/jira/browse/FLINK-18741
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.10.0
            Reporter: mzz


I use ProcessWindowFunction to achieve PV calculation, but when rewriting process, the user-defined state value cannot be returned。
code:

{code:java}
tem.keyBy(x =>
      (x._1, x._2, x._4, x._5, x._6, x._7, x._8))
      .timeWindow(Time.seconds(15 * 60)) //15 min window
      .process(new ProcessWindowFunction[(String, String, String, String, String, String, String, String, String), CkResult, (String, String, String, String, String, String, String), TimeWindow] {
      var clickCount: ValueState[Long] = _
*      var requestCount: ValueState[Long] = _
*      var returnCount: ValueState[Long] = _
      var videoCount: ValueState[Long] = _
      var noVideoCount: ValueState[Long] = _

      override def open(parameters: Configuration): Unit = {
        clickCount = getRuntimeContext.getState(new ValueStateDescriptor("clickCount", classOf[Long]))
       * requestCount = getRuntimeContext.getState(new ValueStateDescriptor("requestCount", classOf[Long]))*
        returnCount = getRuntimeContext.getState(new ValueStateDescriptor("returnCount", classOf[Long]))
        videoCount = getRuntimeContext.getState(new ValueStateDescriptor("videoCount", classOf[Long]))
        noVideoCount = getRuntimeContext.getState(new ValueStateDescriptor("noVideoCount", classOf[Long]))
      }

      override def process(key: (String, String, String, String, String, String, String), context: Context, elements: Iterable[(String, String, String, String, String, String, String, String, String)], out: Collector[CkResult]) = {
        try {
          var clickNum: Long = clickCount.value
          val dateNow = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")).toLong
          var requestNum: Long = requestCount.value
          var returnNum: Long = returnCount.value
          var videoNum: Long = videoCount.value
          var noVideoNum: Long = noVideoCount.value
          if (requestNum == null) {
            requestNum = 0
          }
         
          val ecpm = key._7.toDouble.formatted("%.2f").toFloat
          val created_at = getSecondTimestampTwo(new Date)
         
*          elements.foreach(e => {
            if ("adreq".equals(e._3)) {
              requestNum += 1
              println(key._1, requestNum)
            }
          })
          requestCount.update(requestNum)
          println(requestNum, key._1)*

 

          out.collect(CkResult(dateNow, (created_at - getZero_time) / (60 * 15), key._2, key._3, key._4, key._5, key._3 + "_" + key._4 + "_" + key._5, key._6, key._1, requestCount.value, returnCount.value, fill_rate, noVideoCount.value + videoCount.value,
            expose_rate, clickCount.value, click_rate, ecpm, (noVideoCount.value * ecpm + videoCount.value * ecpm / 1000.toFloat).formatted("%.2f").toFloat, created_at))
        }
        catch {
          case e: Exception => println(key, e)
        }
      }
    })
{code}


{code:java}

          elements.foreach(e => {
            if ("adreq".equals(e._3)) {
              requestNum += 1
              println(key._1, requestNum)
// The values printed here like :
//(key,1)
//(key,2)
//(key,3)
            }
          })
//But print outside the for loop always like :
//(key,0)
          println(requestNum, key._1)
{code}


who can help me ,plz thx。









--
This message was sent by Atlassian Jira
(v8.3.4#803005)