IllegalArgumentException when using state with TTL enabled and KryoSerializer as field serializer

Shi Quan
Hi,
Recently we encountered an IllegalArgumentException when using state with TTL enabled and KryoSerializer as the field serializer. Test setup:

  1.  Use the heap state backend
  2.  Create a MapStateDescriptor from classes (String.class and HashMap.class)
  3.  Enable state TTL
  4.  Flink version: 1.6.1

Core test code:

MapStateDescriptor<String, HashMap> mapStateDescriptor = new MapStateDescriptor<>("test", String.class, HashMap.class);
mapStateDescriptor.enableTimeToLive(ttlConfig);

The IllegalArgumentException is thrown here, because originalSerializers.length is 1:

protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
   PrecomputedParameters precomputed,
   TypeSerializer<?> ... originalSerializers) {
   Preconditions.checkNotNull(originalSerializers);
   Preconditions.checkArgument(originalSerializers.length == 2);
   return new TtlSerializer<>(precomputed, (TypeSerializer<T>) originalSerializers[1]);
}


We found some clues in the Flink source code. Here is what happens, step by step:

Step 1. When the operator is opened, TtlStateFactory::createMapState creates the ttlDescriptor, and a CompositeSerializer is created along with it. Because KryoSerializer.duplicate() creates another KryoSerializer instance (see the source code), CompositeSerializer.precomputed.stateful is set to “true”. Could someone explain what "stateful" means in the precomputed parameters?
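The behavior described in Step 1 can be illustrated with a small plain-Java model. This is only a sketch, not Flink code: ModelSerializer, StatelessSerializer, KryoLikeSerializer and isStateful are hypothetical names, and the rule it implements (the composite counts as stateful as soon as any field serializer's duplicate() returns a different object) is our assumption, based on the behavior we observed.

```java
public class StatefulFlagModel {

    /** Hypothetical stand-in for TypeSerializer; only duplicate() matters here. */
    interface ModelSerializer {
        ModelSerializer duplicate();
    }

    /** Stateless, like a singleton serializer: duplicate() returns the same instance. */
    static final class StatelessSerializer implements ModelSerializer {
        @Override
        public ModelSerializer duplicate() {
            return this;
        }
    }

    /** Kryo-like: duplicate() always returns a new instance. */
    static final class KryoLikeSerializer implements ModelSerializer {
        @Override
        public ModelSerializer duplicate() {
            return new KryoLikeSerializer();
        }
    }

    /** Assumed rule: stateful if any field serializer's duplicate() is a different object. */
    static boolean isStateful(ModelSerializer... fieldSerializers) {
        boolean stateful = false;
        for (ModelSerializer s : fieldSerializers) {
            stateful |= s.duplicate() != s;
        }
        return stateful;
    }

    public static void main(String[] args) {
        ModelSerializer timestamp = new StatelessSerializer(); // plays the TTL timestamp serializer
        System.out.println(isStateful(timestamp, new StatelessSerializer())); // false
        System.out.println(isStateful(timestamp, new KryoLikeSerializer()));  // true
    }
}
```

Under this model, a single Kryo-like field serializer is enough to switch the whole composite onto the "stateful" path.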

Step 2. When the StateTable is registered, newMetaInfo is created from the ttlDescriptor, and the CompositeSerializer is duplicated in this step. Source code:

@Override
public CompositeSerializer<T> duplicate() {
   return precomputed.stateful ?
      createSerializerInstance(precomputed, duplicateFieldSerializers(fieldSerializers)) : this;
}

protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
   PrecomputedParameters precomputed,
   TypeSerializer<?> ... originalSerializers) {
   Preconditions.checkNotNull(originalSerializers);
   Preconditions.checkArgument(originalSerializers.length == 2);
   return new TtlSerializer<>(precomputed, (TypeSerializer<T>) originalSerializers[1]);
}

We noticed that the new TtlSerializer contains only the user value serializer (originalSerializers[1]); the LongSerializer for the timestamp is missing.

Step 3. The serializer is duplicated again when taking a snapshot. Source code:

CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S> owningStateTable) {

   super(owningStateTable);
   this.snapshotData = owningStateTable.snapshotTableArrays();
   this.snapshotVersion = owningStateTable.getStateTableVersion();
   this.numberOfEntriesInSnapshotData = owningStateTable.size();

   // We create duplicates of the serializers for the async snapshot, because TypeSerializer
   // might be stateful and shared with the event processing thread.
   this.localKeySerializer = owningStateTable.keyContext.getKeySerializer().duplicate();
   this.localNamespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer().duplicate();
   this.localStateSerializer = owningStateTable.metaInfo.getStateSerializer().duplicate();

   this.partitionedStateTableSnapshot = null;
}

Call stack:

  *   new CopyOnWriteStateTableSnapshot
  *   CopyOnWriteStateTable.snapshot()
  *   HeapKeyedStateBackend$HeapSnapshotStrategy.processSnapshotMetaInfoForAllState()
  *   ……
  *   HeapKeyedStateBackend.snapshot()


The IllegalArgumentException is thrown when duplicating the CompositeSerializer created in Step 2, because it contains only one field serializer.
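Steps 2 and 3 can be reproduced with a simplified model of the two-field TTL serializer (field 0 = timestamp, field 1 = user value). All names here (ModelSerializer, TimestampSerializer, KryoLikeSerializer, ModelTtlSerializer) are hypothetical; duplicate() and createSerializerInstance only mirror the logic of the source excerpts quoted above, without Flink's generics or PrecomputedParameters.

```java
public class TtlDuplicateChainModel {

    /** Hypothetical stand-in for TypeSerializer; only duplicate() matters here. */
    interface ModelSerializer {
        ModelSerializer duplicate();
    }

    /** Plays the role of LongSerializer (timestamp): duplicate() returns this. */
    static final class TimestampSerializer implements ModelSerializer {
        public ModelSerializer duplicate() { return this; }
    }

    /** Plays the role of KryoSerializer: duplicate() returns a new instance. */
    static final class KryoLikeSerializer implements ModelSerializer {
        public ModelSerializer duplicate() { return new KryoLikeSerializer(); }
    }

    /** Simplified TTL composite: field 0 = timestamp, field 1 = user value. */
    static final class ModelTtlSerializer {
        final ModelSerializer[] fieldSerializers;
        final boolean stateful;

        ModelTtlSerializer(boolean stateful, ModelSerializer... fieldSerializers) {
            this.stateful = stateful;
            this.fieldSerializers = fieldSerializers;
        }

        /** Mirrors the quoted CompositeSerializer.duplicate(). */
        ModelTtlSerializer duplicate() {
            if (!stateful) {
                return this;
            }
            ModelSerializer[] copies = new ModelSerializer[fieldSerializers.length];
            for (int i = 0; i < copies.length; i++) {
                copies[i] = fieldSerializers[i].duplicate();
            }
            return createSerializerInstance(stateful, copies);
        }

        /** Mirrors the quoted createSerializerInstance: checks for 2 fields, keeps only [1]. */
        static ModelTtlSerializer createSerializerInstance(boolean stateful, ModelSerializer... originalSerializers) {
            if (originalSerializers.length != 2) {
                throw new IllegalArgumentException("expected 2 field serializers, got " + originalSerializers.length);
            }
            return new ModelTtlSerializer(stateful, originalSerializers[1]);
        }
    }

    public static void main(String[] args) {
        ModelTtlSerializer step1 =
                new ModelTtlSerializer(true, new TimestampSerializer(), new KryoLikeSerializer());
        ModelTtlSerializer step2 = step1.duplicate(); // registering the state table
        System.out.println(step2.fieldSerializers.length); // 1: timestamp serializer dropped
        try {
            step2.duplicate(); // taking the snapshot
        } catch (IllegalArgumentException e) {
            System.out.println("Step 3 fails: " + e.getMessage());
        }
    }
}
```

Running main prints 1 for the field count after Step 2, then "Step 3 fails: expected 2 field serializers, got 1".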

Is the problem clear? Is this a bug?
We compared KryoSerializer with PojoSerializer: PojoSerializer does not always return a new serializer instance from duplicate(); in most cases it returns the original instance. Which one has the bug, KryoSerializer or TtlStateFactory::createSerializerInstance?
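To compare the two cases, the sketch below (again a simplified model with hypothetical names, not Flink code) runs the same two-field composite through several duplicate() rounds, once with a PojoSerializer-like value serializer that returns itself and once with a Kryo-like one that returns a new instance. Under this model the length check is only reached on the stateful path, so the Kryo-like serializer exposes the problem rather than causing it. The keepAll = true variant, which forwards every duplicated field serializer instead of only originalSerializers[1], is a hypothetical fix that we have not tested against Flink.

```java
public class DuplicatePathModel {

    /** Hypothetical stand-in for TypeSerializer. */
    interface ModelSerializer {
        ModelSerializer duplicate();
    }

    /** Like PojoSerializer in the common case described above: duplicate() returns this. */
    static final class ReturnsThis implements ModelSerializer {
        public ModelSerializer duplicate() { return this; }
    }

    /** Like KryoSerializer: duplicate() returns a new instance. */
    static final class ReturnsNew implements ModelSerializer {
        public ModelSerializer duplicate() { return new ReturnsNew(); }
    }

    /**
     * Duplicates a (timestamp, value) composite `rounds` times and returns the number of
     * fields left. keepAll = false mirrors the quoted createSerializerInstance;
     * keepAll = true forwards all duplicated field serializers (hypothetical fix).
     */
    static int fieldsAfterDuplicates(ModelSerializer value, boolean keepAll, int rounds) {
        ModelSerializer[] fields = {new ReturnsThis(), value};
        for (int r = 0; r < rounds; r++) {
            boolean stateful = false;
            for (ModelSerializer f : fields) {
                stateful |= f.duplicate() != f;
            }
            if (!stateful) {
                continue; // composite returns itself; createSerializerInstance is never reached
            }
            ModelSerializer[] copies = new ModelSerializer[fields.length];
            for (int i = 0; i < copies.length; i++) {
                copies[i] = fields[i].duplicate();
            }
            if (copies.length != 2) {
                throw new IllegalArgumentException("expected 2 field serializers, got " + copies.length);
            }
            fields = keepAll ? copies : new ModelSerializer[] {copies[1]};
        }
        return fields.length;
    }

    public static void main(String[] args) {
        System.out.println(fieldsAfterDuplicates(new ReturnsThis(), false, 3)); // 2: never re-created
        System.out.println(fieldsAfterDuplicates(new ReturnsNew(), true, 3));   // 2: fix keeps both
        try {
            fieldsAfterDuplicates(new ReturnsNew(), false, 3);
        } catch (IllegalArgumentException e) {
            System.out.println("Kryo-like value, quoted code: " + e.getMessage());
        }
    }
}
```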

Best,
Quan Shi

Re: IllegalArgumentException when using state with TTL enabled and KryoSerializer as field serializer

Congxian Qiu
Hi Quan
Does the problem still occur on Flink 1.8? If it does, could you please share a minimal example that reproduces it? Thanks

Best, Congxian
On May 6, 2019, 14:44 +0800, Shi Quan <[hidden email]>, wrote:

> [...]

Re: IllegalArgumentException when using state with TTL enabled and KryoSerializer as field serializer

Till Rohrmann
Thanks for reporting this issue, Quan. I've pulled in Andrey, who developed
this feature and might shed some light on the problem.

Cheers,
Till

On Mon, May 6, 2019 at 11:04 AM Congxian Qiu <[hidden email]> wrote:

> Hi Quan
> Does the problem still occur on Flink 1.8? If it does, could you please
> share a minimal example that reproduces it?
> Thanks
>
> Best, Congxian
> On May 6, 2019, 14:44 +0800, Shi Quan <[hidden email]>, wrote:
> > [...]