Maciej Bryński created FLINK-11030:
--------------------------------------

Summary: Cannot use Avro logical types with ConfluentRegistryAvroDeserializationSchema
Key: FLINK-11030
URL: https://issues.apache.org/jira/browse/FLINK-11030
Project: Flink
Issue Type: Bug
Affects Versions: 1.6.2
Reporter: Maciej Bryński

I generated a specific record class for a Kafka topic whose Avro schema includes logical types. I then try to read the data with the following code:

{code:scala}
val deserializationSchema = ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[mySpecificClass], schemaRegistryUrl)

val kafkaStream = env.addSource(
  new FlinkKafkaConsumer011(topic, deserializationSchema, kafkaProperties)
)
kafkaStream.print()
{code}

Result:

{code}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:645)
	at TransactionEnrichment$.main(TransactionEnrichment.scala:50)
	at TransactionEnrichment.main(TransactionEnrichment.scala)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
	at platform_tbl_game_transactions_v1.Value.put(Value.java:222)
	at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
	at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
	at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
	at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
{code}

With the plain Kafka consumer there is a known workaround: registering logical-type conversions on the {{SpecificData}} model. Unfortunately, it does not work in Flink:

{code}
SpecificData.get.addLogicalTypeConversion(new TimeConversions.TimestampConversion)
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
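For background on the cast failure: Avro's {{timestamp-millis}} logical type is encoded on the wire as a plain {{long}} (epoch milliseconds), and a registered logical-type conversion is what turns that {{long}} into a time object after decoding. When no conversion is registered on the model the reader uses, the generated setter receives the raw {{Long}} while expecting {{org.joda.time.DateTime}}, producing exactly the {{ClassCastException}} above. A minimal stdlib-only sketch of the mapping a conversion performs (using {{java.time.Instant}} in place of Joda; names here are illustrative, not Avro API):

```scala
import java.time.Instant

// Avro stores timestamp-millis as a plain long of epoch milliseconds.
// A logical-type conversion maps that long to a time object after decoding;
// without one, the generated setter receives the raw Long and the cast fails.
def fromTimestampMillis(epochMillis: Long): Instant =
  Instant.ofEpochMilli(epochMillis)

val ts = fromTimestampMillis(1543622400000L)
println(ts) // 2018-12-01T00:00:00Z
```

This is only the value-level mapping; the issue here is that Flink's {{RegistryAvroDeserializationSchema}} does not appear to pick up conversions registered on the global {{SpecificData}} model.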