Alexander Trushev created FLINK-20208:
-----------------------------------------
Summary: Remove outdated in-progress files in StreamingFileSink
Key: FLINK-20208
URL: https://issues.apache.org/jira/browse/FLINK-20208
Project: Flink
Issue Type: Improvement
Components: Connectors / FileSystem
Affects Versions: 1.11.2
Reporter: Alexander Trushev
Assume a job uses StreamingFileSink with OnCheckpointRollingPolicy. Consider the following sequence:
# A checkpoint is acknowledged.
# An event is written to a new part file .part-X-Y.UUID1.
# The job fails.
# The job recovers from that checkpoint.
# An event is written to a new part file .part-X-Y.UUID2.
After this, the outdated part file .part-X-Y.UUID1 is left behind, where X is the subtask index and Y is the part counter.
*Proposal*
Add the method
{code:java}
boolean shouldRemoveOutdatedParts()
{code}
to the RollingPolicy interface.
Add a configurable parameter to OnCheckpointRollingPolicy and DefaultRollingPolicy whose value is returned by shouldRemoveOutdatedParts() (false by default).
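A minimal sketch of the proposed API change. It uses simplified, hypothetical stand-ins rather than Flink's real generic RollingPolicy<IN, BucketID>: only the new method is shown, SimpleOnCheckpointRollingPolicy replaces OnCheckpointRollingPolicy, and withRemoveOutdatedParts is a hypothetical name for the configurable parameter.

{code:java}
// Hypothetical, simplified stand-in for Flink's RollingPolicy<IN, BucketID>;
// the existing shouldRollOn* methods are omitted.
interface RollingPolicy {
    // Proposed: whether outdated in-progress part files should be removed
    // when the job is restored from a checkpoint. The default of false keeps
    // the behavior of existing policy implementations unchanged.
    default boolean shouldRemoveOutdatedParts() {
        return false;
    }
}

// Hypothetical counterpart of OnCheckpointRollingPolicy carrying the
// proposed configurable flag.
final class SimpleOnCheckpointRollingPolicy implements RollingPolicy {
    private final boolean removeOutdatedParts;

    private SimpleOnCheckpointRollingPolicy(boolean removeOutdatedParts) {
        this.removeOutdatedParts = removeOutdatedParts;
    }

    // Default configuration: outdated parts are kept.
    static SimpleOnCheckpointRollingPolicy build() {
        return new SimpleOnCheckpointRollingPolicy(false);
    }

    // Hypothetical option: returns a new immutable policy with the flag set.
    SimpleOnCheckpointRollingPolicy withRemoveOutdatedParts(boolean remove) {
        return new SimpleOnCheckpointRollingPolicy(remove);
    }

    @Override
    public boolean shouldRemoveOutdatedParts() {
        return removeOutdatedParts;
    }
}
{code}

Giving the new method a default implementation keeps the change source-compatible for user-defined RollingPolicy implementations, which simply keep returning false.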
Such outdated part files can be removed with the following algorithm while restoring the job from a checkpoint:
# After the bucket state is initialized, check shouldRemoveOutdatedParts(). If it returns true, continue with step 2.
# For each inactive bucket, scan the bucket directory.
# Remove a part file if all three of the following conditions hold:
#* the part filename contains "inprogress";
#* the subtask index in the filename equals the current subtask index;
#* the part counter in the filename is greater than or equal to the current max part counter.
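Steps 2 and 3 can be sketched as a small helper. This is a hypothetical illustration, not Flink code: it uses java.nio.file on a local directory instead of Flink's own FileSystem abstraction, and it assumes in-progress file names of the form .part-<X>-<Y>.inprogress.<UUID> with the default "part" prefix (a custom part prefix would need a different pattern). The caller is assumed to have already checked shouldRemoveOutdatedParts() (step 1) and to invoke the helper once per inactive bucket directory.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper illustrating the proposed cleanup; not part of Flink.
final class OutdatedPartCleaner {
    // Assumed name layout: optional leading dot, "part-<subtask>-<counter>", then any suffix.
    private static final Pattern PART_NAME = Pattern.compile("^\\.?part-(\\d+)-(\\d+)(\\..*)?$");

    static boolean isOutdated(String fileName, int subtaskIndex, long maxPartCounter) {
        if (!fileName.contains("inprogress")) {        // condition 1: in-progress file
            return false;
        }
        Matcher m = PART_NAME.matcher(fileName);
        if (!m.matches()) {
            return false;
        }
        long fileSubtask = Long.parseLong(m.group(1));
        long fileCounter = Long.parseLong(m.group(2));
        return fileSubtask == subtaskIndex             // condition 2: same subtask
                && fileCounter >= maxPartCounter;      // condition 3: counter not yet covered
    }

    // Scans one inactive bucket directory (non-recursively) and deletes outdated parts.
    static List<Path> removeOutdated(Path bucketDir, int subtaskIndex, long maxPartCounter)
            throws IOException {
        List<Path> outdated;
        try (Stream<Path> files = Files.list(bucketDir)) {
            outdated = files
                    .filter(Files::isRegularFile)
                    .filter(p -> isOutdated(p.getFileName().toString(), subtaskIndex, maxPartCounter))
                    .collect(Collectors.toList());
        }
        // Delete only after the listing is closed, so the directory is not modified mid-scan.
        for (Path p : outdated) {
            Files.delete(p);
        }
        return outdated;
    }
}
{code}

Collecting the candidates before deleting them avoids modifying the directory while it is still being listed; finished part files never match because they lack "inprogress" in their names.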
I propose removing outdated files because a similar proposal to overwrite outdated files has not been implemented:
[FLINK-11116|https://issues.apache.org/jira/browse/FLINK-11116]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)