Hi all,
Just ran into a bit of a problem and I am not sure what the behavior should be and whether this should be considered a bug, or if there is some other way this should be handled.

I have a streaming job with a stream that eventually closes; this job sinks to a StreamingFileSink. The problem I am experiencing is that any data written to the sink between the last checkpoint and the close of the stream is lost.

This happens (AFAICT) because the StreamingFileSink relies on checkpoints to commit files, and closing the stream currently does not try to commit anything.

It seems like just making close call `buckets.commitUpToCheckpoint(Long.MAX_VALUE)` would work pretty well assuming it is an actual stream close, but it could be problematic in the event of a savepoint/cancel and a later resume (it may mean some files would be prematurely committed). Ideally, we would be able to differentiate between the two different types of close (an actual stream finishing vs. a cancel), but at the moment that doesn't seem to be supported.

If this is considered a bug, please let me know and I will file a Jira; if not, what is the "correct" way to handle getting all the data out with any sinks that rely on a checkpoint to commit data?

Thanks
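For concreteness, here is a rough sketch of what that change inside StreamingFileSink's own `close()` might look like. This is illustrative only: `buckets` is a private field of the sink, so user code cannot do this today, and the surrounding method body is paraphrased rather than quoted from the Flink source.

```java
// Sketch of a possible StreamingFileSink.close() that commits pending
// files on shutdown. Illustrative only -- `buckets` is private to the
// sink, and this ignores the savepoint/cancel problem described above.
@Override
public void close() throws Exception {
    if (buckets != null) {
        // Treat the close as a final "checkpoint" with an id larger
        // than any real checkpoint id, so every pending file is
        // promoted to finished.
        buckets.commitUpToCheckpoint(Long.MAX_VALUE);
        buckets.close();
    }
}
```

As noted above, this would do the wrong thing on a cancel-with-savepoint: files would be committed even though the job might later resume from that savepoint.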
Hi Addison,
thanks for reporting this issue. I've pulled in Kostas who worked on the StreamingFileSink and knows the current behaviour as well as its limitations best.

Cheers,
Till
Hi Addison,
unfortunately, there is a long-standing problem that user functions cannot differentiate between successful and erroneous shutdown [1]. I had this high on my private list of things that I finally want to see fixed in Flink 1.8. And your message further confirms this.

Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-2646
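To make that limitation concrete, here is a purely hypothetical sketch of the kind of distinction a function would need; none of these names exist in Flink's API, and FLINK-2646 is where the actual design discussion lives:

```java
// Hypothetical interface -- NOT part of Flink. It only illustrates how
// a function could react differently to the two kinds of shutdown.
public interface ShutdownAwareFunction {

    // Called when the job finished normally (all sources reached
    // end-of-stream). A sink like StreamingFileSink could safely
    // commit all pending files here.
    void closeOnSuccess() throws Exception;

    // Called on failure or cancellation. Pending files should stay
    // uncommitted so that a later restore from a checkpoint or
    // savepoint does not observe prematurely committed data.
    void closeOnFailure() throws Exception;
}
```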