Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
Hi Guys,
I need some help; any leads will be highly appreciated. I have written a
Flink streaming job to read data from an S3 bucket and push it to Kafka.
Below is the working source that deals with a single S3 path:
TextInputFormat format = new TextInputFormat(
        new org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
format.setNestedFileEnumeration(true);

DataStream<String> inputStream = environment.readFile(format,
        "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
        FilePathFilter.createDefaultFilter());

inputStream.addSink(kafka);
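
The kafka sink referenced above is not shown in the snippet; for context, a minimal sketch of how such a sink might be defined (the broker address and topic name below are illustrative assumptions, not values from the actual job):

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address

// assumes org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer and
// org.apache.flink.api.common.serialization.SimpleStringSchema are on the classpath
FlinkKafkaProducer<String> kafka = new FlinkKafkaProducer<>(
        "s3-lines",                 // assumed topic name
        new SimpleStringSchema(),   // forward each S3 line as a plain UTF-8 string
        kafkaProps);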

But my requirement is to get a list of paths and pass them one by one to
this environment.readFile() method. How can we achieve this?

Thanks,
Satya

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
Hi Guys,

Sorry to bother you again, but could someone help me here? Any help in this
regard will be much appreciated.

Regards,
Satya


Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
Hi Guys,

I got stuck with this, please help me here.
Regards,
Satya


Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Chesnay Schepler-3
Do you know the list of directories when you submit the job?

If so, then you can iterate over them, create a source for each
directory, union them, and apply the sink to the union.

private static DataStream<String> createInputStream(StreamExecutionEnvironment environment, String directory) {
    TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(directory));
    format.setNestedFileEnumeration(true);
    return environment.readFile(format, directory, FileProcessingMode.PROCESS_ONCE, -1,
            FilePathFilter.createDefaultFilter());
}

public static void runJob() throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    List<String> directories = getDirectories();
    DataStream<String> joinedStreams = null;
    for (String directory : directories) {
        DataStream<String> inputStream = createInputStream(environment, directory);
        if (joinedStreams == null) {
            joinedStreams = inputStream;
        } else {
            // union returns a new stream, so the result must be reassigned
            joinedStreams = joinedStreams.union(inputStream);
        }
    }
    // add a sanity check that there was at least 1 directory

    joinedStreams.addSink(kafka);
}
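
The sanity check mentioned in the comment could be as simple as the following sketch (the exception type and message are only illustrative):

if (joinedStreams == null) {
    throw new IllegalStateException("expected at least one input directory");
}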




Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Chesnay Schepler-3
You could also try using streams to make it a little more concise:

directories.stream()
    .map(directory -> createInputStream(environment, directory))
    .reduce(DataStream::union)
    .map(joinedStream -> joinedStream.addSink(kafka));
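
Note that reduce(DataStream::union) on a Java Stream yields an Optional<DataStream<String>>, so the final map runs on that Optional. A variant of the same chain that makes the empty-list case explicit (a sketch; the exception message is illustrative) would be:

directories.stream()
    .map(directory -> createInputStream(environment, directory))
    .reduce(DataStream::union)
    .orElseThrow(() -> new IllegalStateException("no input directories configured"))
    .addSink(kafka);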



Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
Thank you @Chesnay, let me try this change.


Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
Hi @[hidden email],

Thanks for your support, it was really helpful.
Do you know the list of directories when you submit the job? [Yes, we do.]
The implementation is in progress, and I will get back to you if we face any
further challenges.
Appreciate your support in this regard.

Regards,
Satya


Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
Hi Chesnay/Team,

Thank you so much for the reply. In continuation of the previous email, below is the setup where I am reading files from S3 and pushing them to Kafka. With the current setup I have a total of 4 directories, and based on the readFile method from the Flink environment we are creating 4 readers in parallel to process the data from S3.

Below are my questions:
1. Can we restrict the number of readers that process the data in parallel? For example, if we have a thousand directories, I want to restrict the number of readers to 10, and each of the ten parallel threads would then read its 100 directories sequentially to consume the data.

2. Between the two Flink operators, i.e. the S3 reader and the Kafka sink, I want to add one more operator that transforms the data read from the S3 bucket before it is pushed to the Kafka sink. Below is my working code. I am finding it difficult to implement the map operator that transforms the union of DataStreams (built by applying the union method over each directory's reader) before pushing to Kafka.

List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);

s3PathList.stream()
    .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
    .reduce(DataStream::union)
    .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));


Something like this is what I'm trying to do to achieve the above use case by applying a FlatMap (it could be a map as well):
s3PathList.stream()
    .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
    .reduce(DataStream::union)
    .flatMap(new FlatMapFunction<DataStream, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            FinalJsonMessage m = objectMapper.readValue(value, FinalJsonMessage.class);
            System.out.println("Json string:: ------" + m);
            // transformation logic
            out.collect(value);
        }
    })
    .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));

Request your support on the same.
Regards,
Satya


Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
Hi Team,

Could you please help me here? I’m sorry for asking on such short notice, but my work has stopped due to this.


Regards,
Satya


Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Chesnay Schepler-3
1) There's no mechanism in the API to restrict the number of readers
across several sources. I can't quite think of a way to achieve
this; maybe Kostas has an idea.

2) You're mixing up the Java Streams and Flink's DataStream API.

Try this:

s3PathList.stream()
    .map(...)
    .reduce(...)
    .map(joinedStream -> joinedStream.flatMap(new FlatMapFunction...))
    .map(joinedStream -> joinedStream.addSink(...))
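
Spelled out against the snippets from the earlier mail, that chain might look roughly like this (a sketch: the outer stream()/map/reduce calls are java.util.stream operations, while flatMap and addSink are Flink DataStream operations invoked inside the lambdas; S3Service.customInputStream, kafkaProducer and kafkaTopicName are the names used in the earlier mail and assumed to exist):

s3PathList.stream()
    .map(directory -> S3Service.customInputStream(environment, directory, readerParallelism))
    .reduce(DataStream::union)
    .map(joinedStream -> joinedStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            // transformation logic goes here; create any ObjectMapper inside a
            // RichFlatMapFunction#open() rather than capturing a non-serializable field
            out.collect(value);
        }
    }))
    .map(transformedStream -> transformedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));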


Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
Thanks, I'll check it out.


Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
Hi Chesnay/Team,

Thank you so much. I have tried the solution, but it is not working as expected: it shows
compilation issues, and I have tried every variation I can think of. Please find the code
snippet below:

s3PathList.stream()
    .map(directory -> S3Service.customCreateInputStream(environment, directory, readerParallelism))
    .reduce(DataStream::union).map(joinedStream -> stream.flatMap(new IntermidiateOperator()).map(joinedStream ->
        joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));

public static class IntermidiateOperator implements FlatMapFunction<String, String> {
    private static final ObjectMapper objectMapper1 = new ObjectMapper();

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        Test m = objectMapper1.readValue(value, Test.class);
        System.out.println("Json string:: ------" + m);
        // logger.info("Json string:: ------"+m);
        out.collect(value);
    }
}

Also, just to clarify one doubt: how do we handle *FileNotFoundException* in the Flink
reader at runtime if a directory is not available in S3? How can we avoid job failure in
that case?

Regards,
Satya


Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
Hi Chesnay/Team,

Thanks, we got the fix for our earlier problem, but we are now stuck with the
issue below and would appreciate your support.


How can we catch a FileNotFoundException at runtime, if any directory is
missing in s3, so that the source code below does not fail the whole job?


s3PathList.stream()
    .map(directory -> S3Service.createInputStream(environment, directory, readerParallelism))
    .reduce(DataStream::union)
    .map(joinedStream -> joinedStream.addSink(kafkaProducer));





Regards,

Satya

On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit <[hidden email]> wrote:

> Hi Chesnay/Team
>
> Thank you so much.I have tried with the solution but it is not working as
> expected showing compilation issues and tried all the ways .Please find
> below code snippet :
>
> s3PathList.stream()
> .map(directory -> S3Service.customCreateInputStream(environment,
> directory, readerParallelism))
> .reduce(DataStream::union).map(joinedStream -> stream.flatMap(new
> IntermidiateOperator()).map(joinedStream ->
> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));
>
> public static class IntermidiateOperator implements
> FlatMapFunction<String, String> {
> private static final ObjectMapper objectMapper1 = new ObjectMapper();
>
> @Override
> public void flatMap(String value, Collector<String> out) throws Exception {
> Test m = objectMapper1.readValue(value, Test.class);
> System.out.println("Json string:: ------" + m);
> // logger.info("Json string:: ------"+m);
> out.collect(value);
> }
> }
>
> Also just to clarify one doubt , How to handle *FileNotFoundException* as
> part of flink reader during runtime if in case directory is not available
> in s3. How to avoid job failure in that use case.
>
> Regards,
> Satya
>
> On Tue, Oct 13, 2020 at 11:15 AM Satyaa Dixit <[hidden email]>
> wrote:
>
>> Thanks, I'll check it out.
>>
>> On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler <[hidden email]>
>> wrote:
>>
>>> 1) There's no mechanism in the API to restrict the number of  number of
>>> readers across several sources. I can't quite think of a way to achieve
>>> this; maybe Kostas has an idea.
>>>
>>> 2) You're mixing  up the Java Streams and Finks DataStream API.
>>>
>>> Try this:
>>>
>>> s3PathList.stream()
>>> .map(...)
>>> .reduce(...)
>>> .map(joinedStream -> stream.map(new FlatMapFunction...))
>>> .map(joinedStream->  joinedStream.addSink...)
>>>
>>> On 10/12/2020 6:05 AM, Satyaa Dixit wrote:
>>>
>>> Hi Team,
>>>
>>> Could you please help me here. I’m sorry for asking on such short notice
>>> but my work has stopped due to this.
>>>
>>>
>>> Regards,
>>> Satya
>>>
>>> On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <[hidden email]>
>>> wrote:
>>>
>>>> Hi  Shesnay/Team,
>>>>
>>>> Thank you so much for the reply.In the continuation of the previous
>>>> email, below is the block diagram where I am reading the file  from s3 and
>>>> pushing it to kafka.Now with the current setup, I have total 4 directory
>>>> based on the readfile method  from flink environment ,we are  creating 4
>>>> readers parallely to process the data from s3 .
>>>>
>>>> Below are my Questions:
>>>> 1. Can we restrict the no. of readers to process the  data parallely.
>>>> e.g let's say if  we have a thousand of directory , in that case i want to
>>>> restrict the no. of readers to 10 and ten parallel threads will continue
>>>> with 100 sequential reading of the directory per thread to consume the data
>>>> .
>>>>
>>>> 2.In between the two flink operators i.e s3 reader and kafka sink , i
>>>> just want to implement one more operator in order to transform the data
>>>> which i am reading from s3 bucket and then want to push into the kafka
>>>> sink. Below is my working code.Here i am finding  difficulties to
>>>> implement  map operator in order to transform the union of datastreams  by
>>>> applying union method over each directory's reader before pushing to kafka.
>>>>
>>>> List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);
>>>>
>>>> s3PathList.stream()
>>>> .map(directory -> S3Service.customInputStream(environment, directory,
>>>> readerParallelism))
>>>> .reduce(DataStream::union)
>>>> .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish
>>>> to " + kafkaTopicName));
>>>>
>>>>
>>>> *Something like this I'm trying to do in order to achieve the above use
>>>> case by applying FlatMap, it could be map as well:*
>>>> s3PathList.stream()
>>>> .map(directory -> S3Service.customInputStream(environment, directory,
>>>> readerParallelism))
>>>> .reduce(DataStream::union).flatMap(new FlatMapFunction<DataStream,
>>>> String>() {
>>>>    @Override
>>>>    public void flatMap(String value, Collector<String> out) throws
>>>> Exception {
>>>>     FinalJsonMessage m=objectMapper.readValue(value,
>>>> FinalJsonMessage.class);
>>>>     System.out.println("Json string:: ------"+m);
>>>>      //transformation logic
>>>>      out.collect(value);
>>>>    }
>>>>  })
>>>> .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish
>>>> to " + kafkaTopicName));
>>>> [image: image.png]
>>>> Request your support on the same.
>>>> Regards,
>>>> Satya
>>>>
>>>> On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <[hidden email]>
>>>> wrote:
>>>>
>>>>> Hi @[hidden email] <[hidden email]> ,
>>>>>
>>>>> Thanks for your support, it was really helpful.
>>>>> Do you know the list of directories when you submit the job? [Yes we
>>>>> do have]
>>>>> The impletemation is progress and will get back to you if any further
>>>>> challenges we may face.
>>>>> Appreciate your support in this regard.
>>>>>
>>>>> Regards,
>>>>> Satya
>>>>>
>>>>> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Thank you @Chesnay let me try this change .
>>>>>>
>>>>>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <[hidden email]>
>>>>>> wrote:
>>>>>>
>>>>>>> You could also try using streams to make it a little more concise:
>>>>>>>
>>>>>>> directories.stream()
>>>>>>>    .map(directory -> createInputStream(environment, directory))
>>>>>>>    .reduce(DataStream::union)
>>>>>>>    .map(joinedStream -> joinedStream.addSink(kafka));
>>>>>>>
>>>>>>>
>>>>>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>>>>>>>
>>>>>>> Do you know the list of directories when you submit the job?
>>>>>>>
>>>>>>> If so, then you can iterate over them, create a source for each
>>>>>>> directory, union them, and apply the sink to the union.
>>>>>>>
>>>>>>> private static
>>>>>>> DataStream<String>createInputStream(StreamExecutionEnvironment environment,
>>>>>>> String directory) {
>>>>>>>    TextInputFormat format =new TextInputFormat(new
>>>>>>> org.apache.flink.core.fs.Path(directory));
>>>>>>> format.setNestedFileEnumeration(true); return environment.readFile(format,
>>>>>>> directory, FileProcessingMode.PROCESS_ONCE, -1,
>>>>>>> FilePathFilter.createDefaultFilter()); }
>>>>>>>
>>>>>>> public static void runJob()throws Exception {
>>>>>>>    StreamExecutionEnvironment environment =
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); List<String>
>>>>>>> directories =getDirectories(); DataStream<String> joinedStreams =null; for
>>>>>>> (String directory : directories) {
>>>>>>>       DataStream<String> inputStream =createInputStream(environment,
>>>>>>> directory); if (joinedStreams ==null) {
>>>>>>>          joinedStreams = inputStream; }else {
>>>>>>>          joinedStreams.union(inputStream); }
>>>>>>>    }
>>>>>>>    // add a sanity check that there was at least 1 directory
>>>>>>>
>>>>>>>    joinedStreams.addSink(kafka); }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>>>>>>>
>>>>>>> Hi Guys,
>>>>>>>
>>>>>>> Got stuck with it please help me here
>>>>>>> Regards,
>>>>>>> Satya
>>>>>>>
>>>>>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit
>>>>>>> <[hidden email]> <[hidden email]> wrote:
>>>>>>>
>>>>>>> Hi Guys,
>>>>>>>
>>>>>>> Sorry to bother you again, but someone could help me here? Any help
>>>>>>> in
>>>>>>> this regard will be much appreciated.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Satya
>>>>>>>
>>>>>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <[hidden email]>
>>>>>>> <[hidden email]>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Guys,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I need one help, any leads will be highly appreciated.I have written
>>>>>>> a
>>>>>>> flink streaming job to read the data from s3 bucket and push to
>>>>>>> kafka.
>>>>>>> Below is the working source that deal with single s3 path:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> TextInputFormat format = new TextInputFormat(new
>>>>>>> org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> format.setNestedFileEnumeration(true);
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> DataStream<String> inputStream = environment.readFile(format,
>>>>>>> "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> FilePathFilter.createDefaultFilter());
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> inputStream.addSink(kafka);
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> But my requirement is get the list of paths and pass them one by one
>>>>>>> to
>>>>>>> this environment.readFile() method.How we can achieve this.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Satya
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> --------------------------
>>>>>>> Best Regards
>>>>>>> Satya Prakash
>>>>>>> (M)+91-9845111913
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> --------------------------
>>>>>>> Best Regards
>>>>>>> Satya Prakash
>>>>>>> (M)+91-9845111913
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> --------------------------
>>>>>> Best Regards
>>>>>> Satya Prakash
>>>>>> (M)+91-9845111913
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> --------------------------
>>>>> Best Regards
>>>>> Satya Prakash
>>>>> (M)+91-9845111913
>>>>>
>>>>
>>>>
>>>> --
>>>> --------------------------
>>>> Best Regards
>>>> Satya Prakash
>>>> (M)+91-9845111913
>>>>
>>> --
>>> --------------------------
>>> Best Regards
>>> Satya Prakash
>>> (M)+91-9845111913
>>>
>>>
>>>
>>
>> --
>> --------------------------
>> Best Regards
>> Satya Prakash
>> (M)+91-9845111913
>>
>
>
> --
> --------------------------
> Best Regards
> Satya Prakash
> (M)+91-9845111913
>


--
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913
Reply | Threaded
Open this post in threaded view
|

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Chesnay Schepler-3
hmm...I don't see an easy way.
You may have to replicate StreamExecutionEnvironment#createFileInput and
create a custom ContinuousFileMonitoringFunction that ignores missing
files in its run() method.

Alternatively, use some library to check the existence of the S3
directories before creating the sources.
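
For the second option, here is a minimal sketch of such an existence check,
using Flink's own FileSystem abstraction instead of a separate S3 client
(the class and method names are only an illustration, not an existing utility):

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class S3PathFilter {

    // Keep only the directories that actually exist, so readFile() never sees
    // a missing path and the job does not fail with a FileNotFoundException.
    public static List<String> filterExistingDirectories(List<String> directories) {
        return directories.stream()
                .filter(S3PathFilter::exists)
                .collect(Collectors.toList());
    }

    private static boolean exists(String directory) {
        try {
            Path path = new Path(directory);
            FileSystem fs = path.getFileSystem();
            return fs.exists(path);
        } catch (IOException e) {
            // Treat unreadable paths as missing instead of failing job submission.
            return false;
        }
    }
}

The filtered list can then be fed into the existing
stream()/reduce(DataStream::union) pipeline unchanged.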

On 10/15/2020 11:49 AM, Satyaa Dixit wrote:

> Hi Chesnay/Team,
>
> Thanks, we got the fix for our problem but got stuck with the below issue,
> request your support.
>
>
> How to catch FileNotFoundException during runtime,if any directory is
> missing in s3 as part of the below source code to avoid job failure.
>
>
> s3PathList.stream().map(directory ->
> S3Service.createInputStream(environment, directory, readerParallelism))
> .reduce(DataStream::union).map(joinedStream ->
> joinedStream.addSink(kafkaProducer));
>
>
>
>
>
> Regards,
>
> Satya
>
> On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit <[hidden email]> wrote:
>
>> Hi Chesnay/Team
>>
>> Thank you so much.I have tried with the solution but it is not working as
>> expected showing compilation issues and tried all the ways .Please find
>> below code snippet :
>>
>> s3PathList.stream()
>> .map(directory -> S3Service.customCreateInputStream(environment,
>> directory, readerParallelism))
>> .reduce(DataStream::union).map(joinedStream -> stream.flatMap(new
>> IntermidiateOperator()).map(joinedStream ->
>> joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));
>>
>> public static class IntermidiateOperator implements
>> FlatMapFunction<String, String> {
>> private static final ObjectMapper objectMapper1 = new ObjectMapper();
>>
>> @Override
>> public void flatMap(String value, Collector<String> out) throws Exception {
>> Test m = objectMapper1.readValue(value, Test.class);
>> System.out.println("Json string:: ------" + m);
>> // logger.info("Json string:: ------"+m);
>> out.collect(value);
>> }
>> }
>>
>> Also just to clarify one doubt , How to handle *FileNotFoundException* as
>> part of flink reader during runtime if in case directory is not available
>> in s3. How to avoid job failure in that use case.
>>
>> Regards,
>> Satya
>>
>> On Tue, Oct 13, 2020 at 11:15 AM Satyaa Dixit <[hidden email]>
>> wrote:
>>
>>> Thanks, I'll check it out.
>>>
>>> On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler <[hidden email]>
>>> wrote:
>>>
>>>> 1) There's no mechanism in the API to restrict the number of  number of
>>>> readers across several sources. I can't quite think of a way to achieve
>>>> this; maybe Kostas has an idea.
>>>>
>>>> 2) You're mixing  up the Java Streams and Finks DataStream API.
>>>>
>>>> Try this:
>>>>
>>>> s3PathList.stream()
>>>> .map(...)
>>>> .reduce(...)
>>>> .map(joinedStream -> stream.map(new FlatMapFunction...))
>>>> .map(joinedStream->  joinedStream.addSink...)
>>>>
>>>> On 10/12/2020 6:05 AM, Satyaa Dixit wrote:
>>>>
>>>> Hi Team,
>>>>
>>>> Could you please help me here. I’m sorry for asking on such short notice
>>>> but my work has stopped due to this.
>>>>
>>>>
>>>> Regards,
>>>> Satya
>>>>
>>>> On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <[hidden email]>
>>>> wrote:
>>>>
>>>>> Hi  Shesnay/Team,
>>>>>
>>>>> Thank you so much for the reply.In the continuation of the previous
>>>>> email, below is the block diagram where I am reading the file  from s3 and
>>>>> pushing it to kafka.Now with the current setup, I have total 4 directory
>>>>> based on the readfile method  from flink environment ,we are  creating 4
>>>>> readers parallely to process the data from s3 .
>>>>>
>>>>> Below are my Questions:
>>>>> 1. Can we restrict the no. of readers to process the  data parallely.
>>>>> e.g let's say if  we have a thousand of directory , in that case i want to
>>>>> restrict the no. of readers to 10 and ten parallel threads will continue
>>>>> with 100 sequential reading of the directory per thread to consume the data
>>>>> .
>>>>>
>>>>> 2.In between the two flink operators i.e s3 reader and kafka sink , i
>>>>> just want to implement one more operator in order to transform the data
>>>>> which i am reading from s3 bucket and then want to push into the kafka
>>>>> sink. Below is my working code.Here i am finding  difficulties to
>>>>> implement  map operator in order to transform the union of datastreams  by
>>>>> applying union method over each directory's reader before pushing to kafka.
>>>>>
>>>>> List<String> s3PathList = S3Service.getListOfS3Paths(finalParameters);
>>>>>
>>>>> s3PathList.stream()
>>>>> .map(directory -> S3Service.customInputStream(environment, directory,
>>>>> readerParallelism))
>>>>> .reduce(DataStream::union)
>>>>> .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish
>>>>> to " + kafkaTopicName));
>>>>>
>>>>>
>>>>> *Something like this I'm trying to do in order to achieve the above use
>>>>> case by applying FlatMap, it could be map as well:*
>>>>> s3PathList.stream()
>>>>> .map(directory -> S3Service.customInputStream(environment, directory,
>>>>> readerParallelism))
>>>>> .reduce(DataStream::union).flatMap(new FlatMapFunction<DataStream,
>>>>> String>() {
>>>>>     @Override
>>>>>     public void flatMap(String value, Collector<String> out) throws
>>>>> Exception {
>>>>>      FinalJsonMessage m=objectMapper.readValue(value,
>>>>> FinalJsonMessage.class);
>>>>>      System.out.println("Json string:: ------"+m);
>>>>>       //transformation logic
>>>>>       out.collect(value);
>>>>>     }
>>>>>   })
>>>>> .map(joinedStream -> joinedStream.addSink(kafkaProducer).name("Publish
>>>>> to " + kafkaTopicName));
>>>>> [image: image.png]
>>>>> Request your support on the same.
>>>>> Regards,
>>>>> Satya
>>>>>
>>>>> On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Hi @[hidden email] <[hidden email]> ,
>>>>>>
>>>>>> Thanks for your support, it was really helpful.
>>>>>> Do you know the list of directories when you submit the job? [Yes we
>>>>>> do have]
>>>>>> The impletemation is progress and will get back to you if any further
>>>>>> challenges we may face.
>>>>>> Appreciate your support in this regard.
>>>>>>
>>>>>> Regards,
>>>>>> Satya
>>>>>>
>>>>>> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <[hidden email]>
>>>>>> wrote:
>>>>>>
>>>>>>> Thank you @Chesnay let me try this change .
>>>>>>>
>>>>>>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <[hidden email]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> You could also try using streams to make it a little more concise:
>>>>>>>>
>>>>>>>> directories.stream()
>>>>>>>>     .map(directory -> createInputStream(environment, directory))
>>>>>>>>     .reduce(DataStream::union)
>>>>>>>>     .map(joinedStream -> joinedStream.addSink(kafka));
>>>>>>>>
>>>>>>>>
>>>>>>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>>>>>>>>
>>>>>>>> Do you know the list of directories when you submit the job?
>>>>>>>>
>>>>>>>> If so, then you can iterate over them, create a source for each
>>>>>>>> directory, union them, and apply the sink to the union.
>>>>>>>>
>>>>>>>> private static
>>>>>>>> DataStream<String>createInputStream(StreamExecutionEnvironment environment,
>>>>>>>> String directory) {
>>>>>>>>     TextInputFormat format =new TextInputFormat(new
>>>>>>>> org.apache.flink.core.fs.Path(directory));
>>>>>>>> format.setNestedFileEnumeration(true); return environment.readFile(format,
>>>>>>>> directory, FileProcessingMode.PROCESS_ONCE, -1,
>>>>>>>> FilePathFilter.createDefaultFilter()); }
>>>>>>>>
>>>>>>>> public static void runJob()throws Exception {
>>>>>>>>     StreamExecutionEnvironment environment =
>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); List<String>
>>>>>>>> directories =getDirectories(); DataStream<String> joinedStreams =null; for
>>>>>>>> (String directory : directories) {
>>>>>>>>        DataStream<String> inputStream =createInputStream(environment,
>>>>>>>> directory); if (joinedStreams ==null) {
>>>>>>>>           joinedStreams = inputStream; }else {
>>>>>>>>           joinedStreams.union(inputStream); }
>>>>>>>>     }
>>>>>>>>     // add a sanity check that there was at least 1 directory
>>>>>>>>
>>>>>>>>     joinedStreams.addSink(kafka); }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>>>>>>>>
>>>>>>>> Hi Guys,
>>>>>>>>
>>>>>>>> Got stuck with it please help me here
>>>>>>>> Regards,
>>>>>>>> Satya
>>>>>>>>
>>>>>>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit
>>>>>>>> <[hidden email]> <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Hi Guys,
>>>>>>>>
>>>>>>>> Sorry to bother you again, but someone could help me here? Any help
>>>>>>>> in
>>>>>>>> this regard will be much appreciated.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Satya
>>>>>>>>
>>>>>>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <[hidden email]>
>>>>>>>> <[hidden email]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Guys,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I need one help, any leads will be highly appreciated.I have written
>>>>>>>> a
>>>>>>>> flink streaming job to read the data from s3 bucket and push to
>>>>>>>> kafka.
>>>>>>>> Below is the working source that deal with single s3 path:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> TextInputFormat format = new TextInputFormat(new
>>>>>>>> org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> format.setNestedFileEnumeration(true);
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> DataStream<String> inputStream = environment.readFile(format,
>>>>>>>> "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE, -1,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> FilePathFilter.createDefaultFilter());
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> inputStream.addSink(kafka);
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> But my requirement is get the list of paths and pass them one by one
>>>>>>>> to
>>>>>>>> this environment.readFile() method.How we can achieve this.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Satya
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> --------------------------
>>>>>>>> Best Regards
>>>>>>>> Satya Prakash
>>>>>>>> (M)+91-9845111913
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> --------------------------
>>>>>>>> Best Regards
>>>>>>>> Satya Prakash
>>>>>>>> (M)+91-9845111913
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>> --
>>>>>>> --------------------------
>>>>>>> Best Regards
>>>>>>> Satya Prakash
>>>>>>> (M)+91-9845111913
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> --------------------------
>>>>>> Best Regards
>>>>>> Satya Prakash
>>>>>> (M)+91-9845111913
>>>>>>
>>>>>
>>>>> --
>>>>> --------------------------
>>>>> Best Regards
>>>>> Satya Prakash
>>>>> (M)+91-9845111913
>>>>>
>>>> --
>>>> --------------------------
>>>> Best Regards
>>>> Satya Prakash
>>>> (M)+91-9845111913
>>>>
>>>>
>>>>
>>> --
>>> --------------------------
>>> Best Regards
>>> Satya Prakash
>>> (M)+91-9845111913
>>>
>>
>> --
>> --------------------------
>> Best Regards
>> Satya Prakash
>> (M)+91-9845111913
>>
>

Reply | Threaded
Open this post in threaded view
|

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
Hi Chesnay,
Thanks for your support. It helped a lot. I need one more piece of help, on
how to do checkpointing for the s3 reader source. If a failure happens, due to
an OutOfMemoryError or any other reason, I want the restarted job to resume
from the last reader split offset of the previous run, so that no data is read
twice.

Thanks,
Satya

On Thu, Oct 15, 2020 at 3:29 PM Chesnay Schepler <[hidden email]> wrote:

> hmm...I don't see an easy way.
> You may have to replicated StreamExecutionEnvironment#createFileInput and
> create a custom ContinuousFileMonitoringFunction that ignores missing
> files in it's run() method.
>
> Alternatively, use some library to check the existence of the S3
> directories before creating the sources.
>
> On 10/15/2020 11:49 AM, Satyaa Dixit wrote:
> > Hi Chesnay/Team,
> >
> > Thanks, we got the fix for our problem but got stuck with the below
> issue,
> > request your support.
> >
> >
> > How to catch FileNotFoundException during runtime,if any directory is
> > missing in s3 as part of the below source code to avoid job failure.
> >
> >
> > s3PathList.stream().map(directory ->
> > S3Service.createInputStream(environment, directory, readerParallelism))
> > .reduce(DataStream::union).map(joinedStream ->
> > joinedStream.addSink(kafkaProducer));
> >
> >
> >
> >
> >
> > Regards,
> >
> > Satya
> >
> > On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit <[hidden email]>
> wrote:
> >
> >> Hi Chesnay/Team
> >>
> >> Thank you so much.I have tried with the solution but it is not working
> as
> >> expected showing compilation issues and tried all the ways .Please find
> >> below code snippet :
> >>
> >> s3PathList.stream()
> >> .map(directory -> S3Service.customCreateInputStream(environment,
> >> directory, readerParallelism))
> >> .reduce(DataStream::union).map(joinedStream -> stream.flatMap(new
> >> IntermidiateOperator()).map(joinedStream ->
> >> joinedStream.addSink(kafkaProducer).name("Publish to " +
> kafkaTopicName));
> >>
> >> public static class IntermidiateOperator implements
> >> FlatMapFunction<String, String> {
> >> private static final ObjectMapper objectMapper1 = new ObjectMapper();
> >>
> >> @Override
> >> public void flatMap(String value, Collector<String> out) throws
> Exception {
> >> Test m = objectMapper1.readValue(value, Test.class);
> >> System.out.println("Json string:: ------" + m);
> >> // logger.info("Json string:: ------"+m);
> >> out.collect(value);
> >> }
> >> }
> >>
> >> Also just to clarify one doubt , How to handle *FileNotFoundException*
> as
> >> part of flink reader during runtime if in case directory is not
> available
> >> in s3. How to avoid job failure in that use case.
> >>
> >> Regards,
> >> Satya
> >>
> >> On Tue, Oct 13, 2020 at 11:15 AM Satyaa Dixit <[hidden email]>
> >> wrote:
> >>
> >>> Thanks, I'll check it out.
> >>>
> >>> On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler <[hidden email]>
> >>> wrote:
> >>>
> >>>> 1) There's no mechanism in the API to restrict the number of  number
> of
> >>>> readers across several sources. I can't quite think of a way to
> achieve
> >>>> this; maybe Kostas has an idea.
> >>>>
> >>>> 2) You're mixing  up the Java Streams and Finks DataStream API.
> >>>>
> >>>> Try this:
> >>>>
> >>>> s3PathList.stream()
> >>>> .map(...)
> >>>> .reduce(...)
> >>>> .map(joinedStream -> stream.map(new FlatMapFunction...))
> >>>> .map(joinedStream->  joinedStream.addSink...)
> >>>>
> >>>> On 10/12/2020 6:05 AM, Satyaa Dixit wrote:
> >>>>
> >>>> Hi Team,
> >>>>
> >>>> Could you please help me here. I’m sorry for asking on such short
> notice
> >>>> but my work has stopped due to this.
> >>>>
> >>>>
> >>>> Regards,
> >>>> Satya
> >>>>
> >>>> On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <[hidden email]>
> >>>> wrote:
> >>>>
> >>>>> Hi  Shesnay/Team,
> >>>>>
> >>>>> Thank you so much for the reply.In the continuation of the previous
> >>>>> email, below is the block diagram where I am reading the file  from
> s3 and
> >>>>> pushing it to kafka.Now with the current setup, I have total 4
> directory
> >>>>> based on the readfile method  from flink environment ,we are
> creating 4
> >>>>> readers parallely to process the data from s3 .
> >>>>>
> >>>>> Below are my Questions:
> >>>>> 1. Can we restrict the no. of readers to process the  data parallely.
> >>>>> e.g let's say if  we have a thousand of directory , in that case i
> want to
> >>>>> restrict the no. of readers to 10 and ten parallel threads will
> continue
> >>>>> with 100 sequential reading of the directory per thread to consume
> the data
> >>>>> .
> >>>>>
> >>>>> 2.In between the two flink operators i.e s3 reader and kafka sink , i
> >>>>> just want to implement one more operator in order to transform the
> data
> >>>>> which i am reading from s3 bucket and then want to push into the
> kafka
> >>>>> sink. Below is my working code.Here i am finding  difficulties to
> >>>>> implement  map operator in order to transform the union of
> datastreams  by
> >>>>> applying union method over each directory's reader before pushing to
> kafka.
> >>>>>
> >>>>> List<String> s3PathList =
> S3Service.getListOfS3Paths(finalParameters);
> >>>>>
> >>>>> s3PathList.stream()
> >>>>> .map(directory -> S3Service.customInputStream(environment, directory,
> >>>>> readerParallelism))
> >>>>> .reduce(DataStream::union)
> >>>>> .map(joinedStream ->
> joinedStream.addSink(kafkaProducer).name("Publish
> >>>>> to " + kafkaTopicName));
> >>>>>
> >>>>>
> >>>>> *Something like this I'm trying to do in order to achieve the above
> use
> >>>>> case by applying FlatMap, it could be map as well:*
> >>>>> s3PathList.stream()
> >>>>> .map(directory -> S3Service.customInputStream(environment, directory,
> >>>>> readerParallelism))
> >>>>> .reduce(DataStream::union).flatMap(new FlatMapFunction<DataStream,
> >>>>> String>() {
> >>>>>     @Override
> >>>>>     public void flatMap(String value, Collector<String> out) throws
> >>>>> Exception {
> >>>>>      FinalJsonMessage m=objectMapper.readValue(value,
> >>>>> FinalJsonMessage.class);
> >>>>>      System.out.println("Json string:: ------"+m);
> >>>>>       //transformation logic
> >>>>>       out.collect(value);
> >>>>>     }
> >>>>>   })
> >>>>> .map(joinedStream ->
> joinedStream.addSink(kafkaProducer).name("Publish
> >>>>> to " + kafkaTopicName));
> >>>>> [image: image.png]
> >>>>> Request your support on the same.
> >>>>> Regards,
> >>>>> Satya
> >>>>>
> >>>>> On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <[hidden email]>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi @[hidden email] <[hidden email]> ,
> >>>>>>
> >>>>>> Thanks for your support, it was really helpful.
> >>>>>> Do you know the list of directories when you submit the job? [Yes we
> >>>>>> do have]
> >>>>>> The impletemation is progress and will get back to you if any
> further
> >>>>>> challenges we may face.
> >>>>>> Appreciate your support in this regard.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Satya
> >>>>>>
> >>>>>> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <[hidden email]>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Thank you @Chesnay let me try this change .
> >>>>>>>
> >>>>>>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <
> [hidden email]>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> You could also try using streams to make it a little more concise:
> >>>>>>>>
> >>>>>>>> directories.stream()
> >>>>>>>>     .map(directory -> createInputStream(environment, directory))
> >>>>>>>>     .reduce(DataStream::union)
> >>>>>>>>     .map(joinedStream -> joinedStream.addSink(kafka));
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
> >>>>>>>>
> >>>>>>>> Do you know the list of directories when you submit the job?
> >>>>>>>>
> >>>>>>>> If so, then you can iterate over them, create a source for each
> >>>>>>>> directory, union them, and apply the sink to the union.
> >>>>>>>>
> >>>>>>>> private static
> >>>>>>>> DataStream<String>createInputStream(StreamExecutionEnvironment
> environment,
> >>>>>>>> String directory) {
> >>>>>>>>     TextInputFormat format =new TextInputFormat(new
> >>>>>>>> org.apache.flink.core.fs.Path(directory));
> >>>>>>>> format.setNestedFileEnumeration(true); return
> environment.readFile(format,
> >>>>>>>> directory, FileProcessingMode.PROCESS_ONCE, -1,
> >>>>>>>> FilePathFilter.createDefaultFilter()); }
> >>>>>>>>
> >>>>>>>> public static void runJob()throws Exception {
> >>>>>>>>     StreamExecutionEnvironment environment =
> >>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); List<String>
> >>>>>>>> directories =getDirectories(); DataStream<String> joinedStreams
> =null; for
> >>>>>>>> (String directory : directories) {
> >>>>>>>>        DataStream<String> inputStream
> =createInputStream(environment,
> >>>>>>>> directory); if (joinedStreams ==null) {
> >>>>>>>>           joinedStreams = inputStream; }else {
> >>>>>>>>           joinedStreams.union(inputStream); }
> >>>>>>>>     }
> >>>>>>>>     // add a sanity check that there was at least 1 directory
> >>>>>>>>
> >>>>>>>>     joinedStreams.addSink(kafka); }
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
> >>>>>>>>
> >>>>>>>> Hi Guys,
> >>>>>>>>
> >>>>>>>> Got stuck with it please help me here
> >>>>>>>> Regards,
> >>>>>>>> Satya
> >>>>>>>>
> >>>>>>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit
> >>>>>>>> <[hidden email]> <[hidden email]> wrote:
> >>>>>>>>
> >>>>>>>> Hi Guys,
> >>>>>>>>
> >>>>>>>> Sorry to bother you again, but someone could help me here? Any
> help
> >>>>>>>> in
> >>>>>>>> this regard will be much appreciated.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Satya
> >>>>>>>>
> >>>>>>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <
> [hidden email]>
> >>>>>>>> <[hidden email]>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>> Hi Guys,
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> I need one help, any leads will be highly appreciated.I have
> written
> >>>>>>>> a
> >>>>>>>> flink streaming job to read the data from s3 bucket and push to
> >>>>>>>> kafka.
> >>>>>>>> Below is the working source that deal with single s3 path:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> TextInputFormat format = new TextInputFormat(new
> >>>>>>>> org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> format.setNestedFileEnumeration(true);
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> DataStream<String> inputStream = environment.readFile(format,
> >>>>>>>> "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE,
> -1,
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> FilePathFilter.createDefaultFilter());
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> inputStream.addSink(kafka);
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> But my requirement is get the list of paths and pass them one by
> one
> >>>>>>>> to
> >>>>>>>> this environment.readFile() method.How we can achieve this.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Satya
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> --------------------------
> >>>>>>>> Best Regards
> >>>>>>>> Satya Prakash
> >>>>>>>> (M)+91-9845111913
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>>
> >>>>>>>> --------------------------
> >>>>>>>> Best Regards
> >>>>>>>> Satya Prakash
> >>>>>>>> (M)+91-9845111913
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>> --
> >>>>>>> --------------------------
> >>>>>>> Best Regards
> >>>>>>> Satya Prakash
> >>>>>>> (M)+91-9845111913
> >>>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> --------------------------
> >>>>>> Best Regards
> >>>>>> Satya Prakash
> >>>>>> (M)+91-9845111913
> >>>>>>
> >>>>>
> >>>>> --
> >>>>> --------------------------
> >>>>> Best Regards
> >>>>> Satya Prakash
> >>>>> (M)+91-9845111913
> >>>>>
> >>>> --
> >>>> --------------------------
> >>>> Best Regards
> >>>> Satya Prakash
> >>>> (M)+91-9845111913
> >>>>
> >>>>
> >>>>
> >>> --
> >>> --------------------------
> >>> Best Regards
> >>> Satya Prakash
> >>> (M)+91-9845111913
> >>>
> >>
> >> --
> >> --------------------------
> >> Best Regards
> >> Satya Prakash
> >> (M)+91-9845111913
> >>
> >
>
>

--
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913
Reply | Threaded
Open this post in threaded view
|

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Chesnay Schepler-3
The existing ContinuousFileMonitoringFunction and
ContinuousFileReaderOperator already take care of that.
Unless you are re-implementing them from scratch, you shouldn't have
to do anything.
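
One thing that does have to be in place, though, is checkpointing on the
StreamExecutionEnvironment; without it there is no snapshot of the reader
state to restore from. A minimal sketch, assuming a one-minute checkpoint
interval and a fixed-delay restart strategy (both values are placeholders,
not tuned recommendations):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedEnvironmentFactory {

    public static StreamExecutionEnvironment create() {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Snapshot the state of every operator (including the file-monitoring
        // source and its pending splits) every 60 seconds.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // Restart the job up to 3 times with a 10-second delay instead of
        // failing permanently on the first exception.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

        return env;
    }
}

Whether duplicates can still reach Kafka after a restore also depends on the
delivery guarantee configured on the Kafka producer, which is a separate
concern from the reader state.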

On 10/22/2020 1:47 PM, Satyaa Dixit wrote:

> Hi Chesnay,
> Thanks for your support.It helped a lot. I need one more help on how to do
> checkpointing as part of the s3 reader source in case if some failure
> happens due to OutOfMemoryError exception or it could be any other failure,
> and want to recover the data from last reader splitted offset during
> restart the job in continuation of the previous job in order to avoid
> duplicate data.
>
> Thanks,
> Satya
>
> On Thu, Oct 15, 2020 at 3:29 PM Chesnay Schepler <[hidden email]> wrote:
>
>> hmm...I don't see an easy way.
>> You may have to replicated StreamExecutionEnvironment#createFileInput and
>> create a custom ContinuousFileMonitoringFunction that ignores missing
>> files in it's run() method.
>>
>> Alternatively, use some library to check the existence of the S3
>> directories before creating the sources.
>>
>> On 10/15/2020 11:49 AM, Satyaa Dixit wrote:
>>> Hi Chesnay/Team,
>>>
>>> Thanks, we got the fix for our problem but got stuck with the below
>> issue,
>>> request your support.
>>>
>>>
>>> How to catch FileNotFoundException during runtime,if any directory is
>>> missing in s3 as part of the below source code to avoid job failure.
>>>
>>>
>>> s3PathList.stream().map(directory ->
>>> S3Service.createInputStream(environment, directory, readerParallelism))
>>> .reduce(DataStream::union).map(joinedStream ->
>>> joinedStream.addSink(kafkaProducer));
>>>
>>>
>>>
>>>
>>>
>>> Regards,
>>>
>>> Satya
>>>
>>> On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit <[hidden email]>
>> wrote:
>>>> Hi Chesnay/Team
>>>>
>>>> Thank you so much.I have tried with the solution but it is not working
>> as
>>>> expected showing compilation issues and tried all the ways .Please find
>>>> below code snippet :
>>>>
>>>> s3PathList.stream()
>>>> .map(directory -> S3Service.customCreateInputStream(environment,
>>>> directory, readerParallelism))
>>>> .reduce(DataStream::union).map(joinedStream -> stream.flatMap(new
>>>> IntermidiateOperator()).map(joinedStream ->
>>>> joinedStream.addSink(kafkaProducer).name("Publish to " +
>> kafkaTopicName));
>>>> public static class IntermidiateOperator implements
>>>> FlatMapFunction<String, String> {
>>>> private static final ObjectMapper objectMapper1 = new ObjectMapper();
>>>>
>>>> @Override
>>>> public void flatMap(String value, Collector<String> out) throws
>> Exception {
>>>> Test m = objectMapper1.readValue(value, Test.class);
>>>> System.out.println("Json string:: ------" + m);
>>>> // logger.info("Json string:: ------"+m);
>>>> out.collect(value);
>>>> }
>>>> }
>>>>
>>>> Also just to clarify one doubt , How to handle *FileNotFoundException*
>> as
>>>> part of flink reader during runtime if in case directory is not
>> available
>>>> in s3. How to avoid job failure in that use case.
>>>>
>>>> Regards,
>>>> Satya
>>>>
>>>> On Tue, Oct 13, 2020 at 11:15 AM Satyaa Dixit <[hidden email]>
>>>> wrote:
>>>>
>>>>> Thanks, I'll check it out.
>>>>>
>>>>> On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> 1) There's no mechanism in the API to restrict the number of  number
>> of
>>>>>> readers across several sources. I can't quite think of a way to
>> achieve
>>>>>> this; maybe Kostas has an idea.
>>>>>>
>>>>>> 2) You're mixing  up the Java Streams and Finks DataStream API.
>>>>>>
>>>>>> Try this:
>>>>>>
>>>>>> s3PathList.stream()
>>>>>> .map(...)
>>>>>> .reduce(...)
>>>>>> .map(joinedStream -> stream.map(new FlatMapFunction...))
>>>>>> .map(joinedStream->  joinedStream.addSink...)
>>>>>>
>>>>>> On 10/12/2020 6:05 AM, Satyaa Dixit wrote:
>>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> Could you please help me here. I’m sorry for asking on such short
>> notice
>>>>>> but my work has stopped due to this.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Satya
>>>>>>
>>>>>> On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <[hidden email]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi  Shesnay/Team,
>>>>>>>
>>>>>>> Thank you so much for the reply.In the continuation of the previous
>>>>>>> email, below is the block diagram where I am reading the file  from
>> s3 and
>>>>>>> pushing it to kafka.Now with the current setup, I have total 4
>> directory
>>>>>>> based on the readfile method  from flink environment ,we are
>> creating 4
>>>>>>> readers parallely to process the data from s3 .
>>>>>>>
>>>>>>> Below are my Questions:
>>>>>>> 1. Can we restrict the no. of readers to process the  data parallely.
>>>>>>> e.g let's say if  we have a thousand of directory , in that case i
>> want to
>>>>>>> restrict the no. of readers to 10 and ten parallel threads will
>> continue
>>>>>>> with 100 sequential reading of the directory per thread to consume
>> the data
>>>>>>> .
>>>>>>>
>>>>>>> 2.In between the two flink operators i.e s3 reader and kafka sink , i
>>>>>>> just want to implement one more operator in order to transform the
>> data
>>>>>>> which i am reading from s3 bucket and then want to push into the
>> kafka
>>>>>>> sink. Below is my working code.Here i am finding  difficulties to
>>>>>>> implement  map operator in order to transform the union of
>> datastreams  by
>>>>>>> applying union method over each directory's reader before pushing to
>> kafka.
>>>>>>> List<String> s3PathList =
>> S3Service.getListOfS3Paths(finalParameters);
>>>>>>> s3PathList.stream()
>>>>>>> .map(directory -> S3Service.customInputStream(environment, directory,
>>>>>>> readerParallelism))
>>>>>>> .reduce(DataStream::union)
>>>>>>> .map(joinedStream ->
>> joinedStream.addSink(kafkaProducer).name("Publish
>>>>>>> to " + kafkaTopicName));
>>>>>>>
>>>>>>>
>>>>>>> *Something like this I'm trying to do in order to achieve the above
>> use
>>>>>>> case by applying FlatMap, it could be map as well:*
>>>>>>> s3PathList.stream()
>>>>>>> .map(directory -> S3Service.customInputStream(environment, directory,
>>>>>>> readerParallelism))
>>>>>>> .reduce(DataStream::union).flatMap(new FlatMapFunction<DataStream,
>>>>>>> String>() {
>>>>>>>      @Override
>>>>>>>      public void flatMap(String value, Collector<String> out) throws
>>>>>>> Exception {
>>>>>>>       FinalJsonMessage m=objectMapper.readValue(value,
>>>>>>> FinalJsonMessage.class);
>>>>>>>       System.out.println("Json string:: ------"+m);
>>>>>>>        //transformation logic
>>>>>>>        out.collect(value);
>>>>>>>      }
>>>>>>>    })
>>>>>>> .map(joinedStream ->
>> joinedStream.addSink(kafkaProducer).name("Publish
>>>>>>> to " + kafkaTopicName));
>>>>>>> [image: image.png]
>>>>>>> Request your support on the same.
>>>>>>> Regards,
>>>>>>> Satya
>>>>>>>
>>>>>>> On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <[hidden email]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi @[hidden email] <[hidden email]> ,
>>>>>>>>
>>>>>>>> Thanks for your support, it was really helpful.
>>>>>>>> Do you know the list of directories when you submit the job? [Yes we
>>>>>>>> do have]
>>>>>>>> The impletemation is progress and will get back to you if any
>> further
>>>>>>>> challenges we may face.
>>>>>>>> Appreciate your support in this regard.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Satya
>>>>>>>>
>>>>>>>> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <[hidden email]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thank you @Chesnay let me try this change .
>>>>>>>>>
>>>>>>>>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <
>> [hidden email]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> You could also try using streams to make it a little more concise:
>>>>>>>>>>
>>>>>>>>>> directories.stream()
>>>>>>>>>>      .map(directory -> createInputStream(environment, directory))
>>>>>>>>>>      .reduce(DataStream::union)
>>>>>>>>>>      .map(joinedStream -> joinedStream.addSink(kafka));
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>>>>>>>>>>
>>>>>>>>>> Do you know the list of directories when you submit the job?
>>>>>>>>>>
>>>>>>>>>> If so, then you can iterate over them, create a source for each
>>>>>>>>>> directory, union them, and apply the sink to the union.
>>>>>>>>>>
>>>>>>>>>> private static
>>>>>>>>>> DataStream<String>createInputStream(StreamExecutionEnvironment
>> environment,
>>>>>>>>>> String directory) {
>>>>>>>>>>      TextInputFormat format =new TextInputFormat(new
>>>>>>>>>> org.apache.flink.core.fs.Path(directory));
>>>>>>>>>> format.setNestedFileEnumeration(true); return
>> environment.readFile(format,
>>>>>>>>>> directory, FileProcessingMode.PROCESS_ONCE, -1,
>>>>>>>>>> FilePathFilter.createDefaultFilter()); }
>>>>>>>>>>
>>>>>>>>>> public static void runJob()throws Exception {
>>>>>>>>>>      StreamExecutionEnvironment environment =
>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); List<String>
>>>>>>>>>> directories =getDirectories(); DataStream<String> joinedStreams
>> =null; for
>>>>>>>>>> (String directory : directories) {
>>>>>>>>>>         DataStream<String> inputStream
>> =createInputStream(environment,
>>>>>>>>>> directory); if (joinedStreams ==null) {
>>>>>>>>>>            joinedStreams = inputStream; }else {
>>>>>>>>>>            joinedStreams.union(inputStream); }
>>>>>>>>>>      }
>>>>>>>>>>      // add a sanity check that there was at least 1 directory
>>>>>>>>>>
>>>>>>>>>>      joinedStreams.addSink(kafka); }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 10/1/2020 9:08 AM, Satyaa Dixit wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Guys,
>>>>>>>>>>
>>>>>>>>>> Got stuck with it please help me here
>>>>>>>>>> Regards,
>>>>>>>>>> Satya
>>>>>>>>>>
>>>>>>>>>> On Wed, 30 Sep 2020 at 11:09 AM, Satyaa Dixit
>>>>>>>>>> <[hidden email]> <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Guys,
>>>>>>>>>>
>>>>>>>>>> Sorry to bother you again, but someone could help me here? Any
>> help
>>>>>>>>>> in
>>>>>>>>>> this regard will be much appreciated.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Satya
>>>>>>>>>>
>>>>>>>>>> On Tue, Sep 29, 2020 at 2:57 PM Satyaa Dixit <
>> [hidden email]>
>>>>>>>>>> <[hidden email]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Guys,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I need one help, any leads will be highly appreciated.I have
>> written
>>>>>>>>>> a
>>>>>>>>>> flink streaming job to read the data from s3 bucket and push to
>>>>>>>>>> kafka.
>>>>>>>>>> Below is the working source that deal with single s3 path:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> TextInputFormat format = new TextInputFormat(new
>>>>>>>>>> org.apache.flink.core.fs.Path("s3a://directory/2020-09-03/"));
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> format.setNestedFileEnumeration(true);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> DataStream<String> inputStream = environment.readFile(format,
>>>>>>>>>> "s3a://directory/2020-09-03/", FileProcessingMode.PROCESS_ONCE,
>> -1,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> FilePathFilter.createDefaultFilter());
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> inputStream.addSink(kafka);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> But my requirement is get the list of paths and pass them one by
>> one
>>>>>>>>>> to
>>>>>>>>>> this environment.readFile() method.How we can achieve this.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Satya
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> --------------------------
>>>>>>>>>> Best Regards
>>>>>>>>>> Satya Prakash
>>>>>>>>>> (M)+91-9845111913
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> --------------------------
>>>>>>>>>> Best Regards
>>>>>>>>>> Satya Prakash
>>>>>>>>>> (M)+91-9845111913
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> --------------------------
>>>>>>>>> Best Regards
>>>>>>>>> Satya Prakash
>>>>>>>>> (M)+91-9845111913
>>>>>>>>>
>>>>>>>> --
>>>>>>>> --------------------------
>>>>>>>> Best Regards
>>>>>>>> Satya Prakash
>>>>>>>> (M)+91-9845111913
>>>>>>>>
>>>>>>> --
>>>>>>> --------------------------
>>>>>>> Best Regards
>>>>>>> Satya Prakash
>>>>>>> (M)+91-9845111913
>>>>>>>
>>>>>> --
>>>>>> --------------------------
>>>>>> Best Regards
>>>>>> Satya Prakash
>>>>>> (M)+91-9845111913
>>>>>>
>>>>>>
>>>>>>
>>>>> --
>>>>> --------------------------
>>>>> Best Regards
>>>>> Satya Prakash
>>>>> (M)+91-9845111913
>>>>>
>>>> --
>>>> --------------------------
>>>> Best Regards
>>>> Satya Prakash
>>>> (M)+91-9845111913
>>>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
Hi Chesnay & Team,

I'm already using the "ContinuousFileMonitoringFunction", but I'm still not
able to achieve the use case below. For example, the job started, processed
half of the data, and then failed because of the exception below. How can we
avoid this exception? Could you please help us with this too?





org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: [110]
s3://messages/qa/test-data/DEMO/0077/data/2020-09-03/10/2020-09-03_10:01:51_53807.json mod@ 1599127313000 : 0 + 345
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
Caused by: java.lang.OutOfMemoryError: Java heap space

If I start the job again, it duplicates the data. How can this failure case be
handled for the flink s3 reader source using checkpointing?

Regards,

Satya

On Thu, Oct 22, 2020 at 7:05 PM Chesnay Schepler <[hidden email]> wrote:

> The existing ContinuousFileMonitoringFunction and
> ContinuousFileReaderOperator already take care of that.
> Unless you aren't re-implementing them from scratch you shouldn't have
> to do anything.
>
> On 10/22/2020 1:47 PM, Satyaa Dixit wrote:
> > Hi Chesnay,
> > Thanks for your support.It helped a lot. I need one more help on how to
> do
> > checkpointing as part of the s3 reader source in case if some failure
> > happens due to OutOfMemoryError exception or it could be any other
> failure,
> > and want to recover the data from last reader splitted offset during
> > restart the job in continuation of the previous job in order to avoid
> > duplicate data.
> >
> > Thanks,
> > Satya
> >
> > On Thu, Oct 15, 2020 at 3:29 PM Chesnay Schepler <[hidden email]>
> wrote:
> >
> >> hmm...I don't see an easy way.
> >> You may have to replicated StreamExecutionEnvironment#createFileInput
> and
> >> create a custom ContinuousFileMonitoringFunction that ignores missing
> >> files in it's run() method.
> >>
> >> Alternatively, use some library to check the existence of the S3
> >> directories before creating the sources.
> >>
> >> On 10/15/2020 11:49 AM, Satyaa Dixit wrote:
> >>> Hi Chesnay/Team,
> >>>
> >>> Thanks, we got the fix for our problem but got stuck with the below
> >> issue,
> >>> request your support.
> >>>
> >>>
> >>> How to catch FileNotFoundException during runtime,if any directory is
> >>> missing in s3 as part of the below source code to avoid job failure.
> >>>
> >>>
> >>> s3PathList.stream().map(directory ->
> >>> S3Service.createInputStream(environment, directory, readerParallelism))
> >>> .reduce(DataStream::union).map(joinedStream ->
> >>> joinedStream.addSink(kafkaProducer));
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> Regards,
> >>>
> >>> Satya
> >>>
> >>> On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit <[hidden email]>
> >> wrote:
> >>>> Hi Chesnay/Team
> >>>>
> >>>> Thank you so much.I have tried with the solution but it is not working
> >> as
> >>>> expected showing compilation issues and tried all the ways .Please
> find
> >>>> below code snippet :
> >>>>
> >>>> s3PathList.stream()
> >>>> .map(directory -> S3Service.customCreateInputStream(environment,
> >>>> directory, readerParallelism))
> >>>> .reduce(DataStream::union).map(joinedStream -> stream.flatMap(new
> >>>> IntermidiateOperator()).map(joinedStream ->
> >>>> joinedStream.addSink(kafkaProducer).name("Publish to " +
> >> kafkaTopicName));
> >>>> public static class IntermidiateOperator implements
> >>>> FlatMapFunction<String, String> {
> >>>> private static final ObjectMapper objectMapper1 = new ObjectMapper();
> >>>>
> >>>> @Override
> >>>> public void flatMap(String value, Collector<String> out) throws
> >> Exception {
> >>>> Test m = objectMapper1.readValue(value, Test.class);
> >>>> System.out.println("Json string:: ------" + m);
> >>>> // logger.info("Json string:: ------"+m);
> >>>> out.collect(value);
> >>>> }
> >>>> }
> >>>>
> >>>> Also just to clarify one doubt , How to handle *FileNotFoundException*
> >> as
> >>>> part of flink reader during runtime if in case directory is not
> >> available
> >>>> in s3. How to avoid job failure in that use case.
> >>>>
> >>>> Regards,
> >>>> Satya
> >>>>

--
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
In continuation of the above email: I have also tried the code below, but
it still restarts the job from the beginning.

environment.enableCheckpointing(30000L);
environment.disableOperatorChaining();
environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
environment.getCheckpointConfig().setPreferCheckpointForRecovery(true);

environment.setRestartStrategy(RestartStrategies.failureRateRestart(
        5,                             // max failures per interval
        Time.of(30, TimeUnit.MINUTES), // time interval for measuring failure rate
        Time.of(60, TimeUnit.SECONDS)  // delay
));
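
(A minimal sketch, as a side note and assuming a Flink 1.9/1.10-era setup:
the restart strategy above only restores the running job from its latest
completed checkpoint, while a fresh submission of the same jar starts from
scratch unless it is resumed from a retained checkpoint. The bucket and
path below are placeholders.)

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;

// Keep completed checkpoints after a failure or manual cancel, so a new
// submission can be resumed from the latest one instead of re-reading all
// splits from the monitored directories.
environment.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// A later resubmission is then pointed at the retained checkpoint, e.g.:
//   flink run -s s3://<bucket>/flink/checkpoints/<job-id>/chk-<n> my-job.jar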

Please have a look into this as well.

Regards,
Satya


On Tue, Oct 27, 2020 at 4:46 PM Satyaa Dixit <[hidden email]> wrote:

> Hi Chesnay & Team,
>
> I'm using already using "ContinuousFileMonitoringFunction"  but still I'm
> not able to achieve the below use case. For example once job started and it
> process half of the data and in between job got failed because of below
> exception. how to avoid this exception? could you please help us on this
> too.
>
> *org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
> exception when processing split: [110]
> s3://messages/qa/test-data/DEMO/0077/data/2020-09-03/10/2020-09-03_10:01:51_53807.json
> mod@ 1599127313000 : 0 + 345 at
> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
> Caused by: java.lang.OutOfMemoryError: Java heap space*
>
> If I start the job back again, it is causing the data duplication. How to
> fix this negative use case with flink s3 reader source using checkpointing?
>
> Regards,
>
> Satya
>
> --
> --------------------------
> Best Regards
> Satya Prakash
> (M)+91-9845111913
>


--
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Chesnay Schepler-3
To fix the OutOfMemoryError you will have to provide Flink with more
memory, use more task executors, or possibly reduce the parallelism.
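
For illustration only, a sketch that reuses the readFile() call quoted
earlier in this thread; the parallelism value is a placeholder to tune
against the available heap:

// Cap the parallelism of each file reader so fewer splits are read (and
// buffered) at the same time.
DataStream<String> inputStream = environment
        .readFile(format, directory, FileProcessingMode.PROCESS_ONCE, -1,
                FilePathFilter.createDefaultFilter())
        .setParallelism(2); // placeholder value

// Task manager memory itself is raised outside the job code, e.g. via the
// taskmanager.memory.process.size option in flink-conf.yaml (Flink 1.10+).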

Did the job fail before the first checkpoint occurred?

What sink are you using?

On 10/27/2020 12:45 PM, Satyaa Dixit wrote:

> In continuation of the above email, I have tried below code also but
> it is restarting the job from the beginning.
>
> environment.enableCheckpointing(30000L);
> environment.disableOperatorChaining();
> environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> environment.getCheckpointConfig().setPreferCheckpointForRecovery(true);
>
> environment.setRestartStrategy(RestartStrategies.failureRateRestart(5,
> // max failures per interval
> Time.of(30, TimeUnit.MINUTES), // time interval for measuring failure rate
> Time.of(60, TimeUnit.SECONDS) // delay
> ));
>
> Please have a look into this as well.
>
> Regards,
> Satya
>
> --
> --------------------------
> Best Regards
> Satya Prakash
> (M)+91-9845111913



Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

Satyaa Dixit
Hi Chesnay,

Thanks for your prompt response.
Sink: Kafka.
The job failed after the first checkpoint of the reader had been taken;
that is where the duplication happened.

Can we use external checkpointing as part of the reader source by using the
code snippet below?

environment.setStateBackend(
        new FsStateBackend("s3://flink-test-directory/flink/checkpoints"));

Regards,
Satya


On Tue, Oct 27, 2020 at 5:36 PM Chesnay Schepler <[hidden email]> wrote:

> To fix the OutOfMemory error you will have to provide Flink with more
> memory, use more task executors or possibly reduce the parallelism.
>
> Did the job fail before the first checkpoint has occurred?
>
> What sink are you using?
>
> On 10/27/2020 12:45 PM, Satyaa Dixit wrote:
>
> In continuation of the above email, I have tried below code also but it is
> restarting the job from the beginning.
>
> environment.enableCheckpointing(30000L);
> environment.disableOperatorChaining();
> environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>
> environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> environment.getCheckpointConfig().setPreferCheckpointForRecovery(true);
>
> environment.setRestartStrategy(RestartStrategies.failureRateRestart(5, //
> max failures per interval
> Time.of(30, TimeUnit.MINUTES), // time interval for measuring failure rate
> Time.of(60, TimeUnit.SECONDS) // delay
> ));
>
> Please have a look into this as well.
>
> Regards,
> Satya
>
>
> On Tue, Oct 27, 2020 at 4:46 PM Satyaa Dixit <[hidden email]>
> wrote:
>
>> Hi Chesnay & Team,
>>
>> I'm using already using "ContinuousFileMonitoringFunction"  but still
>> I'm not able to achieve the below use case. For example once job started
>> and it process half of the data and in between job got failed because of
>> below exception. how to avoid this exception? could you please help us on
>> this too.
>>
>> *org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>> exception when processing split: [110]
>> s3://messages/qa/test-data/DEMO/0077/data/2020-09-03/10/2020-09-03_10:01:51_53807.jsonmod@
>> 1599127313000 : 0 + 345 at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
>> Caused by: java.lang.OutOfMemoryError: Java heap space*
>>
>> If I start the job back again, it is causing the data duplication. How to
>> fix this negative use case with flink s3 reader source using checkpointing?
>>
>> Regards,
>>
>> Satya
>>
>> On Thu, Oct 22, 2020 at 7:05 PM Chesnay Schepler <[hidden email]>
>> wrote:
>>
>>> The existing ContinuousFileMonitoringFunction and
>>> ContinuousFileReaderOperator already take care of that.
>>> Unless you aren't re-implementing them from scratch you shouldn't have
>>> to do anything.
>>>
>>> On 10/22/2020 1:47 PM, Satyaa Dixit wrote:
>>> > Hi Chesnay,
>>> > Thanks for your support.It helped a lot. I need one more help on how
>>> to do
>>> > checkpointing as part of the s3 reader source in case if some failure
>>> > happens due to OutOfMemoryError exception or it could be any other
>>> failure,
>>> > and want to recover the data from last reader splitted offset during
>>> > restart the job in continuation of the previous job in order to avoid
>>> > duplicate data.
>>> >
>>> > Thanks,
>>> > Satya
>>> >
>>> > On Thu, Oct 15, 2020 at 3:29 PM Chesnay Schepler <[hidden email]>
>>> wrote:
>>> >
>>> >> hmm...I don't see an easy way.
>>> >> You may have to replicated StreamExecutionEnvironment#createFileInput
>>> and
>>> >> create a custom ContinuousFileMonitoringFunction that ignores missing
>>> >> files in it's run() method.
>>> >>
>>> >> Alternatively, use some library to check the existence of the S3
>>> >> directories before creating the sources.
>>> >>
>>> >> On 10/15/2020 11:49 AM, Satyaa Dixit wrote:
>>> >>> Hi Chesnay/Team,
>>> >>>
>>> >>> Thanks, we got the fix for our problem but got stuck with the below
>>> >> issue,
>>> >>> request your support.
>>> >>>
>>> >>>
>>> >>> How to catch FileNotFoundException during runtime,if any directory is
>>> >>> missing in s3 as part of the below source code to avoid job failure.
>>> >>>
>>> >>>
>>> >>> s3PathList.stream().map(directory ->
>>> >>> S3Service.createInputStream(environment, directory,
>>> readerParallelism))
>>> >>> .reduce(DataStream::union).map(joinedStream ->
>>> >>> joinedStream.addSink(kafkaProducer));
>>> >>>
>>> >>> Regards,
>>> >>>
>>> >>> Satya
>>> >>>
>>> >>> On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit <[hidden email]>
>>> >> wrote:
>>> >>>> Hi Chesnay/Team
>>> >>>>
>>> >>>> Thank you so much.I have tried with the solution but it is not
>>> working
>>> >> as
>>> >>>> expected showing compilation issues and tried all the ways .Please
>>> find
>>> >>>> below code snippet :
>>> >>>>
>>> >>>> s3PathList.stream()
>>> >>>> .map(directory -> S3Service.customCreateInputStream(environment,
>>> >>>> directory, readerParallelism))
>>> >>>> .reduce(DataStream::union).map(joinedStream -> stream.flatMap(new
>>> >>>> IntermidiateOperator()).map(joinedStream ->
>>> >>>> joinedStream.addSink(kafkaProducer).name("Publish to " +
>>> >> kafkaTopicName));
>>> >>>> public static class IntermidiateOperator implements
>>> >>>> FlatMapFunction<String, String> {
>>> >>>> private static final ObjectMapper objectMapper1 = new
>>> ObjectMapper();
>>> >>>>
>>> >>>> @Override
>>> >>>> public void flatMap(String value, Collector<String> out) throws
>>> >> Exception {
>>> >>>> Test m = objectMapper1.readValue(value, Test.class);
>>> >>>> System.out.println("Json string:: ------" + m);
>>> >>>> // logger.info("Json string:: ------"+m);
>>> >>>> out.collect(value);
>>> >>>> }
>>> >>>> }
>>> >>>>
>>> >>>> Also just to clarify one doubt , How to handle
>>> *FileNotFoundException*
>>> >> as
>>> >>>> part of flink reader during runtime if in case directory is not
>>> >> available
>>> >>>> in s3. How to avoid job failure in that use case.
>>> >>>>
>>> >>>> Regards,
>>> >>>> Satya
>>> >>>>
>>> >>>> On Tue, Oct 13, 2020 at 11:15 AM Satyaa Dixit <
>>> [hidden email]>
>>> >>>> wrote:
>>> >>>>
>>> >>>>> Thanks, I'll check it out.
>>> >>>>>
>>> >>>>> On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler <
>>> [hidden email]>
>>> >>>>> wrote:
>>> >>>>>
>>> >>>>>> 1) There's no mechanism in the API to restrict the number of
>>> number
>>> >> of
>>> >>>>>> readers across several sources. I can't quite think of a way to
>>> >> achieve
>>> >>>>>> this; maybe Kostas has an idea.
>>> >>>>>>
>>> >>>>>> 2) You're mixing  up the Java Streams and Finks DataStream API.
>>> >>>>>>
>>> >>>>>> Try this:
>>> >>>>>>
>>> >>>>>> s3PathList.stream()
>>> >>>>>> .map(...)
>>> >>>>>> .reduce(...)
>>> >>>>>> .map(joinedStream -> stream.map(new FlatMapFunction...))
>>> >>>>>> .map(joinedStream->  joinedStream.addSink...)
>>> >>>>>>
>>> >>>>>> On 10/12/2020 6:05 AM, Satyaa Dixit wrote:
>>> >>>>>>
>>> >>>>>> Hi Team,
>>> >>>>>>
>>> >>>>>> Could you please help me here. I’m sorry for asking on such short
>>> >> notice
>>> >>>>>> but my work has stopped due to this.
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> Regards,
>>> >>>>>> Satya
>>> >>>>>>
>>> >>>>>> On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit <
>>> [hidden email]>
>>> >>>>>> wrote:
>>> >>>>>>
>>> >>>>>>> Hi  Shesnay/Team,
>>> >>>>>>>
>>> >>>>>>> Thank you so much for the reply.In the continuation of the
>>> previous
>>> >>>>>>> email, below is the block diagram where I am reading the file
>>> from
>>> >> s3 and
>>> >>>>>>> pushing it to kafka.Now with the current setup, I have total 4
>>> >> directory
>>> >>>>>>> based on the readfile method  from flink environment ,we are
>>> >> creating 4
>>> >>>>>>> readers parallely to process the data from s3 .
>>> >>>>>>>
>>> >>>>>>> Below are my Questions:
>>> >>>>>>> 1. Can we restrict the no. of readers to process the  data
>>> parallely.
>>> >>>>>>> e.g let's say if  we have a thousand of directory , in that case
>>> i
>>> >> want to
>>> >>>>>>> restrict the no. of readers to 10 and ten parallel threads will
>>> >> continue
>>> >>>>>>> with 100 sequential reading of the directory per thread to
>>> consume
>>> >> the data
>>> >>>>>>> .
>>> >>>>>>>
>>> >>>>>>> 2.In between the two flink operators i.e s3 reader and kafka
>>> sink , i
>>> >>>>>>> just want to implement one more operator in order to transform
>>> the
>>> >> data
>>> >>>>>>> which i am reading from s3 bucket and then want to push into the
>>> >> kafka
>>> >>>>>>> sink. Below is my working code.Here i am finding  difficulties to
>>> >>>>>>> implement  map operator in order to transform the union of
>>> >> datastreams  by
>>> >>>>>>> applying union method over each directory's reader before
>>> pushing to
>>> >> kafka.
>>> >>>>>>> List<String> s3PathList =
>>> >> S3Service.getListOfS3Paths(finalParameters);
>>> >>>>>>> s3PathList.stream()
>>> >>>>>>> .map(directory -> S3Service.customInputStream(environment,
>>> directory,
>>> >>>>>>> readerParallelism))
>>> >>>>>>> .reduce(DataStream::union)
>>> >>>>>>> .map(joinedStream ->
>>> >> joinedStream.addSink(kafkaProducer).name("Publish
>>> >>>>>>> to " + kafkaTopicName));
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> *Something like this I'm trying to do in order to achieve the
>>> above
>>> >> use
>>> >>>>>>> case by applying FlatMap, it could be map as well:*
>>> >>>>>>> s3PathList.stream()
>>> >>>>>>> .map(directory -> S3Service.customInputStream(environment,
>>> directory,
>>> >>>>>>> readerParallelism))
>>> >>>>>>> .reduce(DataStream::union).flatMap(new
>>> FlatMapFunction<DataStream,
>>> >>>>>>> String>() {
>>> >>>>>>>      @Override
>>> >>>>>>>      public void flatMap(String value, Collector<String> out)
>>> throws
>>> >>>>>>> Exception {
>>> >>>>>>>       FinalJsonMessage m=objectMapper.readValue(value,
>>> >>>>>>> FinalJsonMessage.class);
>>> >>>>>>>       System.out.println("Json string:: ------"+m);
>>> >>>>>>>        //transformation logic
>>> >>>>>>>        out.collect(value);
>>> >>>>>>>      }
>>> >>>>>>>    })
>>> >>>>>>> .map(joinedStream ->
>>> >> joinedStream.addSink(kafkaProducer).name("Publish
>>> >>>>>>> to " + kafkaTopicName));
>>> >>>>>>> [image: image.png]
>>> >>>>>>> Request your support on the same.
>>> >>>>>>> Regards,
>>> >>>>>>> Satya
>>> >>>>>>>
>>> >>>>>>> On Mon, Oct 5, 2020 at 12:16 PM Satyaa Dixit <
>>> [hidden email]>
>>> >>>>>>> wrote:
>>> >>>>>>>
>>> >>>>>>>> Hi @[hidden email] <[hidden email]>,
>>> >>>>>>>>
>>> >>>>>>>> Thanks for your support, it was really helpful.
>>> >>>>>>>> Do you know the list of directories when you submit the job? [Yes,
>>> >>>>>>>> we do.]
>>> >>>>>>>> The implementation is in progress, and I will get back to you if we
>>> >>>>>>>> face any further challenges.
>>> >>>>>>>> I appreciate your support in this regard.
>>> >>>>>>>>
>>> >>>>>>>> Regards,
>>> >>>>>>>> Satya
>>> >>>>>>>>
>>> >>>>>>>> On Thu, Oct 1, 2020 at 7:46 PM Satyaa Dixit <
>>> [hidden email]>
>>> >>>>>>>> wrote:
>>> >>>>>>>>
>>> >>>>>>>>> Thank you @Chesnay let me try this change .
>>> >>>>>>>>>
>>> >>>>>>>>> On Thu, Oct 1, 2020 at 1:21 PM Chesnay Schepler <
>>> >> [hidden email]>
>>> >>>>>>>>> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>>> You could also try using streams to make it a little more
>>> concise:
>>> >>>>>>>>>>
>>> >>>>>>>>>> directories.stream()
>>> >>>>>>>>>>      .map(directory -> createInputStream(environment,
>>> directory))
>>> >>>>>>>>>>      .reduce(DataStream::union)
>>> >>>>>>>>>>      .map(joinedStream -> joinedStream.addSink(kafka));
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> On 10/1/2020 9:48 AM, Chesnay Schepler wrote:
>>> >>>>>>>>>>
>>> >>>>>>>>>> Do you know the list of directories when you submit the job?
>>> >>>>>>>>>>
>>> >>>>>>>>>> If so, then you can iterate over them, create a source for
>>> each
>>> >>>>>>>>>> directory, union them, and apply the sink to the union.
>>> >>>>>>>>>>
>>> >>>>>>>>>> private static DataStream<String> createInputStream(
>>> >>>>>>>>>>         StreamExecutionEnvironment environment, String directory) {
>>> >>>>>>>>>>     TextInputFormat format = new TextInputFormat(
>>> >>>>>>>>>>             new org.apache.flink.core.fs.Path(directory));
>>> >>>>>>>>>>     format.setNestedFileEnumeration(true);
>>> >>>>>>>>>>     return environment.readFile(format, directory,
>>> >>>>>>>>>>             FileProcessingMode.PROCESS_ONCE, -1,
>>> >>>>>>>>>>             FilePathFilter.createDefaultFilter());
>>> >>>>>>>>>> }
>>> >>>>>>>>>>
>>> >>>>>>>>>> public static void runJob() throws Exception {
>>> >>>>>>>>>>     StreamExecutionEnvironment environment =
>>> >>>>>>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>> >>>>>>>>>>     List<String> directories = getDirectories();
>>> >>>>>>>>>>     DataStream<String> joinedStreams = null;
>>> >>>>>>>>>>     for (String directory : directories) {
>>> >>>>>>>>>>         DataStream<String> inputStream = createInputStream(environment, directory);
>>> >>>>>>>>>>         if (joinedStreams == null) {
>>> >>>>>>>>>>             joinedStreams = inputStream;
>>> >>>>>>>>>>         } else {
>>> >>>>>>>>>>             // union returns a new stream, so the result must be re-assigned
>>> >>>>>>>>>>             joinedStreams = joinedStreams.union(inputStream);
>>> >>>>>>>>>>         }
>>> >>>>>>>>>>     }
>>> >>>>>>>>>>     // add a sanity check that there was at least 1 directory
>>> >>>>>>>>>>
>>> >>>>>>>>>>     joinedStreams.addSink(kafka);
>>> >>>>>>>>>> }
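>>> >>>>>>>>>>
>>> >>>>>>>>>> The sanity check mentioned in the comment could be as simple as the
>>> >>>>>>>>>> following, placed right before the addSink call (just one option):
>>> >>>>>>>>>>
>>> >>>>>>>>>> if (joinedStreams == null) {
>>> >>>>>>>>>>     throw new IllegalStateException("getDirectories() returned no directories");
>>> >>>>>>>>>> }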
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>

--
--------------------------
Best Regards
Satya Prakash
(M)+91-9845111913