Hi Flink Dev Community,
I've found that RowConverter.java in the flink-parquet module doesn't support reading a 3-level list type in Parquet, though it is able to process a 2-level list type.

3-level:

    optional group my_list (LIST) {
      repeated group element {
        required binary str (UTF8);
      };
    }

2-level:

    optional group my_list (LIST) {
      repeated int32 element;
    }

Reference: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists

The Parquet file I am testing with was written by a Spark job and has a 3-level list type. When I try to process the file, it fails with this error:

    java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"

I've tested with Flink 1.9 and checked that RowConverter.java remains the same in v1.11.

To process a 3-level list, I think RowConverter.java should be updated with a new TypeInfo instead of BasicArrayTypeInfo. (A 2-level list can be processed with BasicArrayTypeInfo.) I wonder if my understanding is correct and whether you have any plan to support a 3-level list datatype in Parquet.

For your reference, here is the code snippet along with the stack trace.

    MessageType readSchema = (new AvroSchemaConverter()).convert(REPORTING_SCHEMA);
    RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(readSchema);
    ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(new Path("file:///test-file.snappy.parquet"), readSchema);
    DataStreamSource<Row> dataSource = env.createInput(parquetInputFormat, rowTypeInfo);

-- stack trace
Job execution failed.
org.apache.flink.runtime.client.JobExecutionException:
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
    at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
    at com.pinterest.xenon.flink.backfill.TestParquetSource.testParquetInsertionOriginalInputFormat(TestParquetSource.java:322)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at com.google.testing.junit.runner.internal.junit4.CancellableRequestFactory$CancellableRunner.run(CancellableRequestFactory.java:89)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
    at com.google.testing.junit.runner.junit4.JUnit4Runner.run(JUnit4Runner.java:112)
    at com.google.testing.junit.runner.BazelTestRunner.runTestsInSuite(BazelTestRunner.java:153)
    at com.google.testing.junit.runner.BazelTestRunner.main(BazelTestRunner.java:84)
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
    at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
    at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:333)

Thanks,
Naehee
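
P.S. To make the 2-level vs. 3-level distinction concrete, here is a rough sketch of how a reader might decide which node holds the list element, based on my reading of the backward-compatibility rules in LogicalTypes.md. It is not Flink or parquet-mr code: Node, elementOf and isThreeLevel are hypothetical names made up for illustration, and the sketch treats any node without children as a primitive field.

```java
import java.util.Arrays;
import java.util.List;

public class ListLayoutSketch {

    /** Hypothetical, simplified schema node; NOT the parquet-mr Type class. */
    static final class Node {
        final String name;
        final List<Node> children; // empty means "primitive field" in this sketch

        Node(String name, Node... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }

        boolean isPrimitive() {
            return children.isEmpty();
        }
    }

    /**
     * Given the name of the LIST-annotated group and its single repeated child,
     * returns the node that represents the list element.
     */
    static Node elementOf(String listName, Node repeated) {
        // Repeated primitive: the repeated field itself is the element (2-level).
        if (repeated.isPrimitive()) {
            return repeated;
        }
        // Repeated group with several fields: the group itself is the element.
        if (repeated.children.size() > 1) {
            return repeated;
        }
        // Single-field group named "array" or "<list name>_tuple": legacy layout,
        // the group itself is the element.
        if (repeated.name.equals("array") || repeated.name.equals(listName + "_tuple")) {
            return repeated;
        }
        // Otherwise the repeated group is only a wrapper (3-level): its single
        // child is the element.
        return repeated.children.get(0);
    }

    /** True when the repeated node is a wrapper around the element. */
    static boolean isThreeLevel(String listName, Node repeated) {
        return elementOf(listName, repeated) != repeated;
    }

    public static void main(String[] args) {
        // repeated group element { required binary str (UTF8); }
        Node threeLevel = new Node("element", new Node("str"));
        // repeated int32 element;
        Node twoLevel = new Node("element");

        System.out.println(isThreeLevel("my_list", threeLevel)); // prints "true"
        System.out.println(isThreeLevel("my_list", twoLevel));   // prints "false"
    }
}
```

If that reading is right, the 3-level case needs a group converter for the repeated wrapper, which would match the "Expected instance of group converter" message above.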