Re: [DISCUSS] Limit size of already processed files in File Source SplitEnumerator

Posted by Till Rohrmann on
URL: http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/DISCUSS-Limit-size-of-already-processed-files-in-File-Source-SplitEnumerator-tp51184p51218.html

Hi Tianxin,

thanks for starting this discussion. I am pulling in Arvid who works on
Flink's connectors.

I think the problem you are describing can happen.

From what I understand, you are proposing to keep track of a watermark over
the processed file input splits and then to filter out splits based on
their modification timestamps and that watermark. What is the benefit of
keeping the modification timestamp of every split in the map? Could it also
work if we sorted the input splits by modification timestamp and then
remembered the last processed split? That way we would only need to
remember a single value, and upon recovery we would only process those
splits that have a newer modification timestamp.
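A minimal Java sketch of the single-value alternative, under stated assumptions: `DiscoveredFile` and `LastProcessedTimestampFilter` are hypothetical names invented for illustration (not Flink classes), and a split is reduced to a path plus its modification time. The enumerator sorts newly discovered files by modification time, hands out only those newer than the remembered timestamp, and checkpoints just that one `long`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a discovered file split: only the path and
// the file's modification time matter for this sketch.
final class DiscoveredFile {
    final String path;
    final long modificationTime;

    DiscoveredFile(String path, long modificationTime) {
        this.path = path;
        this.modificationTime = modificationTime;
    }
}

// Hypothetical enumerator state that remembers a single value: the
// modification time of the newest file handed out so far.
final class LastProcessedTimestampFilter {
    private long lastProcessedModificationTime = Long.MIN_VALUE;

    // Returns the files to process, oldest first, and advances the
    // remembered timestamp to the newest file returned.
    List<DiscoveredFile> selectNewFiles(List<DiscoveredFile> discovered) {
        List<DiscoveredFile> sorted = new ArrayList<>(discovered);
        sorted.sort(Comparator.comparingLong(f -> f.modificationTime));
        List<DiscoveredFile> result = new ArrayList<>();
        for (DiscoveredFile f : sorted) {
            if (f.modificationTime > lastProcessedModificationTime) {
                result.add(f);
            }
        }
        if (!result.isEmpty()) {
            lastProcessedModificationTime =
                    result.get(result.size() - 1).modificationTime;
        }
        return result;
    }

    // The only value that would need to be stored in a checkpoint.
    long snapshotState() {
        return lastProcessedModificationTime;
    }

    // Restores the remembered timestamp after recovery.
    void restoreState(long lastProcessed) {
        this.lastProcessedModificationTime = lastProcessed;
    }
}
```

One caveat of this sketch: because it compares with a strict `>`, a file that first appears in a later scan with a modification time equal to or older than the remembered value is skipped, so it assumes files become visible roughly in modification-time order.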

Cheers,
Till

On Tue, Jun 8, 2021 at 12:11 AM Tianxin Zhao <[hidden email]> wrote:

> Hi!
>
> Currently, the Flink File Source relies on a Set<Path> pathsAlreadyProcessed
> in the SplitEnumerator to decide which files have already been processed,
> and skips any file that is already in this set. However, this set can grow
> without bound and ultimately exceed available memory if new files are
> continuously added to the input path.
>
> I submitted https://issues.apache.org/jira/browse/FLINK-22792 and would
> like to be assigned to the ticket.
>
> The proposed change is as follows; I would like to reach agreement on the
> approach before starting.
>
>    1. Maintain a fileWatermark in ContinuousFileSplitEnumerator that is
>       updated with the modification times of newly discovered files.
>    2. Change Set<Path> pathsAlreadyProcessed to a HashMap<Path, Long>
>       pathsAlreadyProcessed, where the key is, as before, the path of an
>       already processed file and the value is its modification time,
>       and expose a getModificationTime() method on FileSourceSplit.
>    3. Add a user-configurable fileExpireTime option; any file older than
>       fileWatermark - fileExpireTime is ignored.
>    4. When snapshotting the SplitEnumerator, remove files older than
>       fileWatermark - fileExpireTime from the pathsAlreadyProcessed map.
>    5. Add the alreadyProcessedPaths map and fileWatermark to
>       PendingSplitsCheckpoint, and add a version 2 serializer to
>       PendingSplitsCheckpointSerializer that serializes the
>       alreadyProcessedPaths map, including file modification times.
>    6. Subclasses of PendingSplitsCheckpoint, such as
>       ContinuousHivePendingSplitsCheckpoint, would not be impacted: they
>       would initialize an empty alreadyProcessedMap and use 0 as the
>       initial watermark.
>
> Thanks!
>
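The enumerator-side part of the quoted proposal (the watermark, the path-to-modification-time map, the fileExpireTime filter, and pruning on snapshot) can be sketched as a small Java class. This is a hedged, simplified model, not the actual ContinuousFileSplitEnumerator code: `ExpiringProcessedPaths`, `shouldProcess`, `pruneOnSnapshot` and the accessors are hypothetical names, paths are plain strings rather than Flink's `Path`, fileExpireTime is assumed to be in milliseconds, and checkpoint serialization is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the proposed enumerator state.
final class ExpiringProcessedPaths {
    // Assumed to use the same unit as modification times (milliseconds).
    private final long fileExpireTime;
    // Highest modification time seen so far; starts at 0 as proposed.
    private long fileWatermark = 0L;
    // Path -> modification time of every file already processed.
    private final Map<String, Long> pathsAlreadyProcessed = new HashMap<>();

    ExpiringProcessedPaths(long fileExpireTime) {
        this.fileExpireTime = fileExpireTime;
    }

    private long expiryThreshold() {
        return fileWatermark - fileExpireTime;
    }

    // Returns true if the file should be turned into a split now.
    boolean shouldProcess(String path, long modificationTime) {
        // Files older than fileWatermark - fileExpireTime are ignored.
        if (modificationTime < expiryThreshold()) {
            return false;
        }
        // Files inside the window are deduplicated via the map.
        if (pathsAlreadyProcessed.containsKey(path)) {
            return false;
        }
        pathsAlreadyProcessed.put(path, modificationTime);
        // The watermark only ever moves forward.
        fileWatermark = Math.max(fileWatermark, modificationTime);
        return true;
    }

    // On snapshot, drop entries that the expiry filter would reject anyway,
    // so the map only holds files inside the expiry window.
    void pruneOnSnapshot() {
        long threshold = expiryThreshold();
        pathsAlreadyProcessed.values().removeIf(t -> t < threshold);
    }

    long fileWatermark() {
        return fileWatermark;
    }

    int trackedPathCount() {
        return pathsAlreadyProcessed.size();
    }
}
```

In this sketch pruning is safe because an entry is removed only once its modification time has fallen below the threshold at which the file would be ignored anyway, and the watermark never decreases. The trade-off is that a file arriving late with a modification time older than the window is silently skipped, which is exactly what fileExpireTime is meant to bound.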