Need input on creating an S3 source DataStream that filters JSON files before passing them to the actual reader

DEEP NARAYAN Singh
Hi Guys,

As part of the Flink S3 source reader, can we filter on JSON file names that
contain a date and time, like the sample paths below?
I am able to read all the JSON files under a specific date, but my use case
is to read only the JSON files for selected hours and minutes within a date
range, for multiple device IDs, as shown in the sample directory structure below:

s3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:00:00_25700.json
s3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:05:00_25701.json
s3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:10:00_25702.json
s3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:15:00_25703.json
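The sample keys share one layout: .../<deviceId>/data/<yyyy-MM-dd>/<HH>/<yyyy-MM-dd>_<HH:mm:ss>_<sequence>.json. As a minimal sketch, assuming every key follows exactly that layout, the device ID and the timestamp embedded in the file name can be pulled out with a regular expression. S3KeyParser, deviceId and fileTimestamp are hypothetical names used only for illustration; they are not part of Flink.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; assumes keys look like
// .../<deviceId>/data/<yyyy-MM-dd>/<HH>/<yyyy-MM-dd>_<HH:mm:ss>_<sequence>.json
final class S3KeyParser {

    // Group 1: device ID. Group 2: timestamp embedded in the file name.
    private static final Pattern KEY = Pattern.compile(
            ".*/([^/]+)/data/\\d{4}-\\d{2}-\\d{2}/\\d{2}/"
            + "(\\d{4}-\\d{2}-\\d{2}_\\d{2}:\\d{2}:\\d{2})_\\d+\\.json$");

    private static final DateTimeFormatter NAME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm:ss");

    private S3KeyParser() {}

    // Returns the device ID, or null if the path does not match the assumed layout.
    static String deviceId(String path) {
        Matcher m = KEY.matcher(path);
        return m.matches() ? m.group(1) : null;
    }

    // Returns the file-name timestamp, or null if the path does not match.
    static LocalDateTime fileTimestamp(String path) {
        Matcher m = KEY.matcher(path);
        return m.matches() ? LocalDateTime.parse(m.group(2), NAME_FORMAT) : null;
    }

    public static void main(String[] args) {
        String p = "s3-success-messages/dev/location/device1/data/2020-08-09/16/"
                + "2020-08-09_16:05:00_25701.json";
        System.out.println(deviceId(p));      // device1
        System.out.println(fileTimestamp(p)); // 2020-08-09T16:05
    }
}
```

Paths that stop at a directory (for example .../device1/data/2020-08-09) simply do not match and yield null.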

How can we select only the files for specific dates, hours, and minutes
before they are passed to the source reader?
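One way to narrow the input before any listing happens is to compute only the hour directories that can hold files in the wanted range and point the reader at those. A minimal sketch, assuming the .../<deviceId>/data/<yyyy-MM-dd>/<HH> layout from the sample paths and a half-open range [start, end); HourDirectories.between is a hypothetical helper. Each returned directory would still need its own source, for example one readFile call per directory combined with DataStream#union.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; assumes the layout <root>/<deviceId>/data/<yyyy-MM-dd>/<HH>.
final class HourDirectories {

    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter HOUR = DateTimeFormatter.ofPattern("HH");

    private HourDirectories() {}

    // Lists every hour directory that overlaps [start, end), for each device.
    // start is truncated to the hour so a partially covered first hour is kept.
    static List<String> between(String root, List<String> deviceIds,
                                LocalDateTime start, LocalDateTime end) {
        List<String> dirs = new ArrayList<>();
        for (String device : deviceIds) {
            for (LocalDateTime t = start.truncatedTo(ChronoUnit.HOURS);
                 t.isBefore(end);
                 t = t.plusHours(1)) {
                dirs.add(root + "/" + device + "/data/" + DAY.format(t) + "/" + HOUR.format(t));
            }
        }
        return dirs;
    }

    public static void main(String[] args) {
        List<String> dirs = between("s3-success-messages/dev/location",
                Arrays.asList("device1", "device2"),
                LocalDateTime.of(2020, 8, 9, 16, 30),
                LocalDateTime.of(2020, 8, 9, 17, 30));
        dirs.forEach(System.out::println);
    }
}
```

Because the loop advances a LocalDateTime, a range that crosses midnight moves to the next day's directory automatically. Files inside a partially covered hour that fall outside the range still need a file-name check.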

Below is my current code snippet for creating the input stream:

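import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public static DataStream<String> createInputStream(StreamExecutionEnvironment environment,
        String directory, Integer readerParallelism) {

    // directory, e.g. "s3-success-messages/dev/location/device1/data/2020-08-09";
    // readFile() also sets this path on the format, so use the same value here
    TextInputFormat format = new TextInputFormat(new Path(directory));
    format.setNestedFileEnumeration(true); // descend into the hour sub-directories
    format.setMinSplitSize(10);
    return environment
            .readFile(format, directory, FileProcessingMode.PROCESS_ONCE, -1,
                    FilePathFilter.createDefaultFilter())
            .name(directory)
            .setParallelism(readerParallelism);
}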
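In place of FilePathFilter.createDefaultFilter(), a custom subclass of org.apache.flink.api.common.io.FilePathFilter can be passed to readFile; its filterPath(Path) method returns true for paths that should be skipped. (In recent 1.x releases this readFile overload is marked deprecated in favour of FileInputFormat#setFilesFilter.) The decision logic is sketched below in plain Java so it can be tested on its own. TimeRangeFileFilter and shouldIgnore are hypothetical names, and the sketch assumes that directories, which nested enumeration may also run through the filter, must be kept, otherwise the date and hour sub-directories are never listed.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical filter logic. In Flink it would sit behind
// FilePathFilter#filterPath(Path), which returns true for paths to SKIP.
final class TimeRangeFileFilter {

    private static final DateTimeFormatter NAME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm:ss");

    private final LocalDateTime start; // inclusive
    private final LocalDateTime end;   // exclusive

    TimeRangeFileFilter(LocalDateTime start, LocalDateTime end) {
        this.start = start;
        this.end = end;
    }

    // name is the last path component; returns true if it should be skipped.
    boolean shouldIgnore(String name) {
        if (!name.endsWith(".json")) {
            // Assumed to be a directory such as "2020-08-09" or "16":
            // keep it so nested enumeration can descend into it.
            return false;
        }
        if (name.length() < 19) {
            return true; // too short to hold "yyyy-MM-dd_HH:mm:ss"
        }
        try {
            LocalDateTime ts = LocalDateTime.parse(name.substring(0, 19), NAME_FORMAT);
            return ts.isBefore(start) || !ts.isBefore(end);
        } catch (DateTimeParseException e) {
            return true; // JSON file with an unexpected name: skip it
        }
    }

    public static void main(String[] args) {
        TimeRangeFileFilter filter = new TimeRangeFileFilter(
                LocalDateTime.of(2020, 8, 9, 16, 5), LocalDateTime.of(2020, 8, 9, 16, 15));
        for (String name : new String[] {"16", "2020-08-09_16:00:00_25700.json",
                "2020-08-09_16:05:00_25701.json", "2020-08-09_16:15:00_25703.json"}) {
            System.out.println(name + " -> " + (filter.shouldIgnore(name) ? "skip" : "read"));
        }
    }
}
```

Inside a real FilePathFilter subclass, filterPath could delegate with something like return logic.shouldIgnore(filePath.getName()); since FilePathFilter is Serializable, the held instance should be serializable as well (LocalDateTime is).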

Is there any API available in the Flink S3 source to filter the JSON files
by a selected date range before they are pushed to the next operator for
processing?

Thanks,
-Deep