Nico Kruber created FLINK-12577:
-----------------------------------

             Summary: Flink uses Kryo serializer if given a SpecificRecord type
                 Key: FLINK-12577
                 URL: https://issues.apache.org/jira/browse/FLINK-12577
             Project: Flink
          Issue Type: Bug
          Components: API / Type Serialization System
    Affects Versions: 1.8.0, 1.7.2, 1.9.0
            Reporter: Nico Kruber


If a data stream's type is {{SpecificRecord}} itself rather than a *subclass* of it, Flink infers a {{GenericTypeInfo}} and uses Kryo instead.

Example:
{code}
StreamExecutionEnvironment env = ...
env.getConfig().disableGenericTypes();

DataStream<Measurement> sourceStream = env.addSource(...);
DataStream<SpecificRecord> test = sourceStream.map(value -> new AggregatedSensorStatistics());

public class AggregatedSensorStatistics extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {...}
{code}

This will lead to
{code}
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.avro.specific.SpecificRecord is treated as a generic type.
	at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
	at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:208)
	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:541)
	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166)
	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:132)
	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:124)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1537)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:89)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
	at com.ververica.training.statemigration.StateMigrationJobBase.createAndExecuteJob(StateMigrationJobBase.java:68)
	at com.ververica.training.statemigration.avro.StateMigrationJob.main(StateMigrationJob.java:13)
{code}

You may want some flexibility in your types and therefore not declare the exact one, such as {{AggregatedSensorStatistics}} in this example. I don't see any reason why we should disallow that.

The reason for this behaviour is that {{TypeExtractor#privateGetForClass()}} contains the following check, which only treats a class as an Avro type if it *extends* the Avro specific record base class ({{AVRO_SPECIFIC_RECORD_BASE_CLASS}}):
{code}
if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
	return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
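
To illustrate why the check quoted in the description misses a stream typed as {{SpecificRecord}}, here is a minimal, self-contained sketch. It does not use Flink or Avro: the nested types are hypothetical stand-ins for the Avro classes, {{hasSuperclass}} is a simplified re-implementation rather than Flink's actual helper, and it assumes that {{AVRO_SPECIFIC_RECORD_BASE_CLASS}} names the {{SpecificRecordBase}} class. The {{isSpecificRecord}} method only shows what an interface-based test would see; it is not a proposed patch.

```java
public class SuperclassCheckSketch {

    // Hypothetical stand-ins for the Avro types; NOT the real org.apache.avro classes.
    interface SpecificRecord {}

    abstract static class SpecificRecordBase implements SpecificRecord {}

    static class AggregatedSensorStatistics extends SpecificRecordBase {}

    // Simplified stand-in for a superclass-name check (assumption: not Flink's code).
    // It walks the superclass chain only, so implemented interfaces are never visited.
    static boolean hasSuperclass(Class<?> clazz, String superClassName) {
        for (Class<?> c = clazz.getSuperclass(); c != null; c = c.getSuperclass()) {
            if (c.getName().equals(superClassName)) {
                return true;
            }
        }
        return false;
    }

    // Interface-based test, shown only for contrast with the check above.
    static boolean isSpecificRecord(Class<?> clazz) {
        return SpecificRecord.class.isAssignableFrom(clazz);
    }

    public static void main(String[] args) {
        String base = SpecificRecordBase.class.getName();

        // The concrete class has the base class in its superclass chain.
        System.out.println(hasSuperclass(AggregatedSensorStatistics.class, base)); // true

        // The interface has no superclass at all, so the same check fails.
        System.out.println(hasSuperclass(SpecificRecord.class, base)); // false

        // An assignability test would accept the interface type as well.
        System.out.println(isSpecificRecord(SpecificRecord.class)); // true
    }
}
```

In the example from the description, the lambda's declared result type is the interface, which is why the second case applies and the type ends up as a generic type.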