Question about processing a 3-level List data type in parquet


Naehee Kim
Hi Flink Dev Community,

I've found that RowConverter.java in the flink-parquet module doesn't
support reading a 3-level list type in Parquet, although it can process a
2-level list type.

3-level:

optional group my_list (LIST) {
  repeated group element {
    required binary str (UTF8);
  };
}


2-level:

optional group my_list (LIST) {
  repeated int32 element;
}

Reference:
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists

The Parquet file I am testing with was written by a Spark job and contains
a 3-level list type. When I try to process it, the job fails with
'java.lang.ClassCastException: Expected instance of group converter but got
"org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"'.

I've tested with Flink 1.9 and verified that RowConverter.java remains
unchanged in v1.11. To process a 3-level list, I think RowConverter.java
should be updated to use a new TypeInfo instead of BasicArrayTypeInfo (a
2-level list can already be processed with BasicArrayTypeInfo). I wonder
whether my understanding is correct and whether there are any plans to
support a 3-level list type in Parquet.

For your reference, here is the code snippet along with the stack trace.

MessageType readSchema = new AvroSchemaConverter().convert(REPORTING_SCHEMA);
RowTypeInfo rowTypeInfo =
        (RowTypeInfo) ParquetSchemaConverter.fromParquetType(readSchema);
ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(
        new Path("file:///test-file.snappy.parquet"), readSchema);
DataStreamSource<Row> dataSource =
        env.createInput(parquetInputFormat, rowTypeInfo);

-- stack trace

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
        at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
        at com.pinterest.xenon.flink.backfill.TestParquetSource.testParquetInsertionOriginalInputFormat(TestParquetSource.java:322)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at com.google.testing.junit.runner.internal.junit4.CancellableRequestFactory$CancellableRunner.run(CancellableRequestFactory.java:89)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
        at com.google.testing.junit.runner.junit4.JUnit4Runner.run(JUnit4Runner.java:112)
        at com.google.testing.junit.runner.BazelTestRunner.runTestsInSuite(BazelTestRunner.java:153)
        at com.google.testing.junit.runner.BazelTestRunner.main(BazelTestRunner.java:84)
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException:
Caught exception when processing split: null
        at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
        at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
Caused by: java.lang.ClassCastException: Expected instance of group
converter but got
"org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
        at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
        at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
        at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
        at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
        at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
        at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
        at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
        at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
        at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
        at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
        at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:333)

Thanks,

Naehee