Hello,
I am using Flink 1.7.2. I wrote a small application which uses KeyedProcessFunction to maintain application state. The state value object is using Guava's (version 18.0.50) ImmutableMap to create a copy of the map instance as part of its constructor. I am using "de.javakaffee:kryo-serializers:0.45" to take handle the guava ImmutableMap serialization and configured the serializer using ` env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class, ImmutableMapSerializer.class)`. I am still seeing an issue with ImmutableMap serialization and I am not sure if it is using the serializer that I have configured? (may be I am not configuring it correctly??). Using standard java Map works properly. Appreciate any inputs? I have provided the error stack trace and the code snippet below. Regards Vijay Error StackTrace ``` Serialization trace: itemMap (com.flink.SensorState) start (com.flink.ValueObject) at com.esotericsoftware.kryo.serializers.ReflectField.read(ReflectField.java:133) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:122) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:712) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.serializers.ReflectField.read(ReflectField.java:114) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:122) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:793) ~[kryo-5.0.0-RC1.jar:na] at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315) ~[flink-core-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151) ~[flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123) ~[flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284) [flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135) [flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250) [flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) [flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) [flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) [flink-runtime_2.12-1.7.2.jar:1.7.2] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191] Caused by: java.lang.UnsupportedOperationException: null at org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap.put(ImmutableMap.java:326) ~[flink-shaded-guava-18.0-5.0.jar:18.0-5.0] at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:230) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:42) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:712) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.serializers.ReflectField.read(ReflectField.java:114) ~[kryo-5.0.0-RC1.jar:na] ... 22 common frames omitted ``` ``` final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(CHECKPOINT_INTERVAL); env.setParallelism(PARALLELISM); env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class, ImmutableMapSerializer.class); int totalRecords = 1000; env.addSource(new StateDataProducer(totalRecords)) .keyBy(0) .process((KeyedProcessFunction) new VOBookmarker()) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } ``` ``` class SensorState implements Serializable { private final int sensorId; private final Map<String,String> itemMap; public SensorState(int sensorId, Map<String,String> itemMap) { this.sensorId = sensorId; this.itemMap = ImmutableMap.copyOf(itemMap); //this.itemMap = itemMap; } public int getSensorId() { return sensorId; } public Map<String, String> getItemMap() { return Collections.unmodifiableMap(itemMap); } } public class ValueObject implements Serializable { private final int sensorId; private SensorState start; private SensorState end; public ValueObject(int sensorId) { this.sensorId = sensorId; } public int getSensorId() { return sensorId; } public void setStartState(SensorState start) { this.start = start; } public void setEndState(SensorState end) { this.end = end; } } ``` -- Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ |
Hi @Vijay Srinivasaraghavan, as far as I know, kryo need register all implementation classes instead of abstract class or interface.
You can see the detail in de.javakaffee.kryoserializers.guava.ImmutableMapSerializer.registerSerializers. Maybe you can try do same thing to ExecutionConfig like ImmutableMapSerializer.registerSerializers: private enum DummyEnum { VALUE1, VALUE2 } private static void registerSerializers(ExecutionConfig conf) { Class<ImmutableMapSerializer> serializer = ImmutableMapSerializer.class; conf.registerTypeWithKryoSerializer(ImmutableMap.class, serializer); conf.registerTypeWithKryoSerializer(ImmutableMap.of().getClass(), serializer); Object o1 = new Object(); Object o2 = new Object(); conf.registerTypeWithKryoSerializer(ImmutableMap.of(o1, o1).getClass(), serializer); conf.registerTypeWithKryoSerializer(ImmutableMap.of(o1, o1, o2, o2).getClass(), serializer); Map<DummyEnum, Object> enumMap = new EnumMap<>(DummyEnum.class); for (DummyEnum e : DummyEnum.values()) { enumMap.put(e, o1); } conf.registerTypeWithKryoSerializer(ImmutableMap.copyOf(enumMap).getClass(), serializer); ImmutableTable<Object, Object, Object> denseImmutableTable = ImmutableTable.builder().put("a", 1, 1).put("b", 1, 1).build(); conf.registerTypeWithKryoSerializer(denseImmutableTable.rowMap().getClass(), serializer); conf.registerTypeWithKryoSerializer(((Map)denseImmutableTable.rowMap().get("a")).getClass(), serializer); conf.registerTypeWithKryoSerializer(denseImmutableTable.columnMap().getClass(), serializer); conf.registerTypeWithKryoSerializer(((Map)denseImmutableTable.columnMap().get(1)).getClass(), serializer); } Best, JingsongLee ------------------------------------------------------------------ From:vijikarthi <[hidden email]> Send Time:2019年4月17日(星期三) 04:45 To:dev <[hidden email]> Subject:Using guava ImmutableMap Serializer from magro/kyro-serializers Hello, I am using Flink 1.7.2. I wrote a small application which uses KeyedProcessFunction to maintain application state. The state value object is using Guava's (version 18.0.50) ImmutableMap to create a copy of the map instance as part of its constructor. I am using "de.javakaffee:kryo-serializers:0.45" to take handle the guava ImmutableMap serialization and configured the serializer using ` env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class, ImmutableMapSerializer.class)`. I am still seeing an issue with ImmutableMap serialization and I am not sure if it is using the serializer that I have configured? (may be I am not configuring it correctly??). Using standard java Map works properly. Appreciate any inputs? I have provided the error stack trace and the code snippet below. Regards Vijay Error StackTrace ``` Serialization trace: itemMap (com.flink.SensorState) start (com.flink.ValueObject) at com.esotericsoftware.kryo.serializers.ReflectField.read(ReflectField.java:133) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:122) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:712) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.serializers.ReflectField.read(ReflectField.java:114) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:122) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:793) ~[kryo-5.0.0-RC1.jar:na] at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315) ~[flink-core-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104) ~[flink-runtime_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151) ~[flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123) ~[flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284) [flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135) [flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250) [flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) [flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) [flink-streaming-java_2.12-1.7.2.jar:1.7.2] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) [flink-runtime_2.12-1.7.2.jar:1.7.2] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191] Caused by: java.lang.UnsupportedOperationException: null at org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap.put(ImmutableMap.java:326) ~[flink-shaded-guava-18.0-5.0.jar:18.0-5.0] at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:230) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:42) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:712) ~[kryo-5.0.0-RC1.jar:na] at com.esotericsoftware.kryo.serializers.ReflectField.read(ReflectField.java:114) ~[kryo-5.0.0-RC1.jar:na] ... 22 common frames omitted ``` ``` final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(CHECKPOINT_INTERVAL); env.setParallelism(PARALLELISM); env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class, ImmutableMapSerializer.class); int totalRecords = 1000; env.addSource(new StateDataProducer(totalRecords)) .keyBy(0) .process((KeyedProcessFunction) new VOBookmarker()) .print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } ``` ``` class SensorState implements Serializable { private final int sensorId; private final Map<String,String> itemMap; public SensorState(int sensorId, Map<String,String> itemMap) { this.sensorId = sensorId; this.itemMap = ImmutableMap.copyOf(itemMap); //this.itemMap = itemMap; } public int getSensorId() { return sensorId; } public Map<String, String> getItemMap() { return Collections.unmodifiableMap(itemMap); } } public class ValueObject implements Serializable { private final int sensorId; private SensorState start; private SensorState end; public ValueObject(int sensorId) { this.sensorId = sensorId; } public int getSensorId() { return sensorId; } public void setStartState(SensorState start) { this.start = start; } public void setEndState(SensorState end) { this.end = end; } } ``` -- Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ |
Thanks JingsongLee. You are right. I have registered all the implementation
class and it worked. -- Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ |
Free forum by Nabble | Edit this page |