[Proposal] RichFsSinkFunction

[Proposal] RichFsSinkFunction

Seth Wiesman

Hi all,

 

I have been working with Flink at work for a while now, and in that time I have developed several extensions that I would like to contribute back. I wanted to reach out with what has been the most significant modification for us and see if it is something the community would be interested in.

 

 

RichFsSinkFunction

 

Currently my pipeline uses the BucketingSink to write files to S3, which are then consumed by other processes. Outputting data to a file system is fundamentally different from outputting to something such as Kafka, because data can only be consumed once at the file level. The BucketingSink moves files through three states: in-progress, pending, and complete. If you want to maintain exactly-once guarantees, you only want your external services to consume a file once it reaches the complete state. One option is to write a _SUCCESS file to a bucket once all files in that bucket are done, but that can be difficult to coordinate or may take a prohibitively long time; in the case of the BasePathBucketer it will never happen. For my use case it is important for external services to be able to consume files as soon as they become available. To solve this, we modified the bucketing sink so that it is not the end of the pipeline but instead forwards the final path of each file, once it reaches its final state, to a final operator.
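
For reference, a rough sketch of the vanilla BucketingSink setup this describes (the bucket path, bucketer format, and suffixes are placeholder choices):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class BucketingSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Pending files only move to the complete state when the
        // checkpoint that covers them finishes.
        env.enableCheckpointing(60 * 1000);

        DataStream<String> records = env.fromElements("a", "b", "c");

        // Files pass through three states: in-progress while open, pending
        // once closed, complete after the next successful checkpoint. Only
        // complete files are safe to consume with exactly-once semantics.
        BucketingSink<String> sink = new BucketingSink<>("s3://my-bucket/output");
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));
        sink.setInProgressSuffix(".in-progress");
        sink.setPendingSuffix(".pending");

        records.addSink(sink);
        env.execute("bucketing-sink-example");
    }
}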

 

I do not want to fundamentally change the concept of what a sink is; it should remain the end of the pipeline. Instead, this simply allows a custom ‘onClose’ step. To do this, paths can only be forwarded to an operator of parallelism one whose only permitted operation is to add a sink. From there, other services can be notified that completed files exist.
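
To sketch the shape of the consuming side, assume completedPaths is the stream of final file paths the modified sink would emit; that stream is exactly what this proposal adds and does not exist in Flink today. Everything downstream stays deliberately narrow:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class CompletedFileNotifier {
    // 'completedPaths' would be produced by the proposed extension.
    static void notifyOnCompletion(DataStream<String> completedPaths) {
        completedPaths
            .addSink(new SinkFunction<String>() {
                @Override
                public void invoke(String path) throws Exception {
                    // Tell external services that 'path' is complete and
                    // safe to consume, e.g. publish it to a queue.
                    System.out.println("completed: " + path);
                }
            })
            .setParallelism(1);
    }
}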

 

To provide a motivating example: after writing files to S3, I need to load them into a Redshift cluster. To do this I batch completed files for 1 minute of processing time, write out a manifest file (a list of completed files to load), and run a COPY command: http://docs.aws.amazon.com/redshift/latest/dg/loading-data-files-using-manifest.html
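
Roughly, that batching step looks like the sketch below, again assuming the hypothetical completedPaths stream from above and the default processing-time characteristic; the manifest format follows the AWS docs linked here, and the COPY ... MANIFEST itself would be issued from one more sink:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class RedshiftManifests {
    // Batches completed file paths into one Redshift manifest per minute
    // of processing time. timeWindowAll already runs at parallelism one.
    static DataStream<String> buildManifests(DataStream<String> completedPaths) {
        return completedPaths
            .timeWindowAll(Time.minutes(1))
            .apply(new AllWindowFunction<String, String, TimeWindow>() {
                @Override
                public void apply(TimeWindow window, Iterable<String> paths,
                                  Collector<String> out) {
                    // Manifest format: {"entries":[{"url":...,"mandatory":true},...]}
                    StringBuilder manifest = new StringBuilder("{\"entries\":[");
                    String sep = "";
                    for (String path : paths) {
                        manifest.append(sep)
                                .append("{\"url\":\"").append(path)
                                .append("\",\"mandatory\":true}");
                        sep = ",";
                    }
                    out.collect(manifest.append("]}").toString());
                }
            });
    }
}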

 

Below is an example gist of what this looks like in use.

 

Example: https://gist.github.com/sjwiesman/fc99c64f44a93cfc9c7aa62c070a9358

 

Seth Wiesman | Data Engineer

4 World Trade Center, 45th Floor, New York, NY 10007

 


Re: [Proposal] RichFsSinkFunction

Fabian Hueske-2
Hi Seth,

first of all, thanks for sharing your use case and your plan to contribute
to Flink.

I guess processing files as soon as they are completed is a very common use
case.
Publishing the paths of completed files as a data stream sounds like a nice
idea.
IMO, it would be great to have this feature for the BucketingSink.

Would you mind opening a JIRA for this feature?

Thank you,
Fabian


