Hello,
I’m trying to wrap my head around task parallelism in a Flink cluster. Let’s say I have a cluster of 3 nodes, each node offering 16 task slots, so in total I’d have 48 slots for processing. Do the parallel instances of each task get distributed across the cluster, or is it possible that they all run on the same node? If they can all run on the same node, what happens when that node crashes? Does the job manager recreate them using the remaining open slots?

Thanks,
Ali
> On 30 Nov 2015, at 17:47, Kashmar, Ali <[hidden email]> wrote:
> Do the parallel instances of each task get distributed across the cluster or is it possible that they all run on the same node?

Yes, slots are requested from all nodes of the cluster. But keep in mind that multiple tasks (forming a local pipeline) can be scheduled to the same slot (1 slot can hold many tasks).

Have you seen this?
https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/job_scheduling.html

> If they can all run on the same node, what happens when that node crashes? Does the job manager recreate them using the remaining open slots?

What happens: the job manager tries to restart the program with the same parallelism. Thus, if you have enough free slots available in your cluster, this works smoothly (so yes, the remaining/available slots are used).

With a YARN cluster the task manager containers are restarted automatically. In standalone mode, you have to take care of this yourself.

Does this help?

– Ufuk
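To make the slot-sharing point concrete, here is a minimal sketch of a streaming job (the host, port, and operators are made up for illustration). With parallelism 16, each of the 16 parallel source -> map -> filter chains forms one local pipeline, and one such pipeline can be placed into a single task slot:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // 16 parallel instances of every operator below. Each chained
        // source -> map -> filter instance forms one local pipeline that
        // fits into a single task slot.
        env.setParallelism(16);

        env.socketTextStream("collector-host", 9999) // illustrative address
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.trim();
               }
           })
           .filter(new FilterFunction<String>() {
               @Override
               public boolean filter(String value) {
                   return !value.isEmpty();
               }
           })
           .print();

        env.execute("slot sharing sketch");
    }
}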
Is there a way to make a task cluster-parallelizable? I.e., make sure the parallel instances of the task are distributed across the cluster. When I run my Flink job with a parallelism of 16, all the parallel tasks are assigned to the first task manager.

- Ali
> On 01 Dec 2015, at 15:26, Kashmar, Ali <[hidden email]> wrote:
>
> Is there a way to make a task cluster-parallelizable? I.e., make sure the parallel instances of the task are distributed across the cluster. When I run my Flink job with a parallelism of 16, all the parallel tasks are assigned to the first task manager.

No, currently this is not possible*. But it can actually be beneficial for job performance if everything is running locally on a single task manager.

Is there a specific reason that you want to spread it out?

– Ufuk

* There was a pull request by Till Rohrmann, which never made it into the main code: https://github.com/apache/flink/pull/60
Slots are like "resource groups" which execute entire pipelines. They frequently have more than one operator.

What you can try as a workaround is to decrease the number of slots per machine to cause the operators to be spread across more machines.

If this is a crucial issue for your use case, it should be simple to add a "preference to spread out" to the scheduler...

– Stephan
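For a 3-node cluster like the one described above, that workaround could look like this in flink-conf.yaml (6 slots per node is an illustrative value):

# With 6 slots per node on 3 nodes (18 slots in total), a job with
# parallelism 16 cannot fit on one machine and must spread across all three.
taskmanager.numberOfTaskSlots: 6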
If I'm not mistaken, the scheduler already has a preference to spread independent pipelines out across the cluster. At least it uses a queue of instances from which it pops the first element when it allocates a new slot. This instance is then appended to the queue again if it still has some resources (slots) left.

I would assume that you have a shuffle operation involved in your job such that it makes sense for the scheduler to deploy all pipelines to the same machine.

Cheers,
Till
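As a rough sketch of the rotation Till describes (illustrative only, not Flink's actual scheduler code): instances with free slots rotate through a queue, so consecutive independent slot requests tend to land on different machines.

import java.util.ArrayDeque;
import java.util.Deque;

class InstanceQueueSketch {

    static final class Instance {
        final String host;
        int freeSlots;
        Instance(String host, int freeSlots) {
            this.host = host;
            this.freeSlots = freeSlots;
        }
    }

    private final Deque<Instance> instances = new ArrayDeque<Instance>();

    void addInstance(Instance instance) {
        if (instance.freeSlots > 0) {
            instances.addLast(instance);
        }
    }

    /** Pops the first instance, takes one slot, and re-appends it if slots remain. */
    String allocateSlot() {
        Instance head = instances.pollFirst();
        if (head == null) {
            throw new IllegalStateException("no instance with free slots");
        }
        head.freeSlots--;
        if (head.freeSlots > 0) {
            instances.addLast(head); // appended again while resources are left
        }
        return head.host;
    }
}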
There is no shuffle operation in my flow. Mine actually looks like this:
Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map -> Map, Filter)

Maybe it’s treating this whole flow as one pipeline and assigning it to a slot. What I really wanted was for the custom source I built to have running instances on all nodes. I’m not really sure if that’s the right approach, but if we could add this as a feature that’d be great, since having more than one node running the same pipeline guarantees the pipeline is never offline.

-Ali
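In code, a flow with that shape could look roughly like this (the source, predicates, and transforms are placeholders; only the branching structure matters). The flat-mapped stream feeds two branches, and with no keyBy/shuffle in between, each parallel instance of the whole graph stays one local pipeline:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlowShapeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder for the custom source plus the first Flat Map.
        DataStream<String> records = env
                .socketTextStream("collector-host", 9999)
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) {
                        out.collect(line); // placeholder parse/split
                    }
                });

        // Branch 1: Filter -> Flat Map -> Map (standing in for the three Map steps)
        records.filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String r) { return !r.isEmpty(); } // placeholder predicate
                })
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String r, Collector<String> out) {
                        out.collect(r);
                    }
                })
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String r) { return r; } // placeholder transform
                })
                .print();

        // Branch 2: Filter
        records.filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String r) { return r.isEmpty(); } // placeholder predicate
                })
                .print();

        env.execute("flow shape sketch");
    }
}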
Hi Ali!
In the case you have, the sequence of source-map-filter ... forms a pipeline.

You mentioned that you set the parallelism to 16, so there should be 16 pipelines. These pipelines should be completely independent.

Looking at the way the scheduler is implemented, independent pipelines should be spread across machines. But when you execute that in parallel, you say all 16 pipelines end up on the same machine?

Can you share with us the rough code of your program? Or a screenshot from the runtime dashboard that shows the program graph?

If your cluster is basically for that one job only, you could try and set the number of slots to 6 for each machine. Then you have 18 slots in total, and the 16 pipelines have to spread across all three nodes.

Greetings,
Stephan
Hi Stephan,
That was my original understanding, until I realized that I was not using a parallel socket source. I had a custom source that extended SourceFunction, which always runs with parallelism = 1. I looked through the API and found the ParallelSourceFunction interface, so I implemented that and voila, now all 3 nodes in the cluster are actually receiving traffic on socket connections.

Now that I’m running it successfully end to end, I’m trying to improve the performance. Can you take a look at the attached screenshot and tell me if the distribution of work amongst the pipelines is normal? I feel like some pipelines are a lot lazier than others, even though the cluster nodes are exactly the same.

By the way, here’s the class I wrote. It would be useful to have this available in the Flink distro:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelSocketSource implements ParallelSourceFunction<String> {

    private static final long serialVersionUID = -271094428915640892L;
    private static final Logger LOG = LoggerFactory.getLogger(ParallelSocketSource.class);

    private volatile boolean running = true;
    private String host;
    private int port;

    public ParallelSocketSource(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Each parallel instance opens its own connection to the server.
        try (Socket socket = new Socket(host, port);
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(socket.getInputStream()))) {
            String line = null;
            while (running && ((line = reader.readLine()) != null)) {
                ctx.collect(line);
            }
        } catch (IOException ex) {
            LOG.error("error reading from socket", ex);
        }
    }

    @Override
    public void cancel() {
        // Note: a blocking readLine() only notices this once data arrives
        // or the socket is closed.
        running = false;
    }
}

Regards,
Ali
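For reference, a minimal sketch of wiring the class above into a job (the address is illustrative):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelSocketJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // One socket connection per parallel source instance; with 16
        // instances spread over the cluster, every node opens connections.
        env.addSource(new ParallelSocketSource("collector-host", 9999))
           .setParallelism(16)
           .print();

        env.execute("parallel socket job");
    }
}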
Hi!
The parallel socket source looks good.

I think you forgot to attach the screenshot, or the mailing list dropped the attachment...

Not sure if I can diagnose that without more details. The sources all do the same. Assuming that the server distributes the data evenly across all connected sockets, and that the network bandwidth ends up being divided in a fair way, all pipelines should be similarly "eager".

Greetings,
Stephan
Hi Stephan,
Here’s a link to the screenshot I tried to attach earlier:

https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28

It looks to me like the distribution is fairly skewed across the nodes, even though they’re executing the same pipeline.

Thanks,
Ali
Hi Ali!
Seems like the Google Doc has restricted access; it tells me I have no permission to view it...

Stephan
Hi Stephan,
I got a request to share the image with someone and I assume it was you. You should be able to see it now. This seems to be the main issue I have at this time. I've tried running the job on the cluster with a parallelism of 16, 24, 36, and even went up to 48. I see all the parallel pipelines working for a bit and then some of them just stop; I’m not sure if they’re stuck or not. Here’s another screenshot:

http://postimg.org/image/gr6ogxqjj/

Two things you’ll notice:

1. Pipelines on 192.168.200.174 and 192.168.200.175 have stopped doing anything at one point and only 192.168.200.173 is doing all the work.
2. Pipelines on 192.168.200.174 and 192.168.200.175 don’t have an end time even though the job should be finished (the screenshot was taken after the source was closed).

I’m not sure if this helps or not, but here are some properties from the flink-conf.yaml:

jobmanager.heap.mb: 8192
taskmanager.heap.mb: 49152
taskmanager.numberOfTaskSlots: 16
parallelism.default: 1

state.backend: filesystem
state.backend.fs.checkpointdir: file:///tmp/flink-checkpoints

taskmanager.network.numberOfBuffers: 3072

recovery.mode: zookeeper
recovery.zookeeper.quorum: 192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181
recovery.zookeeper.storageDir: file:///tmp/zk-recovery
recovery.zookeeper.path.root: /opt/flink-0.10.0

I appreciate all the help.

Thanks,
Ali
Hi Ali!
I see, so the tasks on 192.168.200.174 and 192.168.200.175 apparently do not make progress, and do not even recognize the end-of-stream point.

I expect that the streams on 192.168.200.174 and 192.168.200.175 are back-pressured to a stand-still. Since no network is involved, the reason for the back pressure is probably the sinks.

What kind of data sink are you using (in the addSink() function)? Can you check if that one starts to fully block on machines 192.168.200.174 and 192.168.200.175?

Greetings,
Stephan
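One sanity check on the quoted configuration: taskmanager.network.numberOfBuffers: 3072 matches the sizing rule of thumb from the Flink 0.10 documentation, #slots-per-TM^2 * #TMs * 4 = 16 * 16 * 3 * 4 = 3072, so an outright shortage of network buffers is at least not the obvious suspect here.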
Hi Stephan,
I’m using DataStream.writeAsText(String path, WriteMode writemode) for my
sink. The data is written to disk and there’s plenty of space available.

I looked deeper into the logs and found out that the jobs on 174 and 175
are not actually stuck, but they’re moving extremely slowly. This is an
excerpt from the task manager log on 175:

03:44:43,307 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 254 to read a 1000 lines
03:44:43,315 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 254 to read a 1000 lines
03:46:09,360 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.200.173/192.168.200.173:2181. Will not attempt to authenticate using SASL (unknown error)
03:46:09,361 INFO  org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 86223ms for sessionid 0x25181a544860091, closing socket connection and attempting reconnect
03:46:09,362 WARN  org.apache.flink.shaded.org.apache.curator.ConnectionState - Connection attempt unsuccessful after 86221 (greater than max timeout of 60000). Resetting connection and trying again with a new connection.
03:46:09,391 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86222 to read a 1000 lines
03:46:09,394 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86243 to read a 1000 lines
03:46:09,439 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86224 to read a 1000 lines
03:46:09,445 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86217 to read a 1000 lines
03:46:09,462 INFO  org.apache.zookeeper.ZooKeeper - Session: 0x25181a544860091 closed
03:46:09,462 INFO  org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@550a1967
03:46:09,462 INFO  org.apache.zookeeper.ClientCnxn - EventThread shut down
03:46:09,463 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server 192.168.200.174/192.168.200.174:2181. Will not attempt to authenticate using SASL (unknown error)
03:46:09,463 ERROR org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - Background operation retry gave up
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:708)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:826)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
03:46:09,464 ERROR org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - Background retry gave up
org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:809)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
03:46:09,464 INFO  org.apache.zookeeper.ClientCnxn - Socket connection established to 192.168.200.174/192.168.200.174:2181, initiating session
03:46:09,468 INFO  org.apache.zookeeper.ClientCnxn - Session establishment complete on server 192.168.200.174/192.168.200.174:2181, sessionid = 0x25181a544860094, negotiated timeout = 40000
03:46:09,469 INFO  org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager - State change: RECONNECTED
03:46:09,475 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86212 to read a 1000 lines
03:46:09,523 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource - It took 86217 to read a 1000 lines

You’ll notice that at one point it takes 254 milliseconds to read 1000
lines of input, and then it jumps to 86 seconds! I also see some ZooKeeper
exceptions that lead me to believe it’s a networking problem. I have 4 VMs
running on 4 different hosts, connected via a 10G NIC.

Thanks,
Ali

On 2015-12-11, 11:23 AM, "Stephan Ewen" <[hidden email]> wrote:

>What kind of data sink are you using (in the addSink() call)?
>Can you check if that one starts to fully block on machines
>192.168.200.174 and 192.168.200.175?
|
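For reference, the "It took ... to read a 1000 lines" messages above come from timing instrumentation inside the source. The thread never shows that version of the code, but under the assumption that it logs once per 1000 collected lines, the run() method of the ParallelSocketSource posted earlier would look roughly like this (fields, imports and LOG as in that class):

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
                try (Socket socket = new Socket(host, port);
                                BufferedReader reader = new BufferedReader(new
                                InputStreamReader(socket.getInputStream()))) {
                        String line;
                        int count = 0;
                        long start = System.currentTimeMillis();
                        while (running && ((line = reader.readLine()) != null)) {
                                // collect() is where back pressure blocks: a stalled
                                // downstream pipeline (or a clock jump) inflates the
                                // elapsed time even when the socket itself is healthy
                                ctx.collect(line);
                                if (++count == 1000) {
                                        LOG.info("It took {} to read a 1000 lines",
                                                        System.currentTimeMillis() - start);
                                        count = 0;
                                        start = System.currentTimeMillis();
                                }
                        }
                } catch (IOException ex) {
                        LOG.error("error reading from socket", ex);
                }
        }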
Hi Stephan,
I figured it out. The problem was that the date/time was different on all
3 nodes. ZooKeeper thought it hadn’t heard from the other nodes for longer
than the allowed session timeout and dropped them, causing the other two
task managers in the cluster to fail. I synchronized the time across the 3
nodes and reran the test. It’s running very smoothly now.

Thanks again for your help.

-Ali

On 2015-12-11, 12:03 PM, "Kashmar, Ali" <[hidden email]> wrote:

>You’ll notice that at one point it takes 254 milliseconds to read 1000
>lines of input, and then it jumps to 86 seconds! I also see some ZooKeeper
>exceptions that lead me to believe it’s a networking problem.
|
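The numbers support this diagnosis. The gap between the 03:44:43 and 03:46:09 log entries is roughly 86 seconds, and the same ~86,2xx ms figure shows up both in the ZooKeeper client's "have not heard from server in 86223ms" and in the source's per-1000-line timings. A single forward clock step of that size on one node would produce exactly this pattern of simultaneous "timeouts" and inflated elapsed-time measurements, whereas a genuine network outage would be unlikely to hit every measurement by the same amount.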