Hi!
Currently the Flink File Source relies on a Set<Path> pathsAlreadyProcessed in the SplitEnumerator to decide which files have been processed, and it avoids reprocessing a file if it is already in this set. However, this set can grow without bound and ultimately exceed available memory if new files are continuously added to the input path.

I submitted https://issues.apache.org/jira/browse/FLINK-22792 and would like to be assigned to the ticket.

The currently proposed change is as follows; I would like to get agreement on the approach:

1. Maintain a fileWatermark in ContinuousFileSplitEnumerator, updated from the modification times of newly discovered files.
2. Change Set<Path> pathsAlreadyProcessed to a HashMap<Path, Long> pathsAlreadyProcessed, where the key is, as before, the file path of an already processed file and the value is the file's modification time; expose a getModificationTime() method on FileSourceSplit.
3. Add a user-configurable fileExpireTime setting; any file older than fileWatermark - fileExpireTime is ignored.
4. When snapshotting the split enumerator, remove files older than fileWatermark - fileExpireTime from the pathsAlreadyProcessed map.
5. Add the alreadyProcessedPaths map and fileWatermark to PendingSplitsCheckpoint, and extend the current PendingSplitsCheckpointSerializer with a version-2 serializer that serializes the alreadyProcessedPaths map including the file modification times.
6. Subclasses of PendingSplitsCheckpoint such as ContinuousHivePendingSplitsCheckpoint would not be impacted: they would initialize an empty alreadyProcessedPaths map and 0 as the initial watermark.

Thanks!
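To make the proposal concrete, here is a minimal, self-contained sketch of the bookkeeping described in steps 1-4. It is not the actual ContinuousFileSplitEnumerator code; the class name ProcessedPathTracker and its methods are hypothetical, and paths are plain strings for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch only: watermark-based dedup map with pruning
 * at snapshot time. Names are hypothetical, not Flink API.
 */
public class ProcessedPathTracker {
    private final Map<String, Long> pathsAlreadyProcessed = new HashMap<>();
    private final long fileExpireTimeMs;
    private long fileWatermark = 0L;

    public ProcessedPathTracker(long fileExpireTimeMs) {
        this.fileExpireTimeMs = fileExpireTimeMs;
    }

    /** Returns true if the file is new and should be processed, recording it. */
    public boolean offer(String path, long modificationTime) {
        // Advance the watermark with the newest modification time seen.
        fileWatermark = Math.max(fileWatermark, modificationTime);
        // Files older than the expiration horizon are ignored outright.
        if (modificationTime < fileWatermark - fileExpireTimeMs) {
            return false;
        }
        // Deduplicate against already-processed paths.
        return pathsAlreadyProcessed.putIfAbsent(path, modificationTime) == null;
    }

    /** Called at checkpoint time: drop expired entries so the map stays bounded. */
    public void pruneOnSnapshot() {
        long horizon = fileWatermark - fileExpireTimeMs;
        pathsAlreadyProcessed.values().removeIf(t -> t < horizon);
    }

    public int trackedPathCount() {
        return pathsAlreadyProcessed.size();
    }
}
```

The key property is that the map's size is bounded by the number of files whose modification time falls within the fileExpireTime window, rather than by the total number of files ever seen.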
Hi Tianxin,
thanks for starting this discussion. I am pulling in Arvid, who works on Flink's connectors.

I think the problem you are describing can happen.

From what I understand, you are proposing to keep track of the watermark of processed file input splits and then filter out splits based on their modification timestamps and the watermark. What is the benefit of keeping the modification timestamp of every split in the map? Could it also work if we sort the input splits according to their modification timestamps and then remember the last processed split? That way we only remember a single value and, upon recovery, we only process those splits which have a newer modification timestamp.

Cheers,
Till
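The single-value alternative suggested above could be sketched as follows, assuming a split is represented only by its modification time; the class name is hypothetical:

```java
import java.util.Arrays;

/**
 * Sketch of the single-value alternative: sort splits by modification
 * time and remember only the timestamp of the last processed one.
 * Illustrative only, not Flink API.
 */
public class LastTimestampFilter {
    private long lastProcessedTimestamp = Long.MIN_VALUE;

    /** Modification times of the splits that still need processing, sorted. */
    public long[] selectNewSplits(long[] modificationTimes) {
        long[] sorted = modificationTimes.clone();
        Arrays.sort(sorted);
        long[] fresh = Arrays.stream(sorted)
                .filter(t -> t > lastProcessedTimestamp)
                .toArray();
        if (fresh.length > 0) {
            // Remember only the newest processed timestamp.
            lastProcessedTimestamp = fresh[fresh.length - 1];
        }
        return fresh;
    }
}
```

Note that a strict "newer than" filter silently drops files that share the last processed timestamp in a later enumeration; this tie case is what the file-list refinement discussed later in the thread addresses.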
It would simplify things a lot if the modification timestamps of newly scanned files were monotonically increasing.

Then we would only need to record the file list corresponding to the largest timestamp. For the timestamp of each scanned file:
1. If it is smaller than the maximum timestamp, the file has already been processed.
2. If the timestamps are equal, check whether the file is in the recorded list: if it is, it does not need to be processed; if it is not, it does.
3. If it is larger than the maximum timestamp, the file has not been processed.

If the maximum timestamp is restored dynamically from the file list every time the job starts, the state-compatibility issue can be ignored.

BTW, I haven't done any tests, but I am actually a little curious: if a lot of files have already been processed, isn't the scan itself already very slow? I mean, maybe the bottleneck might be the scan in the first place?
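The three cases above can be sketched as follows (hypothetical class name; paths as plain strings):

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Sketch of the max-timestamp approach: keep only the largest modification
 * timestamp seen plus the set of files that share exactly that timestamp.
 * Illustrative only, not Flink API.
 */
public class MaxTimestampDeduplicator {
    private long maxTimestamp = Long.MIN_VALUE;
    private final Set<String> filesAtMaxTimestamp = new HashSet<>();

    /** Returns true if the file is new and should be processed. */
    public boolean offer(String path, long modificationTime) {
        if (modificationTime < maxTimestamp) {
            return false; // case 1: older than everything processed so far
        }
        if (modificationTime == maxTimestamp) {
            return filesAtMaxTimestamp.add(path); // case 2: process only if unseen
        }
        // Case 3: newer timestamp; reset the tie list to this file only.
        maxTimestamp = modificationTime;
        filesAtMaxTimestamp.clear();
        filesAtMaxTimestamp.add(path);
        return true;
    }
}
```

The state here is a single long plus the list of files tied at the maximum timestamp, which is exactly what makes restoring the maximum from the file list on startup possible.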
Hi Tianxin,
I assigned you the ticket, so you can go ahead and create a POC PR. I would like to understand the issue a bit better first and then give some things to consider. In general, I see your point that in a potentially infinitely running application, keeping track of all read entities will eventually lead to an OOM.

1. About which order of magnitude are we talking? Your average path should be less than 1 kB, so 1 million paths is about 1 GB. I'm assuming you have hit that limit already and see memory issues on the job manager?
2. At that point, don't we need to improve the scanning already? How long does that take?
3. Doesn't NonSplittingRecursiveEnumerator#enumerateSplits always return a full collection of files as well? So doesn't the problem appear there too?

For pruning, I'm not too sure. I fear that we will always run into issues with filesystems that do not behave as well as the local filesystem.

4. If we prune the set as you suggested, how can we be sure that we are not ignoring a file that is supposed to be processed? Consider a file X with modification time M that only becomes visible in an eventually consistent filesystem at time M2. If we take a checkpoint between M and M2, wouldn't we also prune X, even though it has not been read yet?
5. With all time-based proposals: what happens if you have any kind of time drift?

As an alternative:

6. Couldn't we move the information fully into a state backend, so that when the application grows, we can use the disk as well? However, I also saw that we currently do not allow user state in sources at all.
7. We could also compress the information (hash?). But that probably just buys us a little time.

In general, it might actually be best to move the decision completely into user-code land. Maybe we should change FileEnumerator#enumerateSplits to not return duplicates and allow it to be stateful. Then the user can make a conscious decision based on the filesystem in use. We could provide pruning enumerator implementations, and the user just has to pick the appropriate one if the default doesn't work anymore.
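A minimal sketch of what such a stateful, deduplicating enumerator could look like. This is not the actual FileEnumerator API; names and signatures are illustrative, and splits are represented by path strings for brevity:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of a stateful enumerator that never returns the
 * same split twice, with snapshot/restore hooks for checkpointing.
 */
public class DedupEnumerator {
    private Set<String> seenPaths = new HashSet<>();

    /** Returns only paths not returned by an earlier enumeration. */
    public List<String> enumerateNewSplits(List<String> discovered) {
        List<String> fresh = new ArrayList<>();
        for (String path : discovered) {
            if (seenPaths.add(path)) {
                fresh.add(path);
            }
        }
        return fresh;
    }

    /** State to store in the enumerator checkpoint. */
    public Set<String> snapshotState() {
        return new HashSet<>(seenPaths);
    }

    /** Restore the dedup state after failover. */
    public void restoreState(Set<String> state) {
        this.seenPaths = new HashSet<>(state);
    }
}
```

Because the dedup state lives behind the enumerator interface, a user on an eventually consistent filesystem could plug in a conservative implementation, while a user on a well-behaved filesystem could pick a pruning one.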
Thanks Till, Guowei and Arvid for the insightful discussion!
1. Regarding size and scan performance

We are in the POC stage and not hitting an OOM issue yet; the issue was discovered by reading through the FileSource implementation. Our order of magnitude is about 200 B per path and ~8000 files per day, so this set would reach 1 GB in a bit over 1.5 years. The file paths have an S3 TTL on them, so scanning should stay cheap, but in the current Flink FileSource implementation the set of pathsAlreadyProcessed grows continuously regardless of files being removed from the path.

2. Regarding the data structure for file deduplication

I like Guowei's idea of maintaining one maximum timestamp and only the list of files corresponding to that timestamp. However, Arvid's point on pruning, with the example "Consider a file X with modification time M that is only visible in the eventually consistent filesystem at time M2", shows that relying only on timestamps is not enough: ignoring files whose modification time is before the maximum timestamp could end up skipping files in edge cases. To be accurate, we still want to decide whether a file has been processed by checking against the set of all recently processed files. To keep the set from growing infinitely, we prune it at every checkpoint, removing only entries that have passed a user-specified TTL (which should be at day/month granularity). Only files that the user has decided not to process, because their modification time is older than (watermark - TTL), and which therefore do not need this set for deduplication, would be removed. Thus a Map of <Path, timestamp> makes sense to me, but I would like to hear your feedback.

3. Regarding where the deduplication logic lives

"In general, it might actually be best to move the decision completely into user-code land. Maybe we should change FileEnumerator#enumerateSplits to not return duplicates and allow it to be stateful."

This makes a lot of sense to me. The dedup logic seems to be related to the FileEnumerator only; the current Hive subclasses do not make use of alreadyProcessedPaths in the checkpoint. We could probably move the dedup map and watermark into a file-specific checkpoint only, and then have the PruningEnumerator be an extension of FileEnumerator.

Thanks!
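As a quick sanity check of the growth estimate in point 1 (200 B per path, ~8000 files per day), a small back-of-envelope calculation; the class and method names are hypothetical:

```java
import java.util.Locale;

/** Back-of-envelope check of the growth estimate quoted in the thread. */
public class GrowthEstimate {
    /** Days until the processed-paths set reaches targetBytes. */
    public static double daysToReach(long bytesPerPath, long filesPerDay, long targetBytes) {
        return (double) targetBytes / (bytesPerPath * filesPerDay);
    }

    public static void main(String[] args) {
        // 200 B per path, 8000 files per day, target 1 GiB.
        double days = daysToReach(200, 8000, 1L << 30);
        // Roughly 671 days, i.e. about 1.8 years, consistent with "1.5+ years".
        System.out.printf(Locale.ROOT, "~%.0f days (~%.1f years)%n", days, days / 365.0);
    }
}
```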