StreamingFileSink limitations
Hi community,

I'm working on porting our code from `BucketingSink<>` to `StreamingFileSink`. We use the sink to write Avro via Parquet, and the suggested implementation of the sink is something like:

```
val parquetWriterFactory = ParquetAvroWriters.forSpecificRecord(mySchemaClass)
StreamingFileSink.forBulkFormat(basePath, parquetWriterFactory).withBucketAssigner(dataLakeBucketAssigner)
```

In this design the BucketAssigner is chained after the bulk-format step. The problem I'm having is that my stream element contains both the information needed to construct the path and a sub-object with the data to serialize. A simple example:

```
myClass
|- country
|- cityClass (extends SpecificRecordBase)
```

Let's say I receive myClass as a stream and I want to serialize the cityClass data via the logic above. The problem is that `forBulkFormat(..)` needs to run on a subtype of `SpecificRecordBase`, so myClass doesn't work. If I extract cityClass from myClass, then I no longer have country available in `withBucketAssigner(..)` to store the data in the right folder...

Am I missing something, or do I have to write my own version of the `ParquetBulkWriter<T>` class to be able to handle `myClass`?

Thanks for any ideas and suggestions.
Enrico
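For concreteness, a bucket assigner keyed on country might look like the sketch below. MyClass and its country field are hypothetical stand-ins for the example above, not real types from the codebase; the catch is that this only compiles if the sink's element type is the wrapper itself, which is exactly what `forBulkFormat(..)` prevents here:

```
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

// Hypothetical assigner that buckets by the country field of the wrapper.
// It needs MyClass as the sink's element type to see country at all.
class CountryBucketAssigner extends BucketAssigner[MyClass, String] {

  override def getBucketId(element: MyClass, context: BucketAssigner.Context): String =
    s"country=${element.country}"

  // Flink ships a string serializer for bucket ids
  override def getSerializer(): SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}
```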
Hi Enrico,
Sorry for the late reply. I think your understanding is correct. The best way to do it is to write your own ParquetBulkWriter and the corresponding factory.

Out of curiosity, I guess that in the BucketingSink you were using the AvroKeyValueSinkWriter, right?

Cheers,
Kostas
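For readers hitting the same issue, a minimal sketch of the approach Kostas suggests could look like the following. MyClass and CityClass are the hypothetical types from the example, and this is a sketch rather than a confirmed implementation: the factory is typed on the wrapper class, so the BucketAssigner still sees country, while the actual Parquet writing is delegated to Flink's stock Avro writer for the nested record:

```
import org.apache.flink.api.common.serialization.BulkWriter
import org.apache.flink.core.fs.FSDataOutputStream
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters

// Hypothetical: MyClass wraps a country string and a cityClass record
// (a SpecificRecordBase subtype). Only the nested record goes to Parquet.
class MyClassParquetWriterFactory extends BulkWriter.Factory[MyClass] {

  override def create(out: FSDataOutputStream): BulkWriter[MyClass] = {
    // reuse Flink's existing factory for the inner record type
    val delegate = ParquetAvroWriters.forSpecificRecord(classOf[CityClass]).create(out)

    new BulkWriter[MyClass] {
      override def addElement(element: MyClass): Unit =
        delegate.addElement(element.cityClass) // serialize only the nested record

      override def flush(): Unit = delegate.flush()

      override def finish(): Unit = delegate.finish()
    }
  }
}
```

The sink would then be built as `StreamingFileSink.forBulkFormat(basePath, new MyClassParquetWriterFactory).withBucketAssigner(...)`, so the assigner receives the full MyClass element and can read country.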
Thanks for confirming.
We have a

```
public class ParquetSinkWriter implements Writer<myClass>
```

that handles the serialization of the data. We implemented it starting from:

https://medium.com/hadoop-noob/flink-parquet-writer-d127f745b519
https://stackoverflow.com/questions/48098011/how-to-use-apache-flink-write-parquet-file-on-hdfs-by-datetime-partition
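For context, a writer of that shape against the legacy `org.apache.flink.streaming.connectors.fs.Writer` interface (in the spirit of the linked posts) might look roughly like this sketch, again with the hypothetical MyClass/CityClass types standing in for the real ones:

```
import org.apache.avro.Schema
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Sketch of a BucketingSink writer typed on the wrapper class: the sink's
// bucketer sees MyClass (and thus country), while only the nested Avro
// record is written to Parquet.
class ParquetSinkWriter(schemaString: String) extends Writer[MyClass] {

  @transient private var writer: ParquetWriter[CityClass] = _
  private var written: Long = 0

  override def open(fs: FileSystem, path: Path): Unit = {
    val schema = new Schema.Parser().parse(schemaString)
    writer = AvroParquetWriter
      .builder[CityClass](path)
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }

  override def write(element: MyClass): Unit = {
    writer.write(element.cityClass) // only the nested record is serialized
    written += 1
  }

  // Parquet buffers internally, so a record count stands in for a byte offset
  override def flush(): Long = written

  override def getPos(): Long = written

  override def close(): Unit = if (writer != null) writer.close()

  override def duplicate(): Writer[MyClass] = new ParquetSinkWriter(schemaString)
}
```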