Adding a part suffix setter to the BucketingSink

lrao@lyft.com
Currently the BucketingSink allows setting the part prefix, the pending prefix/suffix and the in-progress prefix/suffix via setter methods. Can we also support setting part suffixes?
An instance where this may be useful: I am currently writing GZIP-compressed output to S3 using the BucketingSink, and I would like the uploaded files to have a ".gz" or ".zip" extension (if the files do not have such an extension, they are written as garbled bytes and don't get rendered correctly for reading). I don't see a way of doing this apart from setting a part file prefix with the required file extension.

Thanks
Lakshmi
Re: Adding a part suffix setter to the BucketingSink

lrao@lyft.com
Sorry, I meant "I don't see a way of doing this apart from setting a part file *suffix* with the required file extension. "



Re: Adding a part suffix setter to the BucketingSink

Aljoscha Krettek-2
So you want to be able to set a "global" suffix that should be appended to all different kinds of files that the sink writes, including intermediate files?

Aljoscha


Re: Adding a part suffix setter to the BucketingSink

lrao@lyft.com
I can see two ways of achieving this:

1. Setting a suffix *only* for the completed part files. I don't
necessarily think the suffix should be added for the intermediate files (as
intermediate files should not really be ready for consumption by a
downstream process?)
2. Being able to override this partPath name creation -
https://github.com/apache/flink/blob/release-1.4.0/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L523
That way any user who needs to set a custom/dynamic part file name can
still do so.

Do you think either of these options is feasible?
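
To make option 2 a bit more concrete, here is a rough sketch of what an overridable naming hook could look like. This is not Flink code: the classes PartNamingSink and GzipPartNamingSink and the method partName are hypothetical names used only for illustration, and the default "part-<subtask>-<counter>" pattern is an assumption about how the sink names its files.

```java
/**
 * Hypothetical sink skeleton with an overridable part-name hook.
 * Not part of Flink; it only illustrates the idea of option 2.
 */
class PartNamingSink {
    /** Assumed default naming: "part-<subtask>-<counter>". */
    protected String partName(int subtaskIndex, int partCounter) {
        return "part-" + subtaskIndex + "-" + partCounter;
    }
}

/** A user-defined subclass that appends an extension to every part file name. */
class GzipPartNamingSink extends PartNamingSink {
    @Override
    protected String partName(int subtaskIndex, int partCounter) {
        // Reuse the default name and add the extension that readers expect.
        return super.partName(subtaskIndex, partCounter) + ".gz";
    }

    public static void main(String[] args) {
        System.out.println(new GzipPartNamingSink().partName(2, 7));
    }
}
```

With a hook like this, a fixed suffix is just one special case; a subclass could equally derive the name from a timestamp or any other dynamic value.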

Thanks
Lakshmi


Re: Adding a part suffix setter to the BucketingSink

Kostas Kloudas
Hi Lakshmi,

Since Flink-1.5 you have the ability to set the part suffix.
As you said, you only want the .gzip to be the suffix of the final (or “completed”) part files, which is exactly what is currently supported.

If you also want intermediate files to have this suffix, you can always set all the suffixes (in-progress, pending and final) to “.gzip”,
but then you also have to set appropriate prefixes so that Flink can distinguish completed from non-completed files (filenames
must not collide).
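
To illustrate why the prefixes matter once every suffix is the same, here is a small stand-alone sketch. It is a simplified model, not Flink's implementation: the class PartFileNames is hypothetical, and it assumes that a completed file is named "<partPrefix>-<subtask>-<counter><partSuffix>" and that the in-progress and pending names wrap that name in their own prefix and suffix. In the real sink these values would be set through setters such as setPartSuffix, setInProgressPrefix and setPendingPrefix.

```java
import java.util.Arrays;
import java.util.HashSet;

/** Simplified, hypothetical model of the sink's file naming (not Flink code). */
final class PartFileNames {
    private final String partPrefix, partSuffix;
    private final String inProgressPrefix, inProgressSuffix;
    private final String pendingPrefix, pendingSuffix;

    PartFileNames(String partPrefix, String partSuffix,
                  String inProgressPrefix, String inProgressSuffix,
                  String pendingPrefix, String pendingSuffix) {
        this.partPrefix = partPrefix;
        this.partSuffix = partSuffix;
        this.inProgressPrefix = inProgressPrefix;
        this.inProgressSuffix = inProgressSuffix;
        this.pendingPrefix = pendingPrefix;
        this.pendingSuffix = pendingSuffix;
    }

    /** Name of a completed part file (assumed pattern). */
    String finalName(int subtask, int counter) {
        return partPrefix + "-" + subtask + "-" + counter + partSuffix;
    }

    /** Name while the file is still being written (assumed pattern). */
    String inProgressName(int subtask, int counter) {
        return inProgressPrefix + finalName(subtask, counter) + inProgressSuffix;
    }

    /** Name after the file is closed but before it is finalized (assumed pattern). */
    String pendingName(int subtask, int counter) {
        return pendingPrefix + finalName(subtask, counter) + pendingSuffix;
    }

    /** True if the three states of one part file map to three different names. */
    boolean statesAreDistinguishable() {
        return new HashSet<>(Arrays.asList(
                finalName(0, 0), inProgressName(0, 0), pendingName(0, 0))).size() == 3;
    }

    public static void main(String[] args) {
        // Same suffix everywhere and the same prefix for both intermediate states.
        PartFileNames clash =
                new PartFileNames("part", ".gzip", "_", ".gzip", "_", ".gzip");
        // Same suffix everywhere, but a distinct prefix per intermediate state.
        PartFileNames distinct =
                new PartFileNames("part", ".gzip", "_in-progress-", ".gzip", "_pending-", ".gzip");
        System.out.println(clash.statesAreDistinguishable());
        System.out.println(distinct.statesAreDistinguishable());
    }
}
```

In this model, the first configuration gives the in-progress and pending files the same name, while the second keeps all three states apart purely through the prefixes.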

I would also recommend using the most recent stable version, 1.5.3, which also includes this bug fix:
https://issues.apache.org/jira/browse/FLINK-9603

I hope this helps,
Kostas



Re: Adding a part suffix setter to the BucketingSink

vino yang
Hi Kostas, good job!


Re: Adding a part suffix setter to the BucketingSink

lrao@lyft.com
Hi Kostas,

Thank you for replying. I am already using the ability to set the part suffix.
I was not aware of this issue - https://issues.apache.org/jira/browse/FLINK-9603.
Thanks for pointing it out, I'll make sure to use the
1.5.3 version of the sink.

Thanks
Lakshmi



Re: Adding a part suffix setter to the BucketingSink

Kostas Kloudas
Hi Lakshmi,

I meant Flink 1.5.1 (not 1.5.3) which was recently released.

Cheers,
Kostas
