[jira] [Created] (FLINK-15719) Exceptions when using scala types directly with the State Process API

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-15719) Exceptions when using scala types directly with the State Process API

Shang Yuanchun (Jira)
Ying Z created FLINK-15719:
------------------------------

             Summary: Exceptions when using scala types directly with the State Process API
                 Key: FLINK-15719
                 URL: https://issues.apache.org/jira/browse/FLINK-15719
             Project: Flink
          Issue Type: Bug
          Components: API / State Processor
    Affects Versions: 1.9.1
            Reporter: Ying Z


I followed these steps to generate and read states:
 # implements the example[1] `CountWindowAverage` in Scala(exactly same), and run jobA => that makes good.
 # execute `flink cancel -s ${JobID}` => savepoints was generated as expected.
 # implements the example[2] `StatefulFunctionWithTime` in Scala(code below), and run jobB => failed, exceptions shows that "Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible."

ReaderFunction code as below:
{code:java}
// code placeholder
  class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {
    var countState: ValueState[(Long, Long)] = _
    override def open(parameters: Configuration): Unit = {
      val stateDescriptor = new ValueStateDescriptor("average", createTypeInformation[(Long, Long)])
      countState = getRuntimeContext().getState(stateDescriptor)
    }    override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context, out: Collector[(Long, Long)]): Unit = {
      out.collect(countState.value())
    }
  }
{code}
1: [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state

2: [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#keyed-state



--
This message was sent by Atlassian Jira
(v8.3.4#803005)