Hi,
Recently we encountered an IllegalArgumentException when using state with TTL enabled and KryoSerializer as the field serializer. Test setup:

1. Use the heap state backend.
2. Create a MapStateDescriptor from classes (String.class and HashMap.class).
3. Enable state TTL.
4. Flink version: 1.6.1

Core test code:

    MapStateDescriptor<String, HashMap> mapStateDescriptor =
        new MapStateDescriptor<>("test", String.class, HashMap.class);
    mapStateDescriptor.enableTimeToLive(ttlConfig);

The IllegalArgumentException was thrown here, because originalSerializers.length is 1:

    protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
            PrecomputedParameters precomputed,
            TypeSerializer<?> ... originalSerializers) {
        Preconditions.checkNotNull(originalSerializers);
        Preconditions.checkArgument(originalSerializers.length == 2);
        return new TtlSerializer<>(precomputed, (TypeSerializer<T>) originalSerializers[1]);
    }

We found some clues in the Flink source code. Here is what happened, step by step:

Step 1. When the operator was opened, TtlStateFactory::createMapState created the ttlDescriptor, and a CompositeSerializer was created along with it. Because KryoSerializer.duplicate() creates another KryoSerializer instance (see source code), CompositeSerializer.precomputed.stateful was set to "true". Could someone explain what "stateful" means in the precomputed parameters?

Step 2. When the StateTable was registered, newMetaInfo was created from the ttlDescriptor, and the CompositeSerializer was duplicated in this step. Source code:

    @Override
    public CompositeSerializer<T> duplicate() {
        return precomputed.stateful ?
            createSerializerInstance(precomputed, duplicateFieldSerializers(fieldSerializers)) : this;
    }

    protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
            PrecomputedParameters precomputed,
            TypeSerializer<?> ... originalSerializers) {
        Preconditions.checkNotNull(originalSerializers);
        Preconditions.checkArgument(originalSerializers.length == 2);
        return new TtlSerializer<>(precomputed, (TypeSerializer<T>) originalSerializers[1]);
    }

We noticed that the new TtlSerializer contains only one field serializer (originalSerializers[1]); the LongSerializer for the timestamp is missing.

Step 3. The serializer was duplicated again when taking a snapshot. Source code:

    CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S> owningStateTable) {

        super(owningStateTable);
        this.snapshotData = owningStateTable.snapshotTableArrays();
        this.snapshotVersion = owningStateTable.getStateTableVersion();
        this.numberOfEntriesInSnapshotData = owningStateTable.size();

        // We create duplicates of the serializers for the async snapshot, because TypeSerializer
        // might be stateful and shared with the event processing thread.
        this.localKeySerializer = owningStateTable.keyContext.getKeySerializer().duplicate();
        this.localNamespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer().duplicate();
        this.localStateSerializer = owningStateTable.metaInfo.getStateSerializer().duplicate();

        this.partitionedStateTableSnapshot = null;
    }

Stack:

* new CopyOnWriteStateTableSnapshot
* CopyOnWriteStateTable.snapshot()
* HeapKeyedStateBackend$HeapSnapshotStrategy.processSnapshotMetaInfoForAllState()
* ……
* HeapKeyedStateBackend.snapshot()

The IllegalArgumentException happens when duplicating the CompositeSerializer created in step 2, because it contains only one field serializer.

Did I explain the problem clearly? Is this a bug? We compared KryoSerializer with PojoSerializer: PojoSerializer does not always return a new serializer instance; in most cases it returns the original instance. Which one has the bug, KryoSerializer or TtlStateFactory::createSerializerInstance?

Best,
Quan Shi
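To make steps 1 to 3 easier to follow, the sketch below is a hypothetical, heavily simplified plain-Java model of the duplicate() chain. It does not use Flink: TtlDuplicateModel, TtlLike, KryoLike, LongLike and keepAllFields are invented names. Two points are assumptions rather than facts confirmed in this thread: that "stateful" is true when at least one field serializer's duplicate() returns a different object (which matches what step 1 observes for KryoSerializer), and that forwarding both field serializers (keepAllFields = true) is what a corrected createSerializerInstance would do.

```java
/**
 * Hypothetical, simplified model of the duplicate() chain from the report.
 * None of these classes are Flink's; they only mimic the shape of
 * CompositeSerializer, TtlSerializer and PrecomputedParameters.stateful.
 */
public class TtlDuplicateModel {

    /** Stand-in for a TypeSerializer: only duplicate() matters here. */
    interface FieldSerializer {
        FieldSerializer duplicate();
    }

    /** Stateless field serializer: duplicate() returns itself (like LongSerializer). */
    static final class LongLike implements FieldSerializer {
        static final LongLike INSTANCE = new LongLike();
        public FieldSerializer duplicate() { return this; }
    }

    /** Stateful field serializer: duplicate() returns a new object (like KryoSerializer). */
    static final class KryoLike implements FieldSerializer {
        public FieldSerializer duplicate() { return new KryoLike(); }
    }

    /** ASSUMED meaning of "stateful": some field's duplicate() is a different object. */
    static boolean isStateful(FieldSerializer... fields) {
        for (FieldSerializer f : fields) {
            if (f.duplicate() != f) {
                return true;
            }
        }
        return false;
    }

    /** Composite of (timestamp, value) serializers, modelled loosely on TtlSerializer. */
    static final class TtlLike {
        final boolean stateful;
        final boolean keepAllFields; // false mirrors the reported code; true is a hypothetical correction
        final FieldSerializer[] fields;

        TtlLike(boolean stateful, boolean keepAllFields, FieldSerializer... fields) {
            this.stateful = stateful;
            this.keepAllFields = keepAllFields;
            this.fields = fields;
        }

        static TtlLike create(FieldSerializer valueSerializer, boolean keepAllFields) {
            FieldSerializer[] fields = {LongLike.INSTANCE, valueSerializer};
            return new TtlLike(isStateful(fields), keepAllFields, fields);
        }

        /** Same shape as CompositeSerializer.duplicate(): only copies when stateful. */
        TtlLike duplicate() {
            if (!stateful) {
                return this;
            }
            FieldSerializer[] copies = new FieldSerializer[fields.length];
            for (int i = 0; i < fields.length; i++) {
                copies[i] = fields[i].duplicate();
            }
            return createInstance(copies);
        }

        /** Mirrors createSerializerInstance, including the length == 2 precondition. */
        TtlLike createInstance(FieldSerializer... originals) {
            if (originals.length != 2) {
                throw new IllegalArgumentException(
                    "expected 2 field serializers, got " + originals.length);
            }
            return keepAllFields
                ? new TtlLike(stateful, true, originals)       // keeps timestamp + value
                : new TtlLike(stateful, false, originals[1]);  // varargs call: timestamp serializer is dropped
        }
    }

    public static void main(String[] args) {
        TtlLike reported = TtlLike.create(new KryoLike(), false);
        TtlLike registered = reported.duplicate(); // step 2: one field left
        System.out.println("fields after step 2: " + registered.fields.length);
        try {
            registered.duplicate();                // step 3: duplicate for the snapshot
        } catch (IllegalArgumentException e) {
            System.out.println("step 3 failed: " + e.getMessage());
        }
    }
}
```

Running main prints "fields after step 2: 1" followed by "step 3 failed: expected 2 field serializers, got 1". With a field serializer whose duplicate() returns itself (which, according to the report, PojoSerializer does in most cases), the model is not stateful, duplicate() returns the same instance, and the faulty path is never reached.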
Hi Quan
Is the problem still there when running on 1.8? If it still occurs with 1.8, could you please share a minimal demo that reproduces it? Thanks

Best,
Congxian

On May 6, 2019, 14:44 +0800, Shi Quan wrote:
> [...]
Thanks for reporting this issue, Quan. I've pulled in Andrey, who developed this feature and might shed some light on the problem.

Cheers,
Till

On Mon, May 6, 2019 at 11:04 AM Congxian Qiu wrote:
> [...]