Using guava ImmutableMap Serializer from magro/kyro-serializers

Using guava ImmutableMap Serializer from magro/kyro-serializers

vijikarthi
Hello,

I am using Flink 1.7.2. I wrote a small application that uses a
KeyedProcessFunction to maintain application state. The state value object
uses Guava's (version 18.0.50) ImmutableMap to create a copy of the map
instance in its constructor.

I am using "de.javakaffee:kryo-serializers:0.45" to handle the Guava
ImmutableMap serialization, and I registered the serializer with
`env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class,
ImmutableMapSerializer.class)`.

I am still seeing an issue with ImmutableMap serialization, and I am not sure
whether the serializer I configured is actually being used (maybe I am not
configuring it correctly?). Using a standard Java Map works properly.

I would appreciate any input. The error stack trace and the code snippets
are below.

Regards
Vijay

Error StackTrace
```
Serialization trace:
itemMap (com.flink.SensorState)
start (com.flink.ValueObject)
        at com.esotericsoftware.kryo.serializers.ReflectField.read(ReflectField.java:133) ~[kryo-5.0.0-RC1.jar:na]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:122) ~[kryo-5.0.0-RC1.jar:na]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:712) ~[kryo-5.0.0-RC1.jar:na]
        at com.esotericsoftware.kryo.serializers.ReflectField.read(ReflectField.java:114) ~[kryo-5.0.0-RC1.jar:na]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:122) ~[kryo-5.0.0-RC1.jar:na]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:793) ~[kryo-5.0.0-RC1.jar:na]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315) ~[flink-core-1.7.2.jar:1.7.2]
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74) ~[flink-runtime_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297) ~[flink-runtime_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492) ~[flink-runtime_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453) ~[flink-runtime_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410) ~[flink-runtime_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358) ~[flink-runtime_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104) ~[flink-runtime_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151) ~[flink-streaming-java_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123) ~[flink-streaming-java_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284) [flink-streaming-java_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135) [flink-streaming-java_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250) [flink-streaming-java_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) [flink-streaming-java_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) [flink-streaming-java_2.12-1.7.2.jar:1.7.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) [flink-runtime_2.12-1.7.2.jar:1.7.2]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.UnsupportedOperationException: null
        at org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap.put(ImmutableMap.java:326) ~[flink-shaded-guava-18.0-5.0.jar:18.0-5.0]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:230) ~[kryo-5.0.0-RC1.jar:na]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:42) ~[kryo-5.0.0-RC1.jar:na]
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:712) ~[kryo-5.0.0-RC1.jar:na]
        at com.esotericsoftware.kryo.serializers.ReflectField.read(ReflectField.java:114) ~[kryo-5.0.0-RC1.jar:na]
        ... 22 common frames omitted
```

```
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(CHECKPOINT_INTERVAL);
        env.setParallelism(PARALLELISM);
        env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class, ImmutableMapSerializer.class);

        int totalRecords = 1000;

        env.addSource(new StateDataProducer(totalRecords))
                .keyBy(0)
                .process((KeyedProcessFunction) new VOBookmarker())
                .print();
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
```  

```
class SensorState implements Serializable {

    private final int sensorId;
    private final Map<String,String> itemMap;

    public SensorState(int sensorId, Map<String,String> itemMap) {
        this.sensorId = sensorId;
        this.itemMap = ImmutableMap.copyOf(itemMap);
        //this.itemMap = itemMap;
    }

    public int getSensorId() {
        return sensorId;
    }

    public Map<String, String> getItemMap() {
        return Collections.unmodifiableMap(itemMap);
    }
}

public class ValueObject implements Serializable {

    private final int sensorId;
    private SensorState start;
    private SensorState end;

    public ValueObject(int sensorId) {
        this.sensorId = sensorId;
    }

    public int getSensorId() {
        return sensorId;
    }

    public void setStartState(SensorState start) {
        this.start = start;
    }

    public void setEndState(SensorState end) {
        this.end = end;
    }
}
```



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
Re: Using guava ImmutableMap Serializer from magro/kyro-serializers

JingsongLee-2
Hi @Vijay Srinivasaraghavan, as far as I know, Kryo needs every concrete implementation class to be registered, not just the abstract class or interface.
You can see the details in de.javakaffee.kryoserializers.guava.ImmutableMapSerializer.registerSerializers.
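
To illustrate the point about exact-class registration, here is a minimal JDK-only sketch. It is a simplified model, not Kryo's actual resolution code: the `ExactClassRegistrySketch` class and its `register`/`lookup` methods are hypothetical, and `Collections.unmodifiableMap` merely stands in for Guava's `ImmutableMap`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class ExactClassRegistrySketch {
    // Hypothetical registry: serializer names keyed by the exact runtime class.
    private final Map<Class<?>, String> byExactClass = new HashMap<>();

    void register(Class<?> type, String serializerName) {
        byExactClass.put(type, serializerName);
    }

    // Returns the registered name, or null when the exact class was never registered.
    String lookup(Object value) {
        return byExactClass.get(value.getClass());
    }

    public static void main(String[] args) {
        ExactClassRegistrySketch registry = new ExactClassRegistrySketch();
        Map<String, String> value = Collections.unmodifiableMap(new HashMap<>());

        // Registering only the interface does not cover the concrete class.
        registry.register(Map.class, "custom");
        System.out.println(registry.lookup(value)); // prints "null"

        // Registering the class taken from a sample instance does.
        registry.register(value.getClass(), "custom");
        System.out.println(registry.lookup(value)); // prints "custom"
    }
}
```

In this model, registering `Map.class` alone leaves the lookup empty; only the class returned by `getClass()` on a real instance matches.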

Maybe you can try doing the same thing with ExecutionConfig, mirroring ImmutableMapSerializer.registerSerializers:
```
private enum DummyEnum {
   VALUE1,
   VALUE2
}

private static void registerSerializers(ExecutionConfig conf) {
   Class<ImmutableMapSerializer> serializer = ImmutableMapSerializer.class;
   // Abstract base class.
   conf.registerTypeWithKryoSerializer(ImmutableMap.class, serializer);
   // Concrete class returned for the empty map.
   conf.registerTypeWithKryoSerializer(ImmutableMap.of().getClass(), serializer);
   Object o1 = new Object();
   Object o2 = new Object();
   // Concrete classes returned for one-entry and two-entry maps.
   conf.registerTypeWithKryoSerializer(ImmutableMap.of(o1, o1).getClass(), serializer);
   conf.registerTypeWithKryoSerializer(ImmutableMap.of(o1, o1, o2, o2).getClass(), serializer);
   // Concrete class returned when copying an EnumMap.
   Map<DummyEnum, Object> enumMap = new EnumMap<>(DummyEnum.class);
   for (DummyEnum e : DummyEnum.values()) {
      enumMap.put(e, o1);
   }
   conf.registerTypeWithKryoSerializer(ImmutableMap.copyOf(enumMap).getClass(), serializer);
   // Concrete row and column map classes exposed by a dense ImmutableTable.
   ImmutableTable<Object, Object, Object> denseImmutableTable = ImmutableTable.builder().put("a", 1, 1).put("b", 1, 1).build();
   conf.registerTypeWithKryoSerializer(denseImmutableTable.rowMap().getClass(), serializer);
   conf.registerTypeWithKryoSerializer(((Map)denseImmutableTable.rowMap().get("a")).getClass(), serializer);
   conf.registerTypeWithKryoSerializer(denseImmutableTable.columnMap().getClass(), serializer);
   conf.registerTypeWithKryoSerializer(((Map)denseImmutableTable.columnMap().get(1)).getClass(), serializer);
}
```
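
The sample instances in registerSerializers above are there because one factory type can be backed by several concrete classes, depending on size or contents. A rough JDK-only analogue follows; it assumes nothing about Guava's internals, and the `ConcreteClassProbeSketch` class and `concreteClassesOf` helper are made up for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class ConcreteClassProbeSketch {
    // Collects the distinct runtime classes of the given sample instances.
    static Set<Class<?>> concreteClassesOf(Map<?, ?>... samples) {
        Set<Class<?>> classes = new LinkedHashSet<>();
        for (Map<?, ?> sample : samples) {
            classes.add(sample.getClass());
        }
        return classes;
    }

    public static void main(String[] args) {
        Map<String, String> two = new HashMap<>();
        two.put("a", "1");
        two.put("b", "2");

        // Three read-only maps that are all just "Map" to the caller.
        Set<Class<?>> classes = concreteClassesOf(
                Collections.emptyMap(),
                Collections.singletonMap("a", "1"),
                Collections.unmodifiableMap(two));

        // Each class found here would need its own registration.
        for (Class<?> c : classes) {
            System.out.println(c.getName());
        }
    }
}
```

On OpenJDK the three samples report three different classes, so registering only one of them would leave the other two uncovered.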

Best, JingsongLee


------------------------------------------------------------------
From: vijikarthi <[hidden email]>
Sent: Wednesday, April 17, 2019, 04:45
To: dev <[hidden email]>
Subject:Using guava ImmutableMap Serializer from magro/kyro-serializers

Re: Using guava ImmutableMap Serializer from magro/kyro-serializers

vijikarthi
Thanks, JingsongLee. You are right. I registered all the implementation
classes and it worked.
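
For anyone who hits the same trace: the `Caused by` frames show Kryo's generic MapSerializer calling put() on the ImmutableMap, which is presumably what happens when no custom serializer matches the concrete class. Below is a minimal JDK-only sketch of that failure mode. It is an assumption-laden model rather than Kryo's code: `ImmutableRestoreSketch`, `fillInPlace` and `readThenWrap` are hypothetical names, and `Collections.unmodifiableMap` stands in for ImmutableMap.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class ImmutableRestoreSketch {
    // Models a generic map reader that fills the target map entry by entry via put().
    static Map<String, String> fillInPlace(Map<String, String> target, Map<String, String> entries) {
        for (Map.Entry<String, String> e : entries.entrySet()) {
            target.put(e.getKey(), e.getValue());
        }
        return target;
    }

    // Models an immutable-aware reader: fill a mutable map first, then wrap it.
    static Map<String, String> readThenWrap(Map<String, String> entries) {
        return Collections.unmodifiableMap(fillInPlace(new HashMap<>(), entries));
    }

    public static void main(String[] args) {
        Map<String, String> entries = Collections.singletonMap("k", "v");
        Map<String, String> immutableTarget = Collections.unmodifiableMap(new HashMap<>());
        try {
            fillInPlace(immutableTarget, entries);
        } catch (UnsupportedOperationException e) {
            System.out.println("put() rejected by the immutable map");
        }
        System.out.println(readThenWrap(entries)); // prints "{k=v}"
    }
}
```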


