http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/DISCUSS-Limit-size-of-already-processed-files-in-File-Source-SplitEnumerator-tp51184p51218.html
thanks for starting this discussion. I am pulling in Arvid who works on
modification timestamps and the watermark. What is the benefit of keeping
> Hi!
>
> Currently, the Flink File Source relies on a Set<Path> pathsAlreadyProcessed
> in the SplitEnumerator to track which files have already been processed; a
> file whose path is in this set is not processed again. However, this set
> can grow without bound and ultimately exceed available memory if new files
> are continuously added to the input path.
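To make the growth concrete, here is a minimal sketch of set-based de-duplication as described above. It assumes a simplified model: `UnboundedPathTracker` and `filterNew` are hypothetical names, and paths are plain Strings rather than `org.apache.flink.core.fs.Path`. It is not the actual enumerator code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simplification of set-based de-duplication. Paths are plain
// Strings instead of org.apache.flink.core.fs.Path to keep the sketch self-contained.
class UnboundedPathTracker {
    // Entries are added on every discovery round and never removed.
    private final Set<String> pathsAlreadyProcessed = new HashSet<>();

    // Returns the paths not seen in any earlier call and remembers them permanently.
    List<String> filterNew(Collection<String> discovered) {
        List<String> fresh = new ArrayList<>();
        for (String path : discovered) {
            // Set.add returns false if the path was already present.
            if (pathsAlreadyProcessed.add(path)) {
                fresh.add(path);
            }
        }
        return fresh;
    }

    int size() {
        return pathsAlreadyProcessed.size();
    }

    public static void main(String[] args) {
        UnboundedPathTracker tracker = new UnboundedPathTracker();
        // Each listing of the input directory contains one more file than the last.
        for (int round = 1; round <= 5; round++) {
            List<String> listing = new ArrayList<>();
            for (int i = 0; i < round; i++) {
                listing.add("/input/file-" + i);
            }
            System.out.println("round " + round + ": new=" + tracker.filterNew(listing)
                    + ", tracked=" + tracker.size());
        }
    }
}
```

The tracked count equals the number of distinct files ever seen, no matter how old they are. That is the unbounded growth the proposal targets.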
>
> I submitted https://issues.apache.org/jira/browse/FLINK-22792 and would
> like to be assigned to the ticket.
>
> The proposed change is outlined below; I would like to get agreement on
> the approach.
>
> 1. Maintain a fileWatermark in ContinuousFileSplitEnumerator, updated with
> the modification times of newly discovered files.
> 2. Change Set<Path> pathsAlreadyProcessed to a HashMap<Path, Long>
> pathsAlreadyProcessed, where the key is, as before, the path of an already
> processed file and the value is its modification time. Expose a
> getModificationTime() method on FileSourceSplit.
> 3. Add a user-configurable fileExpireTime option: any file older than
> fileWatermark - fileExpireTime would be ignored.
> 4. When snapshotting the SplitEnumerator, remove files older than
> fileWatermark - fileExpireTime from the pathsAlreadyProcessed map.
> 5. Add the alreadyProcessedPaths map and fileWatermark to
> PendingSplitsCheckpoint, and extend the current
> PendingSplitsCheckpointSerializer with a version 2 format that serializes
> the alreadyProcessedPaths map, including file modification times.
> 6. Subclasses of PendingSplitsCheckpoint, such as
> ContinuousHivePendingSplitsCheckpoint, would not be affected: they would
> initialize an empty alreadyProcessedMap and use 0 as the initial watermark.
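Steps 1 to 4 can be sketched roughly as follows. This is a minimal model under stated assumptions, not Flink's ContinuousFileSplitEnumerator. `ExpiringPathTracker`, `DiscoveredFile`, `filterNew` and `snapshotState` are hypothetical names, paths are Strings, and times are epoch milliseconds. Two further choices are the sketch's own: the expiry threshold is held fixed for each discovery batch, and the watermark is advanced only from newly admitted files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of proposal steps 1-4; not Flink's ContinuousFileSplitEnumerator.
// Paths are Strings and times are epoch milliseconds.
class ExpiringPathTracker {
    // Hypothetical stand-in for a discovered file (path plus modification time).
    static final class DiscoveredFile {
        final String path;
        final long modificationTime;

        DiscoveredFile(String path, long modificationTime) {
            this.path = path;
            this.modificationTime = modificationTime;
        }

        @Override
        public String toString() {
            return path + "@" + modificationTime;
        }
    }

    private final long fileExpireTime;                                       // step 3
    private final Map<String, Long> pathsAlreadyProcessed = new HashMap<>(); // step 2
    private long fileWatermark = 0L;                 // step 1; starts at 0 as in step 6

    ExpiringPathTracker(long fileExpireTime) {
        this.fileExpireTime = fileExpireTime;
    }

    private long expiryThreshold() {
        return fileWatermark - fileExpireTime;
    }

    // Returns files that are neither expired nor already processed.
    List<DiscoveredFile> filterNew(Collection<DiscoveredFile> discovered) {
        long threshold = expiryThreshold(); // held fixed for the whole batch
        List<DiscoveredFile> fresh = new ArrayList<>();
        for (DiscoveredFile f : discovered) {
            if (f.modificationTime < threshold) {
                continue; // step 3: older than fileWatermark - fileExpireTime, ignored
            }
            // putIfAbsent returns null only if the path was not tracked yet.
            if (pathsAlreadyProcessed.putIfAbsent(f.path, f.modificationTime) == null) {
                fresh.add(f);
            }
        }
        // Step 1: advance the watermark; it never moves backwards.
        for (DiscoveredFile f : fresh) {
            fileWatermark = Math.max(fileWatermark, f.modificationTime);
        }
        return fresh;
    }

    // Step 4: prune entries below the threshold, then copy the map for the checkpoint.
    Map<String, Long> snapshotState() {
        long threshold = expiryThreshold();
        pathsAlreadyProcessed.values().removeIf(mtime -> mtime < threshold);
        return new HashMap<>(pathsAlreadyProcessed);
    }

    long fileWatermark() {
        return fileWatermark;
    }

    public static void main(String[] args) {
        ExpiringPathTracker tracker = new ExpiringPathTracker(100L);
        System.out.println(tracker.filterNew(Arrays.asList(
                new DiscoveredFile("/in/a", 1000L), new DiscoveredFile("/in/b", 1050L))));
        System.out.println(tracker.filterNew(Arrays.asList(
                new DiscoveredFile("/in/a", 1000L), new DiscoveredFile("/in/c", 900L),
                new DiscoveredFile("/in/d", 1200L))));
        System.out.println(tracker.snapshotState());
    }
}
```

Because the watermark only increases, a pruned entry's modification time stays below the threshold, so a pruned file that is listed again is still ignored rather than reprocessed. The flip side is that a genuinely new file whose modification time is already older than fileWatermark - fileExpireTime is skipped by design.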
>
> Thanks!
>
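For step 5, the two new checkpoint fields could be encoded roughly as below. This is a hypothetical layout only; it does not reproduce the existing PendingSplitsCheckpointSerializer format. Flink's checkpoint serializers implement `SimpleVersionedSerializer`, which stores a version number alongside the bytes, and the sketch mirrors that by passing the version into `deserialize`. The names `ProcessedPathsV2Codec` and `State` are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical encoding of the two fields that step 5 adds to the checkpoint.
// It does not reproduce PendingSplitsCheckpointSerializer's existing layout.
final class ProcessedPathsV2Codec {
    static final int VERSION_2 = 2;

    // Hypothetical holder for the new checkpoint fields.
    static final class State {
        final long fileWatermark;
        final Map<String, Long> alreadyProcessedPaths;

        State(long fileWatermark, Map<String, Long> alreadyProcessedPaths) {
            this.fileWatermark = fileWatermark;
            this.alreadyProcessedPaths = alreadyProcessedPaths;
        }
    }

    // Layout: watermark, entry count, then (path, modification time) pairs.
    static byte[] serialize(State state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(state.fileWatermark);
            out.writeInt(state.alreadyProcessedPaths.size());
            for (Map.Entry<String, Long> e : state.alreadyProcessedPaths.entrySet()) {
                out.writeUTF(e.getKey()); // modified UTF-8, at most 65535 encoded bytes
                out.writeLong(e.getValue());
            }
        }
        return bytes.toByteArray();
    }

    // Only version 2 is handled; reading version-1 checkpoints (paths without
    // modification times) would need a separate migration path.
    static State deserialize(int version, byte[] data) throws IOException {
        if (version != VERSION_2) {
            throw new IOException("Unsupported version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long fileWatermark = in.readLong();
            int count = in.readInt();
            Map<String, Long> paths = new HashMap<>();
            for (int i = 0; i < count; i++) {
                String path = in.readUTF();
                paths.put(path, in.readLong());
            }
            return new State(fileWatermark, paths);
        }
    }

    public static void main(String[] args) throws IOException {
        Map<String, Long> paths = new HashMap<>();
        paths.put("/in/d", 1200L);
        byte[] bytes = serialize(new State(1200L, paths));
        State restored = deserialize(VERSION_2, bytes);
        System.out.println(restored.fileWatermark + " " + restored.alreadyProcessedPaths);
    }
}
```

Writing the entry count before the pairs lets the reader size its loop without a terminator. The per-path cost is the encoded path plus 8 bytes for the modification time. This is why the snapshot-time pruning in step 4 matters for checkpoint size as well as heap size.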