|
Hi Guys,
As part of flink s3 source reader can we do filter on json file name
containing date and time as mentioned below :
[image: image.png]
I am able to read all the json files under a specific date but my use case
is to read selected hour and minutes json files based on date range as
shown in the below sample directory structure for multiple device ids.
*s3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:00:00_25700.jsons3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:05:00_25701.jsons3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:10:00_25702.jsons3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:15:00_25703.json*
How do we filter specific files for specific dates under specific hours and
minutes wise before passing them to the source reader .
Below is my current code snippet for creating Input stream:
public static DataStream<String>
createInputStream(StreamExecutionEnvironment environment, String directory,
Integer readerParallelism) {
TextInputFormat format = new TextInputFormat(new
org.apache.flink.core.fs.Path("
s3-success-messages/dev/location/device1/data/2020-08-09 "));
format.setNestedFileEnumeration(true);
format.setMinSplitSize(10);
return environment
.readFile(format, directory, FileProcessingMode.PROCESS_ONCE, -1,
FilePathFilter.createDefaultFilter())
.name(directory).setParallelism(readerParallelism);
}
Do we have any API's available as part of flink s3 source to filter the
json before push to the next operator to process based on selected date
range.
Thanks,
-Deep
|