[jira] [Created] (FLINK-17486) ClassCastException when copying AVRO SpecificRecord containing a decimal field

Shang Yuanchun (Jira)
Lorenzo Nicora created FLINK-17486:
--------------------------------------

             Summary: ClassCastException when copying AVRO SpecificRecord containing a decimal field
                 Key: FLINK-17486
                 URL: https://issues.apache.org/jira/browse/FLINK-17486
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.10.0
         Environment: Flink 1.10.0
                      AVRO 1.9.2
                      Java 1.8.0 (but also Java 14)
                      Scala binary 2.11
            Reporter: Lorenzo Nicora


When consuming AVRO SpecificRecords that contain a {{decimal}} (logical type) field from a Kafka source, copying the record fails with:

{{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.nio.ByteBuffer}}
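The exception message suggests that some copy routine treats the field according to its underlying {{bytes}} type and casts the value to {{ByteBuffer}}, while the generated class actually holds a {{BigDecimal}}. The plain-Java sketch below is not Flink or AVRO code; {{CastDemo}} and {{copyBytesField}} are hypothetical names that only mimic that kind of cast to show how such a message can arise:

```java
import java.math.BigDecimal;
import java.nio.ByteBuffer;

public class CastDemo {
    // Hypothetical stand-in for a copy routine that assumes a bytes-typed
    // field always holds a ByteBuffer.
    public static ByteBuffer copyBytesField(Object value) {
        ByteBuffer src = (ByteBuffer) value; // throws if value is a BigDecimal
        ByteBuffer dst = ByteBuffer.allocate(src.remaining());
        dst.put(src.duplicate()); // duplicate() leaves the source position untouched
        dst.flip();
        return dst;
    }

    public static void main(String[] args) {
        // The generated SpecificRecord stores the decimal field as a BigDecimal.
        Object price = BigDecimal.valueOf(42.32);
        try {
            copyBytesField(price);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

If this assumption holds, the copy only succeeds when the routine applies the decimal conversion before handling the value as raw bytes.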

 

This code reproduces the problem:

{{AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);}}

{{Sample s1 = Sample.newBuilder()}}
 {{  .setPrice(BigDecimal.valueOf(42.32))}}
 {{  .setId("A12345")}}
 {{  .build();}}

{{Sample s2 = serializer.copy(s1);}}

 

The AVRO SpecificRecord is generated using avro-maven-plugin from this IDL:

{{@namespace("example.avro")}}
{{protocol SampleProtocol {}}
{{  record Sample{}}
{{    string id;}}
{{    decimal(9,2) price;}}
{{    timestamp_ms eventTime;}}
{{   }}}
{{}}}
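For context, the AVRO specification encodes a {{decimal}} logical type as {{bytes}} holding the two's-complement big-endian unscaled value, with the scale taken from the schema. A minimal plain-Java sketch of that mapping for {{decimal(9,2)}} follows; {{DecimalBytes}}, {{toBytes}} and {{fromBytes}} are illustrative names, not AVRO API:

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

public class DecimalBytes {
    // Scale taken from decimal(9,2) in the IDL.
    static final int SCALE = 2;

    // BigDecimal -> bytes: big-endian two's-complement unscaled value.
    public static ByteBuffer toBytes(BigDecimal value) {
        // setScale without a rounding mode throws ArithmeticException
        // if the value cannot be represented exactly at this scale.
        BigInteger unscaled = value.setScale(SCALE).unscaledValue();
        return ByteBuffer.wrap(unscaled.toByteArray());
    }

    // bytes -> BigDecimal, re-applying the schema's scale.
    public static BigDecimal fromBytes(ByteBuffer buffer) {
        byte[] raw = new byte[buffer.remaining()];
        buffer.duplicate().get(raw); // do not consume the caller's buffer
        return new BigDecimal(new BigInteger(raw), SCALE);
    }

    public static void main(String[] args) {
        BigDecimal price = BigDecimal.valueOf(42.32);
        ByteBuffer encoded = toBytes(price);
        System.out.println(fromBytes(encoded)); // round-trips the price
    }
}
```

The two representations are not interchangeable by a cast, which is why code that handles the field as raw {{bytes}} needs an explicit conversion step like the one sketched here.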

 

The deep copy of the record happens behind the scenes when attaching an AssignerWithPeriodicWatermarks to a Kafka source that consumes AVRO SpecificRecord and uses Confluent Schema Registry. The assigner extracts the event time from the record, and bookmarking is enabled (not sure whether this is related).

A simplified version of the application is at [https://github.com/nicusX/flink-avro-bug] (see [StreamJob.java|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]).

 

The problem looks similar to AVRO-1895, but that issue has been fixed since AVRO 1.8.2 (I'm using AVRO 1.9.2).

In fact, the following code, which performs a deep copy relying only on AVRO, does work:

 

{{Sample s1 = Sample.newBuilder()}}
 {{  .setPrice(BigDecimal.valueOf(42.32))}}
 {{  .setId("A12345")}}
 {{  .build();}}
 {{Sample s2 = Sample.newBuilder(s1).build();}}

 

A simplified version of the Flink application causing the problem is [here|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java].

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)