Questions of "State Processing API in Scala"


YingZ
Hi community,

When I use state in Scala, something confuses me. I followed these steps to generate and read state:

a. implement example [1], `CountWindowAverage`, in Scala (exactly the same) and run jobA => that works fine.
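For context, jobA presumably looks something like the following (a sketch: the `CountWindowAverage` body is copied from the docs example [1], while the source data and the `average-uid` operator uid are hypothetical):

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// The stateful function from [1]: emits a running average every 2 elements.
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
    val tmpCurrentSum = sum.value
    val currentSum = if (tmpCurrentSum != null) tmpCurrentSum else (0L, 0L)
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
    sum.update(newSum)
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}

object JobA {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements((1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L))
      .keyBy(_._1)                        // Scala key extractor: key type is Long
      .flatMap(new CountWindowAverage())
      .uid("average-uid")                 // hypothetical uid; needed later to locate this operator's state
      .print()
    env.execute("jobA")
  }
}
```

Setting an explicit `uid` on the stateful operator matters because the State Processor API addresses state by operator uid.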

b. execute `flink cancel -s ${JobID}` => a savepoint was generated as expected.

c. implement example [2], `StatefulFunctionWithTime`, in Scala (code below) and run jobB => it failed with: "Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible."


The ReaderFunction code is below:

```
// Imports added for completeness (Flink 1.9)
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {

  var countState: ValueState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    val stateDescriptor =
      new ValueStateDescriptor("average", createTypeInformation[(Long, Long)])
    countState = getRuntimeContext().getState(stateDescriptor)
  }

  override def readKey(key: Long,
                       ctx: KeyedStateReaderFunction.Context,
                       out: Collector[(Long, Long)]): Unit = {
    out.collect(countState.value())
  }
}
```
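For reference, the jobB driver around this ReaderFunction presumably looks something like this (a sketch for Flink 1.9; the savepoint path and operator uid are hypothetical and must match the job that wrote the state):

```scala
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint

object JobB {
  def main(args: Array[String]): Unit = {
    // The State Processor API in 1.9 runs on the (Java) DataSet API
    val bEnv = ExecutionEnvironment.getExecutionEnvironment

    // Load the savepoint written by `flink cancel -s` (path is hypothetical)
    val savepoint =
      Savepoint.load(bEnv, "hdfs:///savepoints/savepoint-xxxx", new MemoryStateBackend())

    // "average-uid" must be the uid of the stateful operator in the original job
    savepoint
      .readKeyedState("average-uid", new ReaderFunction)
      .print()
  }
}
```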

d. then I tried using java.lang.Long instead of Long as the key type and ran jobB => the exception disappeared and everything works fine.

This confuses me. Did I miss some feature of the State Processor API, such as `magic-implicits`?

And when I change `xxx.keyBy(_._1)` to `xxx.keyBy(0)`, the same exception comes back. This time I tried Tuple(java.lang.Long) and a few other key types, but nothing works.

Hoping for your help.

1: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state 

2: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state 
Re: Questions of "State Processing API in Scala"

Tzu-Li (Gordon) Tai
Hi Izual,

Thanks for reporting this! I'm also forwarding this to the user mailing
list, as that is the more suitable place for this question.

I think the usability of the State Processor API in Scala is indeed
something that hasn’t been looked at closely yet.

On Tue, Jan 21, 2020 at 8:12 AM izual <[hidden email]> wrote:

> Hi community,
>
> When I use state in Scala, something confuses me. I followed these
> steps to generate and read state:
>
> a. implement example [1], `CountWindowAverage`, in Scala (exactly the
> same) and run jobA => that works fine.
>
> b. execute `flink cancel -s ${JobID}` => a savepoint was generated as
> expected.
>
> c. implement example [2], `StatefulFunctionWithTime`, in Scala (code
> below) and run jobB => it failed with: "Caused by:
> org.apache.flink.util.StateMigrationException: The new key serializer must
> be compatible."
>
>
> The ReaderFunction code is below: [code snipped; identical to the code above]
>
> d. then I tried using java.lang.Long instead of Long as the key type and
> ran jobB => the exception disappeared and everything works fine.
>
> This confuses me. Did I miss some feature of the State Processor API,
> such as `magic-implicits`?
>

This part is explainable. The "magic-implicits" actually happen in the
DataStream Scala API.
Any primitive Scala types will be inferred and serialized as their Java
counterparts.
AFAIK, this does not happen in the State Processor API yet, which is why
you are getting the StateMigrationException.
When using Scala types directly with the State Processor API, I would guess
that Kryo (as a generic fallback) was being used to access state.
This can probably be confirmed by looking at the exception stack trace. Can
you post a full copy of that?
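A small illustration of why the two key types can diverge (plain Scala, independent of Flink): Scala's `Long` erases to the JVM primitive `long`, while `java.lang.Long` is the boxed class, so Java-side type extraction can resolve them to different serializers:

```scala
// Scala's Long is the JVM primitive long; java.lang.Long is the boxed class.
val scalaLongClass: Class[_] = classOf[Long]            // the primitive class, i.e. java.lang.Long.TYPE
val boxedLongClass: Class[_] = classOf[java.lang.Long]  // the boxed class

println(scalaLongClass)  // prints "long"
println(boxedLongClass)  // prints "class java.lang.Long"
```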

This should be resolvable by properly supporting Scala in the State
Processor API; it's just that up to this point we didn't have a plan
for that yet.
Can you open a JIRA for this? I think it'll be a reasonable extension to
the API.


>
> And when I change `xxx.keyBy(_._1)` to `xxx.keyBy(0)`, the same exception
> comes back. This time I tried Tuple(java.lang.Long) and a few other key
> types, but nothing works.
>

I'm not sure what you mean here. Where is this keyBy happening? In the
Scala DataStream job, or the State Processor API?
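For what it's worth, the two forms do key by different types in the DataStream API, which changes the key serializer written into a savepoint (a sketch; the sample elements are made up):

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[(Long, Long)] = env.fromElements((1L, 3L), (2L, 5L))

// Key extractor: the key type is Scala Long
val byExtractor = stream.keyBy(_._1)

// Positional key: the key type becomes the generic
// org.apache.flink.api.java.tuple.Tuple, with a different key serializer
val byPosition = stream.keyBy(0)
```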


>
> Hoping for your help.
>
> 1:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>
> 2:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state


Cheers,
Gordon