Hey guys,

Me again :) Now that my wonderful job finishes, I would like to monitor it a bit (e.g. build some charts on the number of messages per vertex, compute the total amount of time elapsed per computation per vertex, etc.).

The most computationally intensive operation is a coGroup. There, within the iteration, I count the number of "messages" sent and then I simply do:

    Files.append(messages, messagesTempFile, Charsets.UTF_8);

The problem is that with this approach I get a deadlock (yes!! Now that I know the code itself works, I am positive that the deadlock comes from the append; this is related to my previous mail). It makes sense if you think about it: 200-something threads are trying to write to the same file...

A possible workaround is this one:

    public class Singleton {
        private static final Singleton inst = new Singleton();

        private Singleton() {
            super();
        }

        public synchronized void writeToFile(String str) {
            // Do whatever
        }

        // must be static to be callable as Singleton.getInstance()
        public static Singleton getInstance() {
            return inst;
        }
    }

    Singleton.getInstance().writeToFile("Hello!!");

However, I am not sure how well Flink plays with synchronized...

Is there a smarter way to do it?

Thanks!
Andra
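For reference, here is a sketch of a corrected version of the workaround above. The class name `MetricsFileWriter` and the method `appendLine` are hypothetical. The sketch makes two points:

- `getInstance()` has to be `static` to be callable as `Singleton.getInstance()`.
- A `synchronized` method only serializes threads inside one JVM.

Flink runs parallel tasks as threads inside each TaskManager. A lock like this would therefore coordinate only the tasks of one TaskManager, not tasks on other machines, and every task would block on the same lock while it writes.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical name: a corrected take on the Singleton above.
// The lock below only serializes threads inside ONE JVM (one TaskManager);
// tasks running in other JVMs are not coordinated by it at all.
final class MetricsFileWriter {
    private static final MetricsFileWriter INSTANCE = new MetricsFileWriter();

    private MetricsFileWriter() { }

    // static, so callers can use MetricsFileWriter.getInstance() without an instance
    static MetricsFileWriter getInstance() {
        return INSTANCE;
    }

    // Appends one line; synchronized so that concurrent threads in this JVM
    // write one complete line at a time instead of interleaving bytes.
    synchronized void appendLine(Path file, String line) throws IOException {
        byte[] bytes = (line + System.lineSeparator()).getBytes(StandardCharsets.UTF_8);
        Files.write(file, bytes, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```

All writers still funnel through one lock and one file, so this trades correctness within a JVM for contention.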
----
On Sat, Jun 27, 2015 at 4:19 PM, Andra Lungu <[hidden email]> wrote:
> Is there a smarter way to do it?

Why don't you use Flink's DataSet output functions (like writeAsText, writeAsCsv, etc.)? Or, if they are not sufficient, you can implement/override your own OutputFormat.

In my experience, static variables are evil in distributed environments. Moreover, one of the main strengths of Flink is its input/output APIs, so I would avoid writing to a file that way.

Of course, dataset.append() will be a very convenient API to add (IMHO).

Best,
Flavio
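A parallel sink avoids the contention problem because each parallel task writes to its own file. The plain-Java sketch below only mimics that idea outside Flink. It is not Flink code, and the class name and the `part-<index>` file naming are assumptions. Inside Flink, the equivalent would be to emit the statistics as a DataSet of tuples and write it with `writeAsCsv`.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Plain-Java analogue (not Flink code) of a parallel file sink:
// every parallel task owns a separate file, so no two threads ever
// write to the same file and no lock is needed.
final class PerTaskMetricsSink implements AutoCloseable {
    private final BufferedWriter out;

    PerTaskMetricsSink(Path outputDir, int taskIndex) throws IOException {
        Files.createDirectories(outputDir);
        // Hypothetical naming scheme: one file per task index.
        Path file = outputDir.resolve("part-" + taskIndex);
        this.out = Files.newBufferedWriter(file, StandardCharsets.UTF_8);
    }

    // Writes one CSV record: vertex id and number of messages.
    void write(long vertexId, long messages) throws IOException {
        out.write(vertexId + "," + messages);
        out.newLine();
    }

    @Override
    public void close() throws IOException {
        out.close();
    }
}
```

Because no file is shared, the number of writers can grow with the parallelism without any coordination.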
----
2015-06-29 9:55 GMT+02:00 Flavio Pompermaier <[hidden email]>:
> Of course, dataset.append() will be a very convenient API to add (IMHO).

You can measure the time of each iteration in the open() methods of the operators within an iteration; open() is called before each iteration. The times can be collected either by printing to stdout (you then need to collect the files...) or by implementing a list accumulator. Each time should include the iteration number and the parallel task id. After the execution, the accumulator will be available in the execution result.

Accumulators can of course also be used to collect the number of messages, etc.

Best, Fabian
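Here is a minimal sketch of the timing idea, under some assumptions. `SuperstepTimer` is a hypothetical helper that an operator would call from `open()`, and optionally from `close()`. The caller passes in the superstep number and subtask index; in Flink these would come from `getIterationRuntimeContext().getSuperstepNumber()` and `getRuntimeContext().getIndexOfThisSubtask()`.

- Timestamps are parameters, so the logic can be tested without a cluster.
- Each record carries the iteration number and the parallel task id, as suggested above.
- A plain `List` stands in for the task's local accumulator.

```java
import java.util.List;

// Hypothetical record: time one parallel task spent in one superstep.
final class TimingRecord {
    final int superstep; // iteration number
    final int subtask;   // parallel task id
    final long nanos;    // elapsed time

    TimingRecord(int superstep, int subtask, long nanos) {
        this.superstep = superstep;
        this.subtask = subtask;
        this.nanos = nanos;
    }
}

// Hypothetical per-task helper. If close() also runs after each superstep,
// the time is recorded there; otherwise the gap between two consecutive
// open() calls is used (which includes time spent outside the operator).
final class SuperstepTimer {
    private final int subtask;
    private final List<TimingRecord> sink; // stand-in for a local accumulator
    private int currentSuperstep = -1;     // -1 means "no superstep running"
    private long startNanos;

    SuperstepTimer(int subtask, List<TimingRecord> sink) {
        this.subtask = subtask;
        this.sink = sink;
    }

    void onOpen(int superstep, long nowNanos) {
        finishCurrent(nowNanos);
        currentSuperstep = superstep;
        startNanos = nowNanos;
    }

    void onClose(long nowNanos) {
        finishCurrent(nowNanos);
        currentSuperstep = -1;
    }

    // Records the running superstep, if any, as finished at nowNanos.
    private void finishCurrent(long nowNanos) {
        if (currentSuperstep >= 0) {
            sink.add(new TimingRecord(currentSuperstep, subtask, nowNanos - startNanos));
        }
    }
}
```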
----
On Mon, Jun 29, 2015 at 11:36 AM, Fabian Hueske <[hidden email]> wrote:
> Accumulators can of course also be used to collect the number of messages, etc.

Hey Fabian,

I am aware of how open(), preSuperstep(), postSuperstep(), etc. can help me within an iteration; unfortunately, I am writing my own method here. Let me briefly describe it:

    public static final class PropagateNeighborValues implements NeighborsFunctionWithVertexValue<...> {
        @Override
        public void iterateNeighbors(Iterable<...> neighbors, Collector<...> out) {
            Iterator<...> iterator = neighbors.iterator();
            int neighborCount = 0;
            while (iterator.hasNext()) {
                iterator.next(); // advance, otherwise the loop never terminates
                neighborCount++;
            }
            // and I would need something like
            appendToFile(myAwesomeFile, neighborCount);
        }
    }

Neither open() nor synchronized is doing the trick for me right now.
Any other way?! :(
----
2015-06-29 12:59 GMT+02:00 Andra Lungu <[hidden email]>:
> Any other way?! :(

Have you tried to use a custom accumulator that just appends to a list?
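To make the memory trade-off concrete, here is a plain-Java model of what a list accumulator does. It is not Flink's own class; `LocalListAccumulator` is a hypothetical name. Every parallel task appends to a local list, and after the job the partial lists are merged by concatenation. The merged result therefore holds every entry ever added, so its size grows linearly with the number of `add` calls.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (hypothetical, not Flink's class) of a list accumulator.
final class LocalListAccumulator<T> {
    private final List<T> local = new ArrayList<>();

    // Called inside a task, e.g. once per vertex or per record.
    void add(T value) {
        local.add(value);
    }

    // Merging concatenates: the final size is the sum of all local sizes.
    void merge(LocalListAccumulator<T> other) {
        local.addAll(other.local);
    }

    List<T> getLocalValue() {
        return local;
    }
}
```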
----
On Mon, Jun 29, 2015 at 1:58 PM, Fabian Hueske <[hidden email]> wrote:
> Have you tried to use a custom accumulator that just appends to a list?

Caution! I am getting philosophical. Stop me if I'm talking nonsense!

You are suggesting a list that will have one or two entries per vertex, i.e. (approximately) billions of entries. Won't this over-saturate my memory? I am already filling it with lots of junk resulting from the computation...
----
On 29 June 2015 at 22:36, Andra Lungu <[hidden email]> wrote:
> Won't this over-saturate my memory?

Andra,

why don't you simply print to standard output and gather your metrics from the TaskManagers' log files after execution? Wouldn't that work for you?

-V.
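Here is a minimal sketch of the print-and-parse approach. It assumes a hypothetical tab-separated line format with a `METRIC` prefix, so the lines can be told apart from other output.

- During the job, each task would call `System.out.println(StdoutMetrics.format(...))`.
- Afterwards, the collected TaskManager output lines are fed to `messagesPerSuperstep`.

The stats then live in files on the TaskManagers rather than in memory, although the output still grows with the number of vertices.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical line format: "METRIC<TAB>superstep<TAB>vertexId<TAB>messages".
final class StdoutMetrics {
    static final String PREFIX = "METRIC";

    // What a task would print for one vertex in one superstep.
    static String format(int superstep, long vertexId, long messages) {
        return PREFIX + "\t" + superstep + "\t" + vertexId + "\t" + messages;
    }

    // After the job: skip every line that is not a metric line and
    // sum the message counts per superstep.
    static Map<Integer, Long> messagesPerSuperstep(List<String> lines) {
        Map<Integer, Long> totals = new HashMap<>();
        for (String line : lines) {
            String[] f = line.split("\t");
            if (f.length != 4 || !f[0].equals(PREFIX)) {
                continue;
            }
            totals.merge(Integer.parseInt(f[1]), Long.parseLong(f[3]), Long::sum);
        }
        return totals;
    }
}
```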
----
2015-06-29 22:48 GMT+02:00 Vasiliki Kalavri <[hidden email]>:
> Wouldn't that work for you?

Hi Andra,

sure, if you log for each record (or group of records), using a list accumulator is a very bad idea. If you don't need exact stats for each vertex but rather a distribution over all vertices, you can use a histogram accumulator. If you need exact per-vertex stats, I'd go with Vasia's proposal.
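Here is a plain-Java model of what a histogram accumulator keeps. Flink ships one as `org.apache.flink.api.common.accumulators.Histogram`; the class below is a hypothetical stand-in, not that class. It stores a count per distinct value and merges partial histograms by summing across parallel tasks.

Memory is bounded by the number of distinct values (e.g. distinct message counts), not by the number of vertices. Counts are `long` here as a precaution, since a single bucket could exceed the `int` range with billions of vertices.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a histogram accumulator: value -> occurrences.
final class MessageCountHistogram {
    private final TreeMap<Integer, Long> counts = new TreeMap<>();

    // Called once per vertex with that vertex's message count.
    void add(int value) {
        counts.merge(value, 1L, Long::sum);
    }

    // Combines the partial histogram of another parallel task into this one.
    void merge(MessageCountHistogram other) {
        for (Map.Entry<Integer, Long> e : other.counts.entrySet()) {
            counts.merge(e.getKey(), e.getValue(), Long::sum);
        }
    }

    long countOf(int value) {
        return counts.getOrDefault(value, 0L);
    }

    int distinctValues() {
        return counts.size();
    }
}
```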
