How to use static data with streams?

How to use static data with streams?

Kashmar, Ali
Hi there,

I'm trying to design and implement a use case in Flink where I'm receiving protocol packets over a socket. Each packet contains the subscriber IMSI along with a number of other fields. At the same time, I have a CSV file with a mapping from IMSI -> subscriber group. I need to inject the group into the packet and then send it to the sink.

I've tried loading the CSV into an in-memory map and then accessing the map from within the Flink operators, but that only works when the CSV is very small (a few hundred subscribers). I've also tried creating another stream for the CSV and connecting the streams, but that doesn't get me anywhere, since I can't access objects from both streams at the same time.

How would you guys approach this?

Thanks,
Ali

Re: How to use static data with streams?

Robert Metzger
Hi Ali,

I'm excited to hear that EMC is looking into Apache Flink. I think the
solution to this problem depends on one question: What is the size of the
data in the CSV file compared to the memory you have available in the
cluster?
Would the mapping table from the file fit into the memory of all nodes
running Flink?

Regards,
Robert

PS: Did you subscribe to the mailing list? I've CCed you in case you're not
subscribed yet.


Re: How to use static data with streams?

Kashmar, Ali
Hi Robert,

The CSV file (or files, as there will definitely be more than one) can be
large (let's say 1 GB). Memory is not an issue, though: each node has at
least 64 GB of RAM installed. The CSV files should easily fit in the memory of
each node.

Regards,
Ali




Re: How to use static data with streams?

Robert Metzger
Okay, you should be able to implement it as you described initially. I would do
the transformation in a Flink map() operator. The RichMapFunction provides an
open() method which is called before the first record arrives.
In the open() method, I would read the CSV file(s) from HDFS or another
file system accessible by all nodes.

Then, you can access the data from the files in the map operator.
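
To make this concrete, here is a minimal plain-Java sketch of the lookup logic described above. In a real job this would live inside a Flink RichMapFunction (loading in open(), lookup in map()); the class name, the "unknown" default, and the `imsi,group` CSV layout are illustrative assumptions, not taken from the thread:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the per-task lookup state. In Flink, open() runs once per
// parallel task before any record arrives, and groupFor() would be
// called from map() for every incoming packet.
class ImsiGroupLookup {
    private final Map<String, String> imsiToGroup = new HashMap<>();

    // Loads "imsi,group" lines into the map. Takes pre-read lines so the
    // logic is easy to test; in the real job you would read the file from
    // HDFS or another shared filesystem here.
    public void open(Iterable<String> csvLines) {
        for (String line : csvLines) {
            String[] parts = line.split(",", 2);
            if (parts.length == 2) {
                imsiToGroup.put(parts[0].trim(), parts[1].trim());
            }
        }
    }

    // Per-record lookup (what the map() body would do to enrich a packet).
    public String groupFor(String imsi) {
        return imsiToGroup.getOrDefault(imsi, "unknown");
    }
}
```

Because open() runs once per parallel task, each task loads the map exactly once, which is why limiting slots per TaskManager (below) keeps the number of copies down.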

To make the best use of memory, I would recommend starting Flink in
"streaming" mode (the -st argument on YARN); with that enabled, we provide
more memory to streaming operators.
Also, I would expose only one processing slot per TaskManager; this way we
ensure that the files are read only once per TaskManager (make sure you
have only one TaskManager per machine).

Why did your previous approach fail? Do you still have the error message?

Regards,
Robert


Re: How to use static data with streams?

Kashmar, Ali
I did not load the CSV file using the approach you suggested. I was
loading it outside the operators (at the beginning of my class's main
method), since the file will be needed by multiple operators for sure.
When the file was small, I saw the job registered and started, but when I
used a big CSV file, the job never got registered with the task manager (I
tried the 'list' command and got nothing).

Here’s what I saw with the small(ish) file:

# flink run analytics-flink.jar 19001 minisubs.csv output.csv
loaded 200000 subscribers from csv file
11/02/2015 16:36:59 Job execution switched to status RUNNING.
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
Sink(1/1) switched to SCHEDULED
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
Sink(1/1) switched to DEPLOYING
11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map -> Stream
Sink(1/1) switched to RUNNING


And here’s what I saw with the big file:

# flink run analytics-flink.jar 19001 subs.csv output.csv
loaded 1173547 subscribers from csv file


I'm already using the streaming mode. I'm running a single Flink node
right now on CentOS 7 using the 'start-local-streaming.sh' script.

Thanks,
Ali


Re: How to use static data with streams?

Robert Metzger
Hi Ali,

Great, the start-local-streaming.sh script sounds right.

I can explain why your first approach didn't work:

You were trying to send the CSV files from the Flink client to the cluster
using our RPC system (Akka). When you submit a job to Flink, we serialize
all the objects the user created (mappers, sources, ...) and send them to the
cluster.
There is a method StreamExecutionEnvironment.fromElements(..) which allows
users to serialize a few objects along with the job submission, but the
amount of data you can transfer this way is limited by the Akka frame
size; in our case I think the default is 10 megabytes.
Beyond that, Akka will probably just drop or reject the deployment message.
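
For reference, the frame-size limit is a configurable Flink setting. This is a hedged sketch of what the entry could look like in conf/flink-conf.yaml (the exact key and default should be checked against your Flink version's configuration docs); raising it is usually the wrong fix here, since reading the file from a shared filesystem avoids shipping it over RPC entirely:

```yaml
# conf/flink-conf.yaml -- illustrative; verify the key for your version.
# Maximum size of messages sent through Akka, default ~10 MB.
akka.framesize: 10485760b
```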

I'm pretty sure the approach I've suggested will resolve the issue.

Please let me know if you need further assistance.

Regards,
Robert




Re: How to use static data with streams?

Kashmar, Ali
Hi Robert,

I tried the approach you suggested and it works nicely. Thanks!

I have a few more questions if you don’t mind:

1. Is there a way for one stream to retrieve data that's stored in another
stream? I have a location stream that I can use to store the latest
subscriber location, and another stream that needs access to the latest
subscriber location processed by the location stream. I read a bit about
broadcast variables, but they're only available for DataSets, not
DataStreams. Did I miss a way to do this in Flink?

2. We are planning to test this on a Flink cluster of 3 nodes (1 master
and 2 slaves).

   a. If I use a socket stream, does each node listen for data on its own
socket, or is it only the job manager node? I assume it's the latter. This
is important because I have to figure out how to make the system highly
available.
   b. Is there a way to split the aforementioned CSV file across the
three nodes in the cluster?

Sorry for bombarding you with questions.

Thanks,
Ali



Re: How to use static data with streams?

Robert Metzger
Hi Ali,

1. You can connect two streams and then use the co-map operator to consume
data from both streams. I'm not sure how much data arrives from one or the
other stream, but maybe you can store (update) the data in memory.
Read more here
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#datastream-abstraction
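
As a plain-Java sketch of what the co-map state could look like (in Flink this logic would live in a CoFlatMapFunction applied to locationStream.connect(eventStream), keyed by subscriber ID; the method names, field names, and "@" output format here are illustrative assumptions):

```java
import java.util.HashMap;
import java.util.Map;

// Two inputs share one piece of state: location updates write it,
// events read it. This mirrors flatMap1/flatMap2 of a co-operator.
class LocationJoiner {
    private final Map<String, String> latestLocation = new HashMap<>();

    // First input (flatMap1): a location update arrives; remember it.
    public void onLocation(String subscriberId, String location) {
        latestLocation.put(subscriberId, location);
    }

    // Second input (flatMap2): an event arrives; attach the latest
    // known location, or "unknown" if none has been seen yet.
    public String onEvent(String subscriberId, String event) {
        String loc = latestLocation.get(subscriberId);
        return event + "@" + (loc != null ? loc : "unknown");
    }
}
```

Note that with this design an event can arrive before the first location update for its subscriber, so the code has to handle the "no location yet" case explicitly.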

2 a) No, I think all the TaskManager nodes are listening for data. To make
this highly available, I would recommend letting the system that produces
the data write it to Apache Kafka, and then consuming the data from
Kafka using Flink.
This way you get very good high availability and throughput, and you don't
have to worry about the sockets.

2 b) Sure, you can implement the splitting yourself (each mapper reads N
lines of the file) and then partition (split) the data stream so that the
right protocol packets end up at the right machine.
However, if the entire CSV file fits into the memory of one machine,
it's probably faster not to split the stream and to let each machine join
the data locally.
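
To illustrate the routing idea in 2 b): keying the stream by IMSI sends all packets for one subscriber to the same parallel instance, which would also hold that subscriber's slice of the CSV. The modulo scheme below only mirrors the concept; Flink's actual keyBy hash-partitioning differs in its details:

```java
// Conceptual sketch of key-based routing: the same key always maps to
// the same partition index, so packets and the matching CSV slice
// co-locate. Not Flink's real hashing, just the underlying idea.
class ImsiPartitioner {
    public static int partitionFor(String imsi, int parallelism) {
        // Math.floorMod keeps the result non-negative even when
        // hashCode() is negative.
        return Math.floorMod(imsi.hashCode(), parallelism);
    }
}
```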

It's really no problem that you're asking questions; that's what the mailing
list is for.
I'm looking forward to the next set of questions ;)

Regards,
Robert




Re: How to use static data with streams?

Kashmar, Ali
Hi Robert,

Thanks for the help! I’ve managed to implement my use case using your
suggested approach of combining the streams.

Just a follow-up on 2b) below: I'm not clear on the statement "partition
(split) the data stream so that the right protocol packets end up at the
right machine". How do I know which machine the data ends up at? My
understanding is that the Flink program is agnostic of the cluster nodes.

Maybe it would help if I explained the use case:
1. Load a CSV file and split it equally, using the ID in each CSV record,
across the Flink cluster, to be stored in memory (the operator's memory,
maybe?). This is basically an initialization step.
2. Once 1) is done, read events from a socket (for now) and use the ID in
each event to add attributes from the matching CSV record to the event.
Store the updated events in a file.

Based on those two requirements, what can be accomplished using Flink and
what can’t? Is the functionality that can’t be done today on Flink’s roadmap?
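
To make the intent concrete, here is a minimal plain-Java sketch of those two steps with Flink left out entirely; the class name, CSV layout, and event format are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Step 1: parse "id,group" CSV lines into an in-memory map.
// Step 2: enrich an incoming event with the group looked up by its ID.
public class EnrichSketch {

    static Map<String, String> loadMapping(String[] csvLines) {
        Map<String, String> mapping = new HashMap<>();
        for (String line : csvLines) {
            String[] parts = line.split(",", 2);
            mapping.put(parts[0].trim(), parts[1].trim());
        }
        return mapping;
    }

    static String enrich(String eventId, String payload, Map<String, String> mapping) {
        // Fall back to a marker group when the ID is not in the CSV.
        String group = mapping.getOrDefault(eventId, "UNKNOWN");
        return eventId + "," + group + "," + payload;
    }

    public static void main(String[] args) {
        Map<String, String> m = loadMapping(new String[]{"12345,gold", "67890,silver"});
        System.out.println(enrich("12345", "pkt-data", m)); // 12345,gold,pkt-data
    }
}
```

In the real job, loadMapping would run once per node (e.g. in an operator's open() method) and enrich would sit inside a map operator.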

Thanks,
Ali


On 2015-11-05, 5:29 PM, "Robert Metzger" <[hidden email]> wrote:

>Hi Ali,
>
>1. You can connect two streams and then use the co-map operator to consume
>data from both streams. I'm not sure how much data arrives from one or the
>other stream, but maybe you can store (update) the data in memory.
>Read more here
>https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guid
>e.html#datastream-abstraction
>
>2 a) No, I think all the taskmanager nodes are listening to data. For
>making this highly available, I would recommend to let the system which is
>producing the data write it to Apache Kafka. Then, consume the data from
>Kafka using Flink.
>This way you get very good high availability and throughput and you don't
>have to worry about the sockets.
>
>2 b) Sure, you can implement the splitting yourself (each mapper reads N
>lines of the file) and then partition (split) the data stream so that the
>right protocol packets end up at the right machine.
>However, if the entire CSV file fits into the entire memory of one
>machine,
>it's probably faster to not split the stream and use each machine to join
>the data locally.
>
>It's really no problem that you're asking questions, that's what the
>mailing
>list is made for.
>I'm looking forward to the next set of questions ;)
>
>Regards,
>Robert
>
>
>
>On Thu, Nov 5, 2015 at 9:56 PM, Kashmar, Ali <[hidden email]> wrote:
>
>> Hi Robert,
>>
>> I tried the approach you suggested and it works nicely. Thanks!
>>
>> I have a few more questions if you don’t mind:
>>
>> 1. Is there a way to retrieve in one stream data that's stored in
>>another
>> stream? I have a location stream that I can use to store the latest
>> subscriber location. I have another stream that needs access to the
>>latest
>> subscriber location processed by the location stream. I read a bit on
>> broadcast variables but they’re only available for DataSets, not
>> DataStreams. Did I miss a way in Flink to do this?
>>
>> 2. We are planning to test this on a Flink cluster of 3 nodes (1 master
>> and 2 slaves).
>>
>>    a. If I use a socket stream, does each node listen for data on its
>> socket or is it only the job manager node? I assume it’s the latter.
>>This
>> is important because I have to figure out how to make the
>> system highly
>> available.
>>    b. Is there a way to split the aforementioned CSV file across the
>> three nodes in the cluster?
>>
>> Sorry for bombarding you with questions.
>>
>> Thanks,
>> Ali
>>
>>
>> On 2015-11-05, 10:47 AM, "Robert Metzger" <[hidden email]> wrote:
>>
>> >Hi Ali,
>> >
>> >great, the start-local-streaming.sh script sounds right.
>> >
>> >I can explain why your first approach didn't work:
>> >
>> >You were trying to send the CSV files from the Flink client to the
>>cluster
>> >using our RPC system (Akka). When you submit a job to Flink, we
>>serialize
>> >all the objects the user created (mappers, sources, ...) and send it to
>> >the
>> >cluster.
>> >There is a method StreamExecutionEnvironment.fromElements(..) which
>>allows
>> >users to serialize a few objects along with the job submission. But the
>> >amount of data you can transfer like this is limited by the Akka frame
>> >size. In our case I think the default is 10 megabytes.
>> >After that, Akka will probably just drop or reject the deployment
>>message.
>> >
>> >I'm pretty sure the approach I've suggested will resolve the issue.
>> >
>> >Please let me know if you need further assistance.
>> >
>> >Regards,
>> >Robert
>> >
>> >
>> >
>> >On Thu, Nov 5, 2015 at 3:39 PM, Kashmar, Ali <[hidden email]>
>>wrote:
>> >
>> >> I did not load the CSV file using the approach you suggested. I was
>> >> loading it outside the operators (at the beginning of the main
>>method of
>> >> my class), since the file will be needed by multiple operators for
>>sure.
>> >> When the file was small, I saw the job registered and started, but
>>when
>> >>I
>> >> used a big CSV file, the job never got registered with the task
>>manager
>> >>(I
>> >> tried the ‘list' command and got nothing).
>> >>
>> >> Here’s what I saw with the small(ish) file:
>> >>
>> >> # flink run analytics-flink.jar 19001 minisubs.csv output.csv
>> >> loaded 200000 subscribers from csv file
>> >> 11/02/2015 16:36:59 Job execution switched to status RUNNING.
>> >> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map ->
>>Stream
>> >> Sink(1/1) switched to SCHEDULED
>> >> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map ->
>>Stream
>> >> Sink(1/1) switched to DEPLOYING
>> >> 11/02/2015 16:36:59 Socket Stream -> Flat Map -> Filter -> Map ->
>>Stream
>> >> Sink(1/1) switched to RUNNING
>> >>
>> >>
>> >> And here’s what I saw with the big file:
>> >>
>> >> # flink run analytics-flink.jar 19001 subs.csv output.csv
>> >> loaded 1173547 subscribers from csv file
>> >>
>> >>
>> >> I’m already using the streaming mode. I’m running a single Flink node
>> >> right now on Centos 7 using the ‘start-local-streaming.sh’ script.
>> >>
>> >> Thanks,
>> >> Ali
>> >>
>> >> On 2015-11-05, 10:22 AM, "Robert Metzger" <[hidden email]>
>>wrote:
>> >>
>> >> >Okay.
>> >> >
>> >> >you should be able to implement it as you described initially. I
>>would
>> >>do
>> >> >the transformation in a map() operator of Flink. The RichMapFunction
>> >> >provides you with an open() method which is called before the first
>> >>record
>> >> >arrives.
>> >> >In the open() method, I would read the csv file(s) from HDFS or
>>another
>> >> >file system accessible by all nodes.
>> >> >
>> >> >Then, you can access the data from the files in the map operator.
>> >> >
>> >> >In order to utilize the memory best, I would recommend to start
>>Flink
>> >>in
>> >> >the "streaming" mode. (-st argument on YARN). With that enabled, we
>> >> >provide
>> >> >more memory to streaming operators.
>> >> >Also, I would only expose one processing slot per TaskManager, this
>> >>way we
>> >> >ensure that the files are only read once per TaskManager. (make sure
>> >>you
>> >> >have only one TaskManager per machine).
>> >> >
>> >> >Why did your previous approach fail? Do you still have the error
>> >>message?
>> >> >
>> >> >Regards,
>> >> >Robert
>> >> >
>> >> >On Thu, Nov 5, 2015 at 3:02 PM, Kashmar, Ali <[hidden email]>
>> >>wrote:
>> >> >
>> >> >> Hi Robert,
>> >> >>
>> >> >> The CSV file (or files as there will definitely be more than one)
>> >>can be
>> >> >> large (let’s say 1 GB). Memory is not an issue though. Each node
>>has
>> >>at
>> >> >> least 64 GB RAM mounted. The CSV files should easily fit in the
>> >>memory
>> >> >>of
>> >> >> each node.
>> >> >>
>> >> >> Regards,
>> >> >> Ali
>> >> >>
>> >> >>
>> >> >>
>> >> >> On 2015-11-05, 6:30 AM, "Robert Metzger" <[hidden email]>
>> wrote:
>> >> >>
>> >> >> >Hi Ali,
>> >> >> >
>> >> >> >I'm excited to hear that EMC is looking into Apache Flink. I
>>think
>> >>the
>> >> >> >solution to this problem depends on one question: What is the
>>size
>> >>of
>> >> >>the
>> >> >> >data in the CSV file compared to the memory you have available in
>> >>the
>> >> >> >cluster?
>> >> >> >Would the mapping table from the file fit into the memory of all
>> >>nodes
>> >> >> >running Flink?
>> >> >> >
>> >> >> >Regards,
>> >> >> >Robert
>> >> >> >
>> >> >> >PS: Did you subscribe to the mailing list? I've CCed you in case
>> >>you're
>> >> >> >not
>> >> >> >subscribed yet
>> >> >> >
>> >> >> >On Wed, Nov 4, 2015 at 4:54 PM, Kashmar, Ali
>><[hidden email]>
>> >> >>wrote:
>> >> >> >
>> >> >> >> Hi there,
>> >> >> >>
>> >> >> >> I’m trying to design and implement a use case in Flink where
>>I’m
>> >> >> >>receiving
>> >> >> >> protocol packets over a socket. Each packet has the subscriber
>> >>IMSI
>> >> >>in
>> >> >> >>it
>> >> >> >> and a bunch of more data. At the same time, I have a csv file
>> >>with a
>> >> >> >> mapping from IMSI -> subscriber group. I need to inject the
>>group
>> >> >>into
>> >> >> >> packet and then send it to the sink.
>> >> >> >>
>> >> >> >> I’ve tried loading the CSV into a memory map and then accessing
>> >>the
>> >> >>map
>> >> >> >> from within the Flink operators but that only works when the
>>CSV
>> >>is
>> >> >>very
>> >> >> >> small (a few hundred subscribers). I¹ve tried creating another
>> >>stream
>> >> >> >>for
>> >> >> >> the CSV and connecting the streams but that doesn’t yield
>>anything
>> >> >>as I
>> >> >> >> can’t have access to objects from both streams at the same
>>time.
>> >> >> >>
>> >> >> >> How would you guys approach this?
>> >> >> >>
>> >> >> >> Thanks,
>> >> >> >> Ali
>> >> >> >>
>> >> >>
>> >> >>
>> >>
>>
>>


Re: How to use static data with streams?

Robert Metzger
Hi Ali,

sorry for the delayed response.

Regarding your question:

This is a bit tricky but doable.
I assume you have CSV records with IDs 1 - 1000 and you run your job
with a parallelism of 2.

The Rich* variants of the user-defined functions allow you to access a
runtime context. This context gives you the subtask ID of your current task
and the total number of running tasks.
With that information, you can let subtaskID=0 handle the IDs from 1 -
499 and subtaskID=1 handle 500 - 1000.
Next, you need to send the elements from your stream to the right
subtask. For that, you can use a custom partitioner for a data stream.
The partitioner decides for each element which subtask ID it goes to. You
can determine the subtask ID based on the ID in your data.

Here is a small snippet with the required code:

DataStream<SimpleEntity> stream = dataStream.partitionCustom(new Partitioner<SimpleEntity>() {
   @Override
   public int partition(SimpleEntity simpleEntity, int numPartitions) {
      // Map the entity's ID to the subtask index that owns its range,
      // e.g. 0 for IDs 1 - 499 and 1 for IDs 500 - 1000.
      return 0; // return the appropriate subtask index here
   }
}, "id");
stream.flatMap(new RichFlatMapFunction<SimpleEntity, String>() {
   @Override
   public void flatMap(SimpleEntity simpleEntity, Collector<String> collector) throws Exception {
      int howMany = getRuntimeContext().getNumberOfParallelSubtasks();
      int myId = getRuntimeContext().getIndexOfThisSubtask();
      // ... look up the CSV records this subtask loaded for its ID range
   }
});
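
The partition() body above has to agree with how each subtask loads its slice of the CSV. That agreement is plain arithmetic; here is a stdlib-only sketch (assuming IDs 1 - 1000 split into contiguous ranges; the exact cut points don't matter as long as the partitioner and the loading code use the same formula):

```java
// Maps an ID to the subtask index that owns its contiguous range.
// Both the custom partitioner and the per-subtask CSV loading would
// call this same function, so records and packets meet on one subtask.
public class RangeSketch {

    static int subtaskFor(long id, long maxId, int parallelism) {
        long rangeSize = (maxId + parallelism - 1) / parallelism; // ceiling division
        return (int) Math.min((id - 1) / rangeSize, parallelism - 1);
    }

    public static void main(String[] args) {
        // With maxId=1000 and parallelism=2: IDs 1-500 go to subtask 0,
        // IDs 501-1000 go to subtask 1.
        System.out.println(subtaskFor(1, 1000, 2));    // 0
        System.out.println(subtaskFor(500, 1000, 2));  // 0
        System.out.println(subtaskFor(501, 1000, 2));  // 1
        System.out.println(subtaskFor(1000, 1000, 2)); // 1
    }
}
```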


Let me know if you need more help,

Regards,
Robert

On Mon, Nov 16, 2015 at 5:44 PM, Kashmar, Ali <[hidden email]> wrote:

> Hi Robert,
>
> Thanks for the help! I’ve managed to implement my use case using your
> suggested approach of combining the streams.
>
> Just a follow-up on 2b) below: I’m not clear on this statement: “partition
> (split) the data stream so that the right protocol packets end up at the
> right machine”. How do I know which machine the data is ending up at? My
> understanding is that the Flink program is agnostic of the cluster nodes.
>
> Maybe it would help if I explained this use case:
> 1. Load a CSV file and split it equally, using the ID in the CSV record,
> across the Flink cluster to be stored in memory (operator’s memory
> maybe?). This is basically an initialization step.
> 2. Once 1) is done, read events from a socket (for now) and use the ID in
> the event to add attributes from the matching CSV record to the event.
> Store the updated events in a file.
>
> Based on those two requirements, what can be accomplished using Flink and
> what can’t? Is the functionality that can’t be done today on Flink’s roadmap?
>
> Thanks,
> Ali
>