[jira] [Created] (FLINK-17804) Follow the spec when decoding Parquet logical DECIMAL type

Shang Yuanchun (Jira)
Sergii Mikhtoniuk created FLINK-17804:
-----------------------------------------

             Summary: Follow the spec when decoding Parquet logical DECIMAL type
                 Key: FLINK-17804
                 URL: https://issues.apache.org/jira/browse/FLINK-17804
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.10.1
            Reporter: Sergii Mikhtoniuk


When reading a Parquet file (produced by Spark 2.4.0 with default configuration) Flink's {{ParquetRowInputFormat}} fails with {{NumberFormatException}}.

After debugging, it seems that Flink does not follow the Parquet spec on [encoding the DECIMAL logical type|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal].

The Parquet schema for this field is:
{code}
optional fixed_len_byte_array(9) price_usd (DECIMAL(19,4));
{code}

If I understand the spec correctly, for a {{fixed_len_byte_array}} the value should hold the unscaled decimal as a two's-complement integer in big-endian byte order, with the scale taken from the {{DECIMAL(19,4)}} annotation. Flink's [RowConverter|https://github.com/apache/flink/blob/release-1.10.1/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java#L202], however, treats it as a base-10 UTF-8 string.

What Flink is essentially doing:
{code}
val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
val decimal = new java.math.BigDecimal(new String(binary, "UTF-8").toCharArray)
{code}
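
For anyone who wants to reproduce the failure outside Flink, here is a minimal Java sketch. It only mimics the string-based decoding shown above and is not Flink's actual code; the class and method names are hypothetical.

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;

// Hypothetical reproduction class; it mimics the string-based decoding only.
class DecimalAsStringRepro {

    // Interprets the raw bytes as UTF-8 text and parses that text as a decimal.
    static BigDecimal decodeAsUtf8String(byte[] raw) {
        return new BigDecimal(new String(raw, StandardCharsets.UTF_8).toCharArray());
    }

    public static void main(String[] args) {
        // The same 9 bytes as in the report.
        byte[] binary = {0, 0, 0, 0, 0, 11, -97, 118, -64};
        try {
            decodeAsUtf8String(binary);
        } catch (NumberFormatException e) {
            // The bytes are not ASCII digits, so parsing them as text fails.
            System.out.println("Parsing failed: " + e);
        }
    }
}
```

The first decoded character is U+0000, which is not a digit, sign, or decimal point, so the {{BigDecimal}} constructor rejects the input.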

What I think the spec suggests:
{code}
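val binary = Array[Byte](0, 0, 0, 0, 0, 11, -97, 118, -64)
// Big-endian two's-complement bytes of the unscaled value
val unscaled = new java.math.BigInteger(binary)
// Apply the scale from DECIMAL(19,4); without it the result would be the unscaled integer
val decimal = new java.math.BigDecimal(unscaled, 4)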
{code}
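
A Java version of the spec-style decoding might look like the sketch below. It assumes the scale is read from the column's {{DECIMAL(precision, scale)}} annotation and passed in by the caller; {{ParquetDecimalSketch}} and {{decodeDecimal}} are hypothetical names, not part of Flink or Parquet.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical helper illustrating the decoding; not Flink's or Parquet's API.
class ParquetDecimalSketch {

    // raw:   big-endian two's-complement bytes of the unscaled value
    // scale: number of fractional digits, assumed to come from the schema annotation
    static BigDecimal decodeDecimal(byte[] raw, int scale) {
        // BigInteger(byte[]) reads big-endian two's-complement, so the sign is preserved.
        return new BigDecimal(new BigInteger(raw), scale);
    }

    public static void main(String[] args) {
        // The same 9 bytes as in the report, for a DECIMAL(19,4) column.
        byte[] binary = {0, 0, 0, 0, 0, 11, -97, 118, -64};
        System.out.println(decodeDecimal(binary, 4)); // prints 19500.0000

        // All bits set is -1 in two's complement, i.e. -0.0001 at scale 4.
        byte[] negative = {-1, -1, -1, -1, -1, -1, -1, -1, -1};
        System.out.println(decodeDecimal(negative, 4)); // prints -0.0001
    }
}
```

For the bytes in this report the unscaled value is 195000000, so with scale 4 the decoded price is 19500.0000.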

Error stacktrace:
{code}
java.lang.NumberFormatException
        at java.math.BigDecimal.<init>(BigDecimal.java:497)
        at java.math.BigDecimal.<init>(BigDecimal.java:383)
        at java.math.BigDecimal.<init>(BigDecimal.java:680)
        at org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter.addBinary(RowConverter.java:202)
        at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:317)
        at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
        at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
        at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:235)
        at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
        at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
        at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:327)
{code}

Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)