Task Parallelism in a Cluster

Task Parallelism in a Cluster

Kashmar, Ali
Hello,

I’m trying to wrap my head around task parallelism in a Flink cluster. Let’s say I have a cluster of 3 nodes, each node offering 16 task slots, so in total I’d have 48 slots for processing. Do the parallel instances of each task get distributed across the cluster or is it possible that they all run on the same node? If they can all run on the same node, what happens when that node crashes? Does the job manager recreate them using the remaining open slots?

Thanks,
Ali

Re: Task Parallelism in a Cluster

Ufuk Celebi-2

> On 30 Nov 2015, at 17:47, Kashmar, Ali <[hidden email]> wrote:
> Do the parallel instances of each task get distributed across the cluster or is it possible that they all run on the same node?

Yes, slots are requested from all nodes of the cluster. But keep in mind that multiple tasks (forming a local pipeline) can be scheduled to the same slot (1 slot can hold many tasks).
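Ufuk's remark that one slot can hold a whole local pipeline matters for sizing. As a rule of thumb, with Flink's default slot sharing a job needs about as many slots as its highest task parallelism, not one slot per task instance. The sketch below models only that arithmetic in plain Java. `SlotDemand` and `requiredSlots` are hypothetical names, and the model assumes every task sits in the single default slot-sharing group.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink's API. Simplified model: all tasks are
// in one slot-sharing group, so one slot can hold one parallel instance of every
// task and the job's slot demand is its highest task parallelism.
class SlotDemand {

    static int requiredSlots(int... taskParallelism) {
        return Arrays.stream(taskParallelism).max().orElse(0);
    }

    public static void main(String[] args) {
        // Three tasks (operator chains), all running with parallelism 16.
        System.out.println(requiredSlots(16, 16, 16)); // 16, not 48
        // Mixed parallelism: the widest task determines the demand.
        System.out.println(requiredSlots(1, 16, 4));   // 16
    }
}
```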

Have you seen this? https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/job_scheduling.html

> If they can all run on the same node, what happens when that node crashes? Does the job manager recreate them using the remaining open slots?

What happens: the job manager tries to restart the program with the same parallelism. Thus, if you have enough free slots available in your cluster, this works smoothly (so yes, the remaining/available slots are used).
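This answer reduces to a capacity check: after a TaskManager is lost, the job can restart at its old parallelism only if the surviving TaskManagers still offer enough free slots. Below is a minimal sketch of that check using the 3 nodes × 16 slots from the question. `RestartCheck` is a hypothetical name, and the model assumes slot sharing (one slot per parallel pipeline), an otherwise idle cluster, and that the lost node is not replaced (as in standalone mode).

```java
// Hypothetical helper, not a Flink API: capacity check for restarting a job
// at the same parallelism after some nodes have failed.
class RestartCheck {

    // True if the surviving nodes still offer at least `parallelism` slots.
    static boolean canRestart(int nodes, int slotsPerNode, int failedNodes, int parallelism) {
        int remainingSlots = (nodes - failedNodes) * slotsPerNode;
        return remainingSlots >= parallelism;
    }

    public static void main(String[] args) {
        // 3 x 16 slots, parallelism 16, one node lost: 32 slots remain.
        System.out.println(canRestart(3, 16, 1, 16)); // true
        // Same cluster at parallelism 48: 32 remaining slots are not enough.
        System.out.println(canRestart(3, 16, 1, 48)); // false
    }
}
```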

With a YARN cluster the task manager containers are restarted automatically. In standalone mode, you have to take care of this yourself.


Does this help?

– Ufuk

Re: Task Parallelism in a Cluster

Kashmar, Ali
Is there a way to make a task cluster-parallelizable, i.e., make sure the
parallel instances of the task are distributed across the cluster? When I
run my Flink job with a parallelism of 16, all the parallel tasks are
assigned to the first task manager.

- Ali

On 2015-11-30, 2:18 PM, "Ufuk Celebi" <[hidden email]> wrote:

Re: Task Parallelism in a Cluster

Ufuk Celebi-2

> On 01 Dec 2015, at 15:26, Kashmar, Ali <[hidden email]> wrote:
>
> Is there a way to make a task cluster-parallelizable? I.e. Make sure the
> parallel instances of the task are distributed across the cluster. When I
> run my flink job with a parallelism of 16, all the parallel tasks are
> assigned to the first task manager.

No, currently this is not possible*. But it can actually be beneficial for job performance if everything is running locally on a single task manager. Is there a specific reason that you want to spread it out?

– Ufuk

* There was a pull request by Till Rohrmann, which never made it into the main code: https://github.com/apache/flink/pull/60

Re: Task Parallelism in a Cluster

Stephan Ewen
In reply to this post by Kashmar, Ali
Slots are like "resource groups" which execute entire pipelines. They
frequently have more than one operator.

What you can try as a workaround is to decrease the number of slots per
machine to cause the operators to be spread across more machines.

If this is a crucial issue for your use case, it should be simple to add a
"preference to spread out" to the scheduler...

On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <[hidden email]> wrote:

Re: Task Parallelism in a Cluster

Till Rohrmann
If I'm not mistaken, the scheduler already has a preference to spread
independent pipelines out across the cluster. At least it uses a queue of
instances from which it pops the first element when it allocates a new slot.
This instance is then appended to the queue again if it has some resources
(slots) left.
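The queue behaviour described here can be modelled in a few lines. This is a simplified sketch, not Flink's actual scheduler code. `RoundRobinSlots`, `allocate`, and the `tm-N` names are hypothetical, and the model ignores slot sharing, locality preferences, and co-location constraints. It shows why independent pipelines would be expected to land on different machines.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of a round-robin instance queue (hypothetical, not Flink code).
class RoundRobinSlots {

    // freeSlots: instance name -> free slots, in the order the instances joined.
    // Returns how many of the requested slots each instance hands out.
    static Map<String, Integer> allocate(Map<String, Integer> freeSlots, int requests) {
        Map<String, Integer> free = new LinkedHashMap<>(freeSlots);
        Map<String, Integer> assigned = new LinkedHashMap<>();
        Deque<String> queue = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : free.entrySet()) {
            assigned.put(e.getKey(), 0);
            if (e.getValue() > 0) {
                queue.addLast(e.getKey());
            }
        }
        for (int i = 0; i < requests; i++) {
            // Pop the head of the queue; it serves this slot request.
            String instance = queue.pollFirst();
            if (instance == null) {
                throw new IllegalStateException("not enough free slots");
            }
            assigned.merge(instance, 1, Integer::sum);
            int left = free.merge(instance, -1, Integer::sum);
            // Re-append only if the instance still has free slots.
            if (left > 0) {
                queue.addLast(instance);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        Map<String, Integer> cluster = new LinkedHashMap<>();
        cluster.put("tm-1", 16);
        cluster.put("tm-2", 16);
        cluster.put("tm-3", 16);
        // 16 independent pipelines, one slot each.
        System.out.println(allocate(cluster, 16)); // {tm-1=6, tm-2=5, tm-3=5}
    }
}
```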

I would assume that you have a shuffle operation involved in your job such
that it makes sense for the scheduler to deploy all pipelines to the same
machine.

Cheers,
Till
On Dec 1, 2015 4:01 PM, "Stephan Ewen" <[hidden email]> wrote:

Re: Task Parallelism in a Cluster

Kashmar, Ali
There is no shuffle operation in my flow. Mine actually looks like this:

Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map -> Map, Filter)


Maybe it’s treating this whole flow as one pipeline and assigning it to a
slot. What I really wanted was for the custom source I built to have
running instances on all nodes. I’m not really sure if that’s the right
approach, but if we could add this as a feature that’d be great, since
having more than one node running the same pipeline guarantees the
pipeline is never offline.

-Ali

On 2015-12-02, 4:39 AM, "Till Rohrmann" <[hidden email]> wrote:

Re: Task Parallelism in a Cluster

Stephan Ewen
Hi Ali!

In your case, the sequence source-map-filter ... forms a
pipeline.

You mentioned that you set the parallelism to 16, so there should be 16
pipelines. These pipelines should be completely independent.

Looking at the way the scheduler is implemented, independent pipelines
should be spread across machines. But when you execute that in parallel,
you say all 16 pipelines end up on the same machine?

Can you share the rough code of your program with us? Or a screenshot from
the runtime dashboard that shows the program graph?


If your cluster is basically for that one job only, you could try setting
the number of slots to 6 for each machine. Then you have 18 slots in total,
and since no single machine can hold more than 6 of the 16 pipelines, they
have to be spread across all three nodes.
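The slot arithmetic behind this workaround, for three identical machines and a job parallelism of 16 (the setup from the original question), can be sketched as follows. `SpreadCheck` and its methods are hypothetical. The sketch assumes slot sharing, so each pipeline occupies exactly one slot, and it computes only capacity bounds, not the scheduler's actual placement. Note that 3 machines × 4 slots gives only 12 slots, too few for 16 pipelines.

```java
// Hypothetical helper, not a Flink API: capacity bounds for p pipelines
// (one slot each) on identical machines.
class SpreadCheck {

    // Can the cluster host p pipelines at all?
    static boolean fits(int machines, int slotsPerMachine, int p) {
        return machines * slotsPerMachine >= p;
    }

    // Fewest machines the p pipelines can be packed onto: ceil(p / slotsPerMachine).
    static int minMachinesUsed(int slotsPerMachine, int p) {
        return (p + slotsPerMachine - 1) / slotsPerMachine;
    }

    public static void main(String[] args) {
        System.out.println(fits(3, 16, 16) + " " + minMachinesUsed(16, 16)); // true 1
        System.out.println(fits(3, 4, 16));                                  // false
        System.out.println(fits(3, 6, 16) + " " + minMachinesUsed(6, 16));   // true 3
    }
}
```

With 16 slots per machine, nothing forces the pipelines off a single node; capping each machine at 6 slots makes using all three machines unavoidable.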


Greetings,
Stephan


On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <[hidden email]> wrote:

Re: Task Parallelism in a Cluster

Kashmar, Ali
Hi Stephan,

That was my original understanding, until I realized that I was not using
a parallel socket source. I had a custom source that implemented
SourceFunction, which always runs with parallelism = 1. I looked through
the API and found the ParallelSourceFunction interface, so I implemented
that and voila, now all 3 nodes in the cluster are actually receiving
traffic on socket connections.

Now that I’m running it successfully end to end, I’m trying to improve the
performance. Can you take a look at the attached screenshot and tell me
if the distribution of work amongst the pipelines is normal? I feel like
some pipelines are a lot lazier than others, even though the cluster nodes
are exactly the same.

By the way, here’s the class I wrote. It would be useful to have this
available in the Flink distribution:

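import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Each parallel instance opens its own connection to host:port and emits one
// record per line it reads.
public class ParallelSocketSource implements ParallelSourceFunction<String> {

    private static final long serialVersionUID = -271094428915640892L;
    private static final Logger LOG = LoggerFactory.getLogger(ParallelSocketSource.class);

    private final String host;
    private final int port;

    private volatile boolean running = true;
    // Kept so that cancel() can close it and unblock readLine(); not serialized.
    private transient volatile Socket socket;

    public ParallelSocketSource(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (Socket s = new Socket(host, port);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(s.getInputStream()))) {
            socket = s;
            String line;
            while (running && (line = reader.readLine()) != null) {
                ctx.collect(line);
            }
        } catch (IOException ex) {
            // Closing the socket in cancel() also ends up here; only log real errors.
            if (running) {
                LOG.error("error reading from socket", ex);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
        Socket s = socket;
        if (s != null) {
            try {
                s.close(); // a blocking readLine() does not see the flag otherwise
            } catch (IOException ignored) {
                // best effort during cancellation
            }
        }
    }
}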

Regards,
Ali
 

On 2015-12-08, 3:35 PM, "Stephan Ewen" <[hidden email]> wrote:

Re: Task Parallelism in a Cluster

Stephan Ewen
Hi!

The parallel socket source looks good.
I think you forgot to attach the screenshot, or the mailing list dropped
the attachment...

Not sure if I can diagnose that without more details. The sources all do
the same thing. Assuming that the server distributes the data evenly across all
connected sockets, and that the network bandwidth ends up being divided in
a fair way, all pipelines should be similarly "eager".
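The assumption about the server matters because each ParallelSocketSource instance opens its own connection, so a pipeline can only be as busy as its connection. Below is a minimal sketch of a server-side dispatcher that honours that assumption by handing lines to connections round-robin. `RoundRobinDispatcher` is a hypothetical name, and it models connections as in-memory lists rather than real sockets. If the real server instead favours some connections (for example the first one to connect), the corresponding pipelines will look busier no matter how Flink schedules them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the data-producing server, not Flink code: line i goes
// to connection i % n, so every connection receives an almost equal share.
class RoundRobinDispatcher {

    static List<List<String>> dispatch(List<String> lines, int connections) {
        List<List<String>> perConnection = new ArrayList<>();
        for (int c = 0; c < connections; c++) {
            perConnection.add(new ArrayList<>());
        }
        for (int i = 0; i < lines.size(); i++) {
            perConnection.get(i % connections).add(lines.get(i));
        }
        return perConnection;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            lines.add("event-" + i);
        }
        // Prints 4, then 3, then 3.
        for (List<String> share : dispatch(lines, 3)) {
            System.out.println(share.size());
        }
    }
}
```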

Greetings,
Stephan


On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <[hidden email]> wrote:

> Hi Stephan,
>
> That was my original understanding, until I realized that I was not using
> a parallel socket source. I had a custom source that extended
> SourceFunction which always runs with parallelism = 1. I looked through
> the API and found the ParallelSourceFunction interface so I implemented
> that and voila, now all 3 nodes in the cluster are actually receiving
> traffic on socket connections.
>
> Now that I’m running it successfully end to end, I’m trying to improve the
> performance. Can you take a look at the attached screen shot and tell me
> if the distribution of work amongst the pipelines is normal? I feel like
> some pipelines are lot lazier than others, even though the cluster nodes
> are exactly the same.
>
> By the way, here’s the class I wrote. It would be useful to have this
> available in Flink distro:
>
> public class ParallelSocketSource implements
> ParallelSourceFunction<String> {
>
>         private static final long serialVersionUID = -271094428915640892L;
>         private static final Logger LOG =
> LoggerFactory.getLogger(ParallelSocketSource.class);
>
>         private volatile boolean running = true;
>         private String host;
>         private int port;
>
>         public ParallelSocketSource(String host, int port) {
>                 this.host = host;
>                 this.port = port;
>         }
>
>         @Override
>         public void run(SourceContext<String> ctx) throws Exception {
>                 try (Socket socket = new Socket(host, port);
>                         BufferedReader reader = new BufferedReader(new
> InputStreamReader(socket.getInputStream()))) {
>                         String line  = null;
>                         while(running && ((line = reader.readLine()) !=
> null)) {
>                                 ctx.collect(line);
>                         }
>                 } catch(IOException ex) {
>                         LOG.error("error reading from socket", ex);
>                 }
>         }
>
>         @Override
>         public void cancel() {
>                 running = false;
>         }
> }
>
> Regards,
> Ali
>
>
> On 2015-12-08, 3:35 PM, "Stephan Ewen" <[hidden email]> wrote:
>
> >Hi Ali!
> >
> >In the case you have, the sequence of source-map-filter ... forms a
> >pipeline.
> >
> >You mentioned that you set the parallelism to 16, so there should be 16
> >pipelines. These pipelines should be completely independent.
> >
> >Looking at the way the scheduler is implemented, independent pipelines
> >should be spread across machines. But when you execute that in parallel,
> >you say all 16 pipelines end up on the same machine?
> >
> >Can you share with us the rough code of your program? Or a Screenshot from
> >the runtime dashboard that shows the program graph?
> >
> >
> >If your cluster is basically for that one job only, you could try and set
> >the number of slots to 4 for each machine. Then you have 16 slots in total
> >and each node would run one of the 16 pipelines.
> >
> >
> >Greetings,
> >Stephan
> >
> >
> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <[hidden email]> wrote:
> >
> >> There is no shuffle operation in my flow. Mine actually looks like this:
> >>
> >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map
> >>->
> >> Map, Filter)
> >>
> >>
> >> Maybe it’s treating this whole flow as one pipeline and assigning it to
> >>a
> >> slot. What I really wanted was to have the custom source I built to have
> >> running instances on all nodes. I’m not really sure if that’s the right
> >> approach, but if we could add this as a feature that’d be great, since
> >> having more than one node running the same pipeline guarantees the
> >> pipeline is never offline.
> >>
> >> -Ali
> >>
> >> On 2015-12-02, 4:39 AM, "Till Rohrmann" <[hidden email]> wrote:
> >>
> >> >If I'm not mistaken, then the scheduler has already a preference to
> >>spread
> >> >independent pipelines out across the cluster. At least he uses a queue
> >>of
> >> >instances from which it pops the first element if it allocates a new
> >>slot.
> >> >This instance is then appended to the queue again, if it has some
> >> >resources
> >> >(slots) left.
> >> >
> >> >I would assume that you have a shuffle operation involved in your job
> >>such
> >> >that it makes sense for the scheduler to deploy all pipelines to the
> >>same
> >> >machine.
> >> >
> >> >Cheers,
> >> >Till
> >> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <[hidden email]> wrote:
> >> >
> >> >> Slots are like "resource groups" which execute entire pipelines. They
> >> >> frequently have more than one operator.
> >> >>
> >> >> What you can try as a workaround is decrease the number of slots per
> >> >> machine to cause the operators to be spread across more machines.
> >> >>
> >> >> If this is a crucial issue for your use case, it should be simple to
> >> >>add a
> >> >> "preference to spread out" to the scheduler...
> >> >>
> >> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <[hidden email]>
> >> >>wrote:
> >> >>
> >> >> > Is there a way to make a task cluster-parallelizable? I.e. Make
> >>sure
> >> >>the
> >> >> > parallel instances of the task are distributed across the cluster.
> >> >>When I
> >> >> > run my flink job with a parallelism of 16, all the parallel tasks
> >>are
> >> >> > assigned to the first task manager.
> >> >> >
> >> >> > - Ali
> >> >> >
> >> >> > On 2015-11-30, 2:18 PM, "Ufuk Celebi" <[hidden email]> wrote:
> >> >> >
> >> >> > >
> >> >> > >> On 30 Nov 2015, at 17:47, Kashmar, Ali <[hidden email]>
> >> wrote:
> >> >> > >> Do the parallel instances of each task get distributed across
> >>the
> >> >> > >>cluster or is it possible that they all run on the same node?
> >> >> > >
> >> >> > >Yes, slots are requested from all nodes of the cluster. But keep
> >>in
> >> >>mind
> >> >> > >that multiple tasks (forming a local pipeline) can be scheduled to
> >> >>the
> >> >> > >same slot (1 slot can hold many tasks).
> >> >> > >
> >> >> > >Have you seen this?
> >> >> > >
> >> >> >
> >> >>
> >> >>
> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/jo
> >> >>b
> >> >> > >_scheduling.html
> >> >> > >
> >> >> > >> If they can all run on the same node, what happens when that
> >>node
> >> >> > >>crashes? Does the job manager recreate them using the remaining
> >>open
> >> >> > >>slots?
> >> >> > >
> >> >> > >What happens: The job manager tries to restart the program with
> >>the
> >> >>same
> >> >> > >parallelism. Thus if you have enough free slots available in your
> >> >> > >cluster, this works smoothly (so yes, the remaining/available
> >>slots
> >> >>are
> >> >> > >used)
> >> >> > >
> >> >> > >With a YARN cluster the task manager containers are restarted
> >> >> > >automatically. In standalone mode, you have to take care of this
> >> >> yourself.
> >> >> > >
> >> >> > >
> >> >> > >Does this help?
> >> >> > >
> >> >> > >­ Ufuk
> >> >> > >
> >> >> >
> >> >> >
> >> >>
> >>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Task Parallelism in a Cluster

Kashmar, Ali
Hi Stephan,

Here’s a link to the screenshot I tried to attach earlier:

https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28

It looks to me like the distribution is fairly skewed across the nodes,
even though they’re executing the same pipeline.
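One way to put a number on "fairly skewed" is to compare the busiest pipeline with the average, using per-subtask record counts such as those the dashboard reports. The sketch below is a hypothetical helper (`SkewCheck` is not a Flink class), and the counts in `main` are made up for illustration; they are not read from the linked screenshot.

```java
import java.util.Arrays;

// Hypothetical diagnostic: ratio of the busiest pipeline's record count to the
// mean across pipelines. 1.0 means perfectly even; empty or all-zero input is
// treated as even.
class SkewCheck {

    static double maxToMeanRatio(long[] recordsPerPipeline) {
        long max = Arrays.stream(recordsPerPipeline).max().orElse(0L);
        double mean = Arrays.stream(recordsPerPipeline).average().orElse(0.0);
        return mean == 0.0 ? 1.0 : max / mean;
    }

    public static void main(String[] args) {
        // Made-up counts for illustration only.
        long[] even = {1000, 1000, 1000, 1000};
        long[] skewed = {2500, 500, 500, 500};
        System.out.println(maxToMeanRatio(even));   // 1.0
        System.out.println(maxToMeanRatio(skewed)); // 2.5
    }
}
```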

Thanks,
Ali



Re: Task Parallelism in a Cluster

Stephan Ewen
Hi Ali!

It seems the Google Doc has restricted access; it tells me I have no
permission to view it...

Stephan



Re: Task Parallelism in a Cluster

Kashmar, Ali
Hi Stephan,

I got a request to share the image and I assume it was from you, so you
should be able to see it now. This seems to be the main issue I have at
this time. I've tried running the job on the cluster with a parallelism of
16, 24, 36, and even 48. I see all the parallel pipelines working for a bit,
and then some of them just stop; I’m not sure whether they’re stuck. Here’s
another screenshot:
http://postimg.org/image/gr6ogxqjj/

Two things you’ll notice:
1. Pipelines on 192.168.200.174 and 192.168.200.175 stop doing anything
at some point, and from then on only 192.168.200.173 does all the work.
2. Pipelines on 192.168.200.174 and 192.168.200.175 don’t have an end time
even though the job should be finished (the screenshot was taken after the
source was closed).

I’m not sure if this helps or not, but here are some properties from the
flink-conf.yaml:

jobmanager.heap.mb: 8192
taskmanager.heap.mb: 49152
taskmanager.numberOfTaskSlots: 16
parallelism.default: 1

state.backend: filesystem
state.backend.fs.checkpointdir: file:///tmp/flink-checkpoints

taskmanager.network.numberOfBuffers: 3072

recovery.mode: zookeeper
recovery.zookeeper.quorum: 192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181
recovery.zookeeper.storageDir: file:///tmp/zk-recovery
recovery.zookeeper.path.root: /opt/flink-0.10.0
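For reference, the slot arithmetic behind this configuration: three task managers with taskmanager.numberOfTaskSlots: 16 give 48 slots, so every parallelism tried above (16 up to 48) fits. The Java sketch below is a simplified model, assumed from Till's description earlier in the thread, of how the scheduler spreads independent pipelines: task managers sit in a queue, allocating a slot pops the first one, and that instance is appended again only if it still has free slots. It is not Flink's scheduler code; the class and method names are hypothetical, and the real scheduler also deals with slot sharing and co-location, so actual placement may differ.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Simplified, hypothetical model of queue-based slot allocation.
 * Not Flink's actual scheduler implementation.
 */
public class SlotSpreadSketch {

    // Hypothetical stand-in for one task manager instance.
    static final class Instance {
        final String host;
        int freeSlots;

        Instance(String host, int freeSlots) {
            this.host = host;
            this.freeSlots = freeSlots;
        }
    }

    /** Returns how many of `parallelism` independent pipelines land on each host. */
    static Map<String, Integer> allocate(String[] hosts, int slotsPerHost, int parallelism) {
        Deque<Instance> queue = new ArrayDeque<>();
        Map<String, Integer> placed = new LinkedHashMap<>();  // keeps host order for printing
        for (String h : hosts) {
            queue.addLast(new Instance(h, slotsPerHost));
            placed.put(h, 0);
        }
        for (int i = 0; i < parallelism; i++) {
            Instance inst = queue.pollFirst();  // take the instance at the head of the queue
            if (inst == null) {
                throw new IllegalStateException("not enough slots for parallelism " + parallelism);
            }
            inst.freeSlots--;
            placed.merge(inst.host, 1, Integer::sum);
            if (inst.freeSlots > 0) {
                queue.addLast(inst);  // re-queue only if the instance still has slots left
            }
        }
        return placed;
    }

    public static void main(String[] args) {
        String[] hosts = {"192.168.200.173", "192.168.200.174", "192.168.200.175"};
        // taskmanager.numberOfTaskSlots: 16 on each of the three machines -> 48 slots in total
        System.out.println(allocate(hosts, 16, 16));
        System.out.println(allocate(hosts, 16, 48));
    }
}
```

Under this model, main prints {192.168.200.173=6, 192.168.200.174=5, 192.168.200.175=5} for a parallelism of 16 and 16 pipelines per host for 48, i.e. placement alone would spread the pipelines across all three machines.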

I appreciate all the help.


Thanks,
Ali



Re: Task Parallelism in a Cluster

Stephan Ewen
Hi Ali!

I see, so the tasks on 192.168.200.174 and 192.168.200.175 apparently make
no progress and do not even recognize the end of the stream.

I expect that the streams on 192.168.200.174 and 192.168.200.175 are
back-pressured to a standstill. Since no network transfer is involved, the
back pressure is probably caused by the sinks.

What kind of data sink are you using (in the addSink() function)?
Can you check whether it starts to block completely on machines
192.168.200.174 and 192.168.200.175?

Greetings,
Stephan
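To illustrate why a blocked sink would also hide the end of the stream: when operators are chained (as they appear to be in this job, assuming the sink is chained into the same task), a record emitted by the source is handed down the chain by direct method calls in one thread, so ctx.collect() only returns after the sink has processed the record. The plain-Java sketch below is an analogy, not Flink code, and all names in it are hypothetical. It models the chain with Consumer calls: if the sink blocks, the source stops emitting and never reaches end-of-stream, which matches tasks that never get an end time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * Plain-Java model (not Flink code) of a chained pipeline: the source hands
 * each record to the next step by a direct call, so it cannot emit the next
 * record (or finish) until the sink has returned.
 */
public class ChainedBackPressureSketch {

    /** Hypothetical sink that accepts `n` records and then blocks forever. */
    static Consumer<String> blockingAfter(int n) {
        CountDownLatch never = new CountDownLatch(1);  // never counted down
        AtomicInteger seen = new AtomicInteger();
        return line -> {
            if (seen.incrementAndGet() > n) {
                try {
                    never.await();  // simulates a sink stuck on an external system
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
    }

    /**
     * Runs a source emitting `records` lines through a chained map step into
     * `sink` and returns how many records were fully emitted within
     * `waitMillis`. A result equal to `records` means end-of-stream was reached.
     */
    static int emittedWithin(int records, Consumer<String> sink, long waitMillis) {
        Consumer<String> chain = line -> sink.accept(line.toUpperCase());  // map -> sink
        AtomicInteger emitted = new AtomicInteger();
        Thread source = new Thread(() -> {
            for (int i = 0; i < records; i++) {
                chain.accept("record-" + i);  // like ctx.collect(): returns only after the sink returns
                emitted.incrementAndGet();
            }
        });
        source.setDaemon(true);  // a stuck source thread must not keep the JVM alive
        source.start();
        try {
            source.join(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return emitted.get();
    }

    public static void main(String[] args) {
        System.out.println("sink keeps up: " + emittedWithin(10, line -> { }, 1000) + " of 10 records");
        System.out.println("sink blocks:   " + emittedWithin(10, blockingAfter(3), 200) + " of 10 records");
    }
}
```

With a sink that blocks after three records, the source emits exactly three of ten records and its thread is still stuck when the wait times out; with a sink that keeps up, all ten get through and the source finishes.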



On Fri, Dec 11, 2015 at 4:50 PM, Kashmar, Ali <[hidden email]> wrote:

> Hi Stephan,
>
> I got a request to share the image with someone and I assume it was you.
> You should be able to see it now. This seems to be the main issue I have
> at this time. I've tried running the job on the cluster with a parallelism
> of 16, 24, 36, and even went up to 48. I see all the parallel pipelines
> working for a bit and then some of them just stop, I’m not sure if they’re
> stuck or not. Here’s another screenshot:
> http://postimg.org/image/gr6ogxqjj/
>
> Two things you’ll notice:
> 1. Pipelines on 192.168.200.174 and 192.168.200.175 have stopped doing
> anything at one point and only 192.168.200.173 is doing all the work.
> 2. Pipelines on 192.168.200.174 and 192.168.200.175 don’t have an end time
> even though the job should be finished (the screenshot was taken after the
> source was closed).
>
> I’m not sure if this helps or not, but here are some properties from the
> flink-conf.yaml:
>
> jobmanager.heap.mb: 8192
> taskmanager.heap.mb: 49152
> taskmanager.numberOfTaskSlots: 16
> parallelism.default: 1
>
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///tmp/flink-checkpoints
>
> taskmanager.network.numberOfBuffers: 3072
>
> recovery.mode: zookeeper
> recovery.zookeeper.quorum:
> 192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181
> recovery.zookeeper.storageDir: file:///tmp/zk-recovery
> recovery.zookeeper.path.root: /opt/flink-0.10.0
>
> I appreciate all the help.
>
>
> Thanks,
> Ali
>
>
> On 2015-12-10, 10:16 AM, "Stephan Ewen" <[hidden email]> wrote:
>
> >Hi Ali!
> >
> >Seems like the Google Doc has restricted access, I tells me I have no
> >permission to view it...
> >
> >Stephan
> >
> >
> >On Wed, Dec 9, 2015 at 8:49 PM, Kashmar, Ali <[hidden email]> wrote:
> >
> >> Hi Stephan,
> >>
> >> Here’s a link to the screenshot I tried to attach earlier:
> >>
> >> https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28
> >>
> >> It looks to me like the distribution is fairly skewed across the nodes,
> >> even though they’re executing the same pipeline.
> >>
> >> Thanks,
> >> Ali
> >>
> >>
> >> On 2015-12-09, 12:36 PM, "Stephan Ewen" <[hidden email]> wrote:
> >>
> >> >Hi!
> >> >
> >> >The parallel socket source looks good.
> >> >I think you forgot to attach the screenshot, or the mailing list
> >>dropped
> >> >the attachment...
> >> >
> >> >Not sure if I can diagnose that without more details. The sources all
> >>do
> >> >the same. Assuming that the server distributes the data evenly across
> >>all
> >> >connected sockets, and that the network bandwidth ends up being
> >>divided in
> >> >a fair way, all pipelines should run be similarly "eager".
> >> >
> >> >Greetings,
> >> >Stephan
> >> >
> >> >
> >> >On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <[hidden email]>
> >>wrote:
> >> >
> >> >> Hi Stephan,
> >> >>
> >> >> That was my original understanding, until I realized that I was not
> >> >>using
> >> >> a parallel socket source. I had a custom source that extended
> >> >> SourceFunction which always runs with parallelism = 1. I looked
> >>through
> >> >> the API and found the ParallelSourceFunction interface so I
> >>implemented
> >> >> that and voila, now all 3 nodes in the cluster are actually receiving
> >> >> traffic on socket connections.
> >> >>
> >> >> Now that I’m running it successfully end to end, I’m trying to
> >>improve
> >> >>the
> >> >> performance. Can you take a look at the attached screen shot and
> >>tell me
> >> >> if the distribution of work amongst the pipelines is normal? I feel
> >>like
> >> >> some pipelines are lot lazier than others, even though the cluster
> >>nodes
> >> >> are exactly the same.
> >> >>
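> >> >> By the way, here’s the class I wrote. It would be useful to have this
> >> >> available in the Flink distribution:
> >> >>
> >> >> import java.io.BufferedReader;
> >> >> import java.io.IOException;
> >> >> import java.io.InputStreamReader;
> >> >> import java.net.Socket;
> >> >>
> >> >> import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
> >> >> import org.slf4j.Logger;
> >> >> import org.slf4j.LoggerFactory;
> >> >>
> >> >> // Each parallel instance opens its own connection to host:port and
> >> >> // emits every line it reads as one record.
> >> >> public class ParallelSocketSource implements ParallelSourceFunction<String> {
> >> >>
> >> >>         private static final long serialVersionUID = -271094428915640892L;
> >> >>         private static final Logger LOG =
> >> >>                 LoggerFactory.getLogger(ParallelSocketSource.class);
> >> >>
> >> >>         // set to false by cancel(); checked before each line is emitted
> >> >>         private volatile boolean running = true;
> >> >>         private final String host;
> >> >>         private final int port;
> >> >>
> >> >>         public ParallelSocketSource(String host, int port) {
> >> >>                 this.host = host;
> >> >>                 this.port = port;
> >> >>         }
> >> >>
> >> >>         @Override
> >> >>         public void run(SourceContext<String> ctx) throws Exception {
> >> >>                 try (Socket socket = new Socket(host, port);
> >> >>                         BufferedReader reader = new BufferedReader(
> >> >>                                 new InputStreamReader(socket.getInputStream()))) {
> >> >>                         String line;
> >> >>                         while (running && (line = reader.readLine()) != null) {
> >> >>                                 ctx.collect(line);
> >> >>                         }
> >> >>                 } catch (IOException ex) {
> >> >>                         LOG.error("error reading from socket", ex);
> >> >>                 }
> >> >>         }
> >> >>
> >> >>         @Override
> >> >>         public void cancel() {
> >> >>                 // only flips the flag: a readLine() that is blocked waiting
> >> >>                 // for data returns once data arrives or the connection closes
> >> >>                 running = false;
> >> >>         }
> >> >> }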
> >> >>
> >> >> Regards,
> >> >> Ali
> >> >>
> >> >>
> >> >> On 2015-12-08, 3:35 PM, "Stephan Ewen" <[hidden email]> wrote:
> >> >>
> >> >> >Hi Ali!
> >> >> >
> >> >> >In the case you have, the sequence of source-map-filter ... forms a
> >> >> >pipeline.
> >> >> >
> >> >> >You mentioned that you set the parallelism to 16, so there should be 16
> >> >> >pipelines. These pipelines should be completely independent.
> >> >> >
> >> >> >Looking at the way the scheduler is implemented, independent pipelines
> >> >> >should be spread across machines. But when you execute that in parallel,
> >> >> >you say all 16 pipelines end up on the same machine?
> >> >> >
> >> >> >Can you share with us the rough code of your program? Or a screenshot
> >> >> >from the runtime dashboard that shows the program graph?
> >> >> >
> >> >> >
> >> >> >If your cluster is basically for that one job only, you could try and set
> >> >> >the number of slots to 4 for each machine. Then you have 16 slots in total
> >> >> >and each node would run one of the 16 pipelines.
> >> >> >
> >> >> >
> >> >> >Greetings,
> >> >> >Stephan
> >> >> >
> >> >> >
> >> >> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <[hidden email]> wrote:
> >> >> >
> >> >> >> There is no shuffle operation in my flow. Mine actually looks like this:
> >> >> >>
> >> >> >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map -> Map, Filter)
> >> >> >>
> >> >> >>
> >> >> >> Maybe it’s treating this whole flow as one pipeline and assigning it to a
> >> >> >> slot. What I really wanted was for the custom source I built to have
> >> >> >> running instances on all nodes. I’m not really sure if that’s the right
> >> >> >> approach, but if we could add this as a feature that’d be great, since
> >> >> >> having more than one node running the same pipeline guarantees the
> >> >> >> pipeline is never offline.
> >> >> >>
> >> >> >> -Ali
> >> >> >>
> >> >> >> On 2015-12-02, 4:39 AM, "Till Rohrmann" <[hidden email]> wrote:
> >> >> >>
> >> >> >> >If I'm not mistaken, the scheduler already has a preference to spread
> >> >> >> >independent pipelines out across the cluster. At least it uses a queue
> >> >> >> >of instances from which it pops the first element when it allocates a
> >> >> >> >new slot. This instance is then appended to the queue again if it has
> >> >> >> >some resources (slots) left.
> >> >> >> >
> >> >> >> >I would assume that you have a shuffle operation involved in your job,
> >> >> >> >such that it makes sense for the scheduler to deploy all pipelines to
> >> >> >> >the same machine.
> >> >> >> >
> >> >> >> >Cheers,
> >> >> >> >Till
> >> >> >> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <[hidden email]> wrote:
> >> >> >> >
> >> >> >> >> Slots are like "resource groups" which execute entire pipelines. They
> >> >> >> >> frequently have more than one operator.
> >> >> >> >>
> >> >> >> >> What you can try as a workaround is to decrease the number of slots per
> >> >> >> >> machine, so that the operators are spread across more machines.
> >> >> >> >>
> >> >> >> >> If this is a crucial issue for your use case, it should be simple to add
> >> >> >> >> a "preference to spread out" to the scheduler...
> >> >> >> >>
> >> >> >> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <[hidden email]> wrote:
> >> >> >> >>
> >> >> >> >> > Is there a way to make a task cluster-parallelizable? I.e., make sure
> >> >> >> >> > the parallel instances of the task are distributed across the cluster.
> >> >> >> >> > When I run my Flink job with a parallelism of 16, all the parallel
> >> >> >> >> > tasks are assigned to the first task manager.
> >> >> >> >> >
> >> >> >> >> > - Ali
> >> >> >> >> >
> >> >> >> >> > On 2015-11-30, 2:18 PM, "Ufuk Celebi" <[hidden email]> wrote:
> >> >> >> >> >
> >> >> >> >> > >
> >> >> >> >> > >> On 30 Nov 2015, at 17:47, Kashmar, Ali <[hidden email]> wrote:
> >> >> >> >> > >> Do the parallel instances of each task get distributed across the
> >> >> >> >> > >> cluster or is it possible that they all run on the same node?
> >> >> >> >> > >
> >> >> >> >> > >Yes, slots are requested from all nodes of the cluster. But keep in
> >> >> >> >> > >mind that multiple tasks (forming a local pipeline) can be scheduled
> >> >> >> >> > >to the same slot (1 slot can hold many tasks).
> >> >> >> >> > >
> >> >> >> >> > >Have you seen this?
> >> >> >> >> > >https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/job_scheduling.html
> >> >> >> >> > >
> >> >> >> >> > >> If they can all run on the same node, what happens when that node
> >> >> >> >> > >> crashes? Does the job manager recreate them using the remaining
> >> >> >> >> > >> open slots?
> >> >> >> >> > >
> >> >> >> >> > >What happens: The job manager tries to restart the program with the
> >> >> >> >> > >same parallelism. Thus if you have enough free slots available in
> >> >> >> >> > >your cluster, this works smoothly (so yes, the remaining/available
> >> >> >> >> > >slots are used).
> >> >> >> >> > >
> >> >> >> >> > >With a YARN cluster the task manager containers are restarted
> >> >> >> >> > >automatically. In standalone mode, you have to take care of this
> >> >> >> >> > >yourself.
> >> >> >> >> > >
> >> >> >> >> > >
> >> >> >> >> > >Does this help?
> >> >> >> >> > >
> >> >> >> >> > >– Ufuk
> >> >> >> >> > >
> >> >> >> >> >
> >> >> >> >> >
> >> >> >> >>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>
>
>
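Till's description of the scheduler in the quoted thread (pop the first instance from a queue, take a slot, re-append the instance if it still has free slots) amounts to round-robin slot allocation. The Java sketch below is a simplified model written for illustration, not Flink's actual scheduler code: it assumes every pipeline is independent (a parallel source and no shuffle) and ignores any co-location or locality constraints. `RoundRobinSlots` and `allocate` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Simplified model of the queue-based slot allocation Till describes.
// Not Flink's scheduler: locality and co-location constraints are ignored.
final class RoundRobinSlots {

    /** Returns how many of the requested slots end up on each instance. */
    static int[] allocate(int[] slotsPerInstance, int requested) {
        int[] free = slotsPerInstance.clone();
        int[] used = new int[free.length];
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < free.length; i++) {
            if (free[i] > 0) {
                queue.addLast(i);
            }
        }
        for (int n = 0; n < requested; n++) {
            Integer instance = queue.pollFirst(); // first instance in the queue
            if (instance == null) {
                throw new IllegalStateException("not enough free slots");
            }
            free[instance]--;
            used[instance]++;
            if (free[instance] > 0) {
                queue.addLast(instance); // still has capacity: back of the queue
            }
        }
        return used;
    }

    public static void main(String[] args) {
        // 3 task managers with 16 slots each, job parallelism 16
        System.out.println(Arrays.toString(allocate(new int[] {16, 16, 16}, 16)));
        // 4 task managers with 4 slots each (the fewer-slots workaround)
        System.out.println(Arrays.toString(allocate(new int[] {4, 4, 4, 4}, 16)));
    }
}
```

Under this model, 3 task managers with 16 slots each and a parallelism of 16 get 6, 5 and 5 pipelines, while 4 machines with 4 slots each get exactly 4 apiece, which is why lowering the slots per machine forces an even spread.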

Re: Task Parallelism in a Cluster

Kashmar, Ali
Hi Stephan,

I’m using DataStream.writeAsText(String path, WriteMode writemode) for my
sink. The data is written to disk and there’s plenty of space available.

I looked deeper into the logs and found out that the jobs on 174 and 175
are not actually stuck, but they’re moving extremely slowly. This is an
excerpt from the task manager log on 175:

03:44:43,307 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took 254 to read a 1000 lines
03:44:43,315 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took 254 to read a 1000 lines
03:46:09,360 INFO  org.apache.zookeeper.ClientCnxn      - Opening socket connection to server 192.168.200.173/192.168.200.173:2181. Will not attempt to authenticate using SASL (unknown error)
03:46:09,361 INFO  org.apache.zookeeper.ClientCnxn      - Client session timed out, have not heard from server in 86223ms for sessionid 0x25181a544860091, closing socket connection and attempting reconnect
03:46:09,362 WARN  org.apache.flink.shaded.org.apache.curator.ConnectionState    - Connection attempt unsuccessful after 86221 (greater than max timeout of 60000). Resetting connection and trying again with a new connection.
03:46:09,391 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took 86222 to read a 1000 lines
03:46:09,394 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took 86243 to read a 1000 lines
03:46:09,439 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took 86224 to read a 1000 lines
03:46:09,445 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took 86217 to read a 1000 lines
03:46:09,462 INFO  org.apache.zookeeper.ZooKeeper      - Session: 0x25181a544860091 closed
03:46:09,462 INFO  org.apache.zookeeper.ZooKeeper      - Initiating client connection, connectString=192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@550a1967
03:46:09,462 INFO  org.apache.zookeeper.ClientCnxn      - EventThread shut down
03:46:09,463 INFO  org.apache.zookeeper.ClientCnxn      - Opening socket connection to server 192.168.200.174/192.168.200.174:2181. Will not attempt to authenticate using SASL (unknown error)
03:46:09,463 ERROR org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl  - Background operation retry gave up
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:708)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:826)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
03:46:09,464 ERROR org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl  - Background retry gave up
org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:809)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
        at org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
03:46:09,464 INFO  org.apache.zookeeper.ClientCnxn      - Socket connection established to 192.168.200.174/192.168.200.174:2181, initiating session
03:46:09,468 INFO  org.apache.zookeeper.ClientCnxn      - Session establishment complete on server 192.168.200.174/192.168.200.174:2181, sessionid = 0x25181a544860094, negotiated timeout = 40000
03:46:09,469 INFO  org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED
03:46:09,475 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took 86212 to read a 1000 lines
03:46:09,523 INFO  com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took 86217 to read a 1000 lines



You’ll notice that at some point it takes 254 milliseconds to process 1000
lines of input, and then it jumps to 86 seconds! I also see some ZooKeeper
exceptions that lead me to believe it’s a networking problem. I have 4 VMs
running on 4 different hosts, connected via a 10G NIC.
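The code that prints "It took N to read a 1000 lines" was not posted, so the sketch below is a hypothetical reconstruction meant only to help read those numbers. Assuming the operators of the Source -> Flat Map -> ... flow are chained into one pipeline, `ctx.collect()` runs the downstream operators on the source's own thread, so a per-batch timer inside the source also counts time spent downstream, up to and including a sink that blocks. It assumes the timer uses `System.nanoTime()`; `BatchTimer`, `run` and `sleepQuietly` are made-up names, and a plain `Consumer` stands in for the chained operators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical reconstruction of per-batch timing inside a source loop.
// The Consumer stands in for ctx.collect(): with chaining, downstream
// operators (including the sink) run synchronously on this thread.
final class BatchTimer {

    /** Feeds lines downstream and returns the elapsed ms for each full batch. */
    static List<Long> run(Iterable<String> lines, int batchSize, Consumer<String> chainedOperators) {
        List<Long> batchMillis = new ArrayList<>();
        long start = System.nanoTime(); // monotonic, unaffected by wall-clock changes
        int count = 0;
        for (String line : lines) {
            chainedOperators.accept(line); // returns only after downstream work is done
            count++;
            if (count % batchSize == 0) {
                long now = System.nanoTime();
                batchMillis.add(TimeUnit.NANOSECONDS.toMillis(now - start));
                start = now;
            }
        }
        return batchMillis;
    }

    /** Simulates a slow sink write. */
    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 30; i++) {
            lines.add("line-" + i);
        }
        // A "sink" needing about 5 ms per record makes every batch of 10 take
        // roughly 50 ms, although reading the lines costs almost nothing.
        System.out.println(run(lines, 10, line -> sleepQuietly(5)));
    }
}
```

Under these assumptions, a jump from 254 ms to 86 s per batch says that the whole chain stalled, not necessarily that reading from the socket got slower.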

Thanks,
Ali
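The ZooKeeper lines in the log show the client requesting `sessionTimeout=60000` but ending up with `negotiated timeout = 40000`. A ZooKeeper server clamps the requested session timeout into [minSessionTimeout, maxSessionTimeout], which default to 2 × tickTime and 20 × tickTime. Assuming the quorum runs with tickTime = 2000 ms (the ZooKeeper configuration is not shown in the thread), the upper bound is exactly 40000 ms. A minimal sketch of that arithmetic; `ZkSessionTimeout` and `negotiate` are illustrative names, not ZooKeeper API:

```java
// Sketch of how a ZooKeeper server bounds a client's requested session timeout,
// using the default limits minSessionTimeout = 2 * tickTime and
// maxSessionTimeout = 20 * tickTime. The tickTime value is an assumption.
final class ZkSessionTimeout {

    static int negotiate(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;
        int max = 20 * tickTimeMs;
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        System.out.println(negotiate(60000, 2000)); // prints 40000, as in the log
    }
}
```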


On 2015-12-11, 11:23 AM, "Stephan Ewen" <[hidden email]> wrote:

>Hi Ali!
>
>I see, so the tasks on 192.168.200.174 and 192.168.200.175 apparently make
>no progress and do not even recognize the end-of-stream point.
>
>I expect that the streams on 192.168.200.174 and 192.168.200.175 are
>back-pressured to a standstill. Since no network is involved, the reason
>for the back pressure is probably the sinks.
>
>What kind of data sink are you using (in the addSink() function)?
>Can you check if that one starts to fully block on machines
>192.168.200.174 and 192.168.200.175?
>
>Greetings,
>Stephan
>
>
>
>On Fri, Dec 11, 2015 at 4:50 PM, Kashmar, Ali <[hidden email]> wrote:
>
>> Hi Stephan,
>>
>> I got a request to share the image with someone and I assume it was you.
>> You should be able to see it now. This seems to be the main issue I have
>> at this time. I've tried running the job on the cluster with a parallelism
>> of 16, 24, 36, and even 48. I see all the parallel pipelines working for a
>> bit, and then some of them just stop; I’m not sure if they’re stuck or
>> not. Here’s another screenshot:
>> http://postimg.org/image/gr6ogxqjj/
>>
>> Two things you’ll notice:
>> 1. Pipelines on 192.168.200.174 and 192.168.200.175 stopped doing
>> anything at some point, and only 192.168.200.173 is doing all the work.
>> 2. Pipelines on 192.168.200.174 and 192.168.200.175 don’t have an end time
>> even though the job should be finished (the screenshot was taken after the
>> source was closed).
>>
>> I’m not sure if this helps or not, but here are some properties from the
>> flink-conf.yaml:
>>
>> jobmanager.heap.mb: 8192
>> taskmanager.heap.mb: 49152
>> taskmanager.numberOfTaskSlots: 16
>> parallelism.default: 1
>>
>> state.backend: filesystem
>> state.backend.fs.checkpointdir: file:///tmp/flink-checkpoints
>>
>> taskmanager.network.numberOfBuffers: 3072
>>
>> recovery.mode: zookeeper
>> recovery.zookeeper.quorum: 192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181
>> recovery.zookeeper.storageDir: file:///tmp/zk-recovery
>> recovery.zookeeper.path.root: /opt/flink-0.10.0
>>
>> I appreciate all the help.
>>
>>
>> Thanks,
>> Ali
>>
>>
>> On 2015-12-10, 10:16 AM, "Stephan Ewen" <[hidden email]> wrote:
>>
>> >Hi Ali!
>> >
>> >Seems like the Google Doc has restricted access; it tells me I have no
>> >permission to view it...
>> >
>> >Stephan
>> >
>> >
>> >On Wed, Dec 9, 2015 at 8:49 PM, Kashmar, Ali <[hidden email]> wrote:
>> >
>> >> Hi Stephan,
>> >>
>> >> Here’s a link to the screenshot I tried to attach earlier:
>> >>
>> >> https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28
>> >>
>> >> It looks to me like the distribution is fairly skewed across the nodes,
>> >> even though they’re executing the same pipeline.
>> >>
>> >> Thanks,
>> >> Ali
>> >>
>> >>
>> >> On 2015-12-09, 12:36 PM, "Stephan Ewen" <[hidden email]> wrote:
>> >>
>> >> >Hi!
>> >> >
>> >> >The parallel socket source looks good.
>> >> >I think you forgot to attach the screenshot, or the mailing list dropped
>> >> >the attachment...
>> >> >
>> >> >Not sure if I can diagnose that without more details. The sources all do
>> >> >the same. Assuming that the server distributes the data evenly across all
>> >> >connected sockets, and that the network bandwidth ends up being divided in
>> >> >a fair way, all pipelines should be similarly "eager".
>> >> >
>> >> >Greetings,
>> >> >Stephan


Re: Task Parallelism in a Cluster

Kashmar, Ali
Hi Stephan,

I figured it out. The problem was that the date/time was different on all
3 nodes. ZooKeeper thought that it hadn’t heard from the other nodes for
longer than the allowed period and dropped them, which caused the other two
task managers in the cluster to fail. I synchronized the time across the 3
nodes and reran the test. It’s running very smoothly now.
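Clock differences like this are usually kept in check by running an NTP daemon on every node rather than by setting the clocks by hand. To see how far apart two machines are, NTP (RFC 5905) estimates the offset from four timestamps of one request/response exchange. The sketch below shows only that arithmetic; `ClockOffset`, `offsetMs` and `delayMs` are illustrative names, and the timestamps are made-up example values.

```java
// Clock offset and round-trip delay estimates as defined for NTP (RFC 5905).
// t0 = client send, t1 = server receive, t2 = server send, t3 = client receive,
// each read from the clock of the machine where the event happens (ms).
final class ClockOffset {

    /** Estimated amount by which the server clock is ahead of the client clock. */
    static double offsetMs(long t0, long t1, long t2, long t3) {
        return ((t1 - t0) + (t2 - t3)) / 2.0;
    }

    /** Round-trip network delay, excluding the server's processing time. */
    static long delayMs(long t0, long t1, long t2, long t3) {
        return (t3 - t0) - (t2 - t1);
    }

    public static void main(String[] args) {
        // Server clock 90 s ahead; about 5 ms on the way out and 4 ms back.
        System.out.println(offsetMs(1000, 91005, 91006, 1010)); // 90000.5
        System.out.println(delayMs(1000, 91005, 91006, 1010));  // 9
    }
}
```

The offset estimate is exact only when the outbound and return delays are equal; the error is at most half the round-trip delay, which is negligible next to clock differences of seconds.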

Thanks again for your help.

-Ali

On 2015-12-11, 12:03 PM, "Kashmar, Ali" <[hidden email]> wrote:

>Hi Stephan,
>
>I’m using DataStream.writeAsText(String path, WriteMode writemode) for my
>sink. The data is written to disk and there’s plenty of space available.
>
>I looked deeper into the logs and found out that the jobs on 174 and 175
>are not actually stuck, but they’re moving extremely slowly, This is an
>excerpt from the task manager log on 175:
>
>03:44:43,307 INFO
>com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took
>254 to read a 1000 lines
>03:44:43,315 INFO
>com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took
>254 to read a 1000 lines
>03:46:09,360 INFO  org.apache.zookeeper.ClientCnxn
>      - Opening socket connection to server
>192.168.200.173/192.168.200.173:2181. Will not attempt to au
>thenticate using SASL (unknown error)
>03:46:09,361 INFO  org.apache.zookeeper.ClientCnxn
>      - Client session timed out, have not heard from server in 86223ms
>for sessionid 0x25181a544860091,
> closing socket connection and attempting reconnect
>03:46:09,362 WARN
>org.apache.flink.shaded.org.apache.curator.ConnectionState    - Connection
>attempt unsuccessful after 86221 (greater than max timeout of 60000).
>Resetting conne
>ction and trying again with a new connection.
>03:46:09,391 INFO
>com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took
>86222 to read a 1000 lines
>03:46:09,394 INFO
>com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took
>86243 to read a 1000 lines
>03:46:09,439 INFO
>com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took
>86224 to read a 1000 lines
>03:46:09,445 INFO
>com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took
>86217 to read a 1000 lines
>03:46:09,462 INFO  org.apache.zookeeper.ZooKeeper
>      - Session: 0x25181a544860091 closed
>03:46:09,462 INFO  org.apache.zookeeper.ZooKeeper
>      - Initiating client connection,
>connectString=192.168.200.173:2181,192.168.200.174:2181,192.168.20
>0.175:2181 sessionTimeout=60000
>watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@550a196
>7
>03:46:09,462 INFO  org.apache.zookeeper.ClientCnxn
>      - EventThread shut down
>03:46:09,463 INFO  org.apache.zookeeper.ClientCnxn
>      - Opening socket connection to server
>192.168.200.174/192.168.200.174:2181. Will not attempt to au
>thenticate using SASL (unknown error)
>03:46:09,463 ERROR
>org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFramework
>I
>mpl  - Background operation retry gave up
>org.apache.zookeeper.KeeperException$ConnectionLossException:
>KeeperErrorCode = ConnectionLoss
>        at
>org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
>        at
>org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFramework
>I
>mpl.checkBackgroundRetry(CuratorFrameworkImpl.java:708)
>        at
>org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFramework
>I
>mpl.performBackgroundOperation(CuratorFrameworkImpl.java:826)
>        at
>org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFramework
>I
>mpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
>        at
>org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFramework
>I
>mpl.access$300(CuratorFrameworkImpl.java:62)
>        at
>org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFramework
>I
>mpl$4.call(CuratorFrameworkImpl.java:257)
>        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>        at
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:
>1
>142)
>        at
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java
>:
>617)
>        at java.lang.Thread.run(Thread.java:745)
>03:46:09,464 ERROR
>org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFramework
>I
>mpl  - Background retry gave up
>org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossException:
>KeeperErrorCode = ConnectionLoss
>        at
>org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFramework
>I
>mpl.performBackgroundOperation(CuratorFrameworkImpl.java:809)
>        at
>org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFramework
>I
>mpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
>        at
>org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFramework
>I
>mpl.access$300(CuratorFrameworkImpl.java:62)
>        at
>org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFramework
>I
>mpl$4.call(CuratorFrameworkImpl.java:257)
>        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>        at
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:
>1
>142)
>        at
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java
>:
>617)
>        at java.lang.Thread.run(Thread.java:745)
>03:46:09,464 INFO  org.apache.zookeeper.ClientCnxn
>      - Socket connection established to
>192.168.200.174/192.168.200.174:2181, initiating session
>03:46:09,468 INFO  org.apache.zookeeper.ClientCnxn
>      - Session establishment complete on server
>192.168.200.174/192.168.200.174:2181, sessionid = 0x25181a544860094,
>negotiated timeout = 40000
>03:46:09,469 INFO
>org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionState
>M
>anager  - State change: RECONNECTED
>03:46:09,475 INFO
>com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took
>86212 to read a 1000 lines
>03:46:09,523 INFO
>com.emc.ngen.analytics.flink.source.ParallelSocketSource      - It took
>86217 to read a 1000 lines
>
>
>
>You’ll notice that at some point it takes 254 milliseconds to process a
>1000 lines of input, and then it jumps 86 seconds!! And I also see some
>zookeeper exceptions that lead me to believe that it’s a networking
>problem. I have 4 VMs running on 4 different hosts, and connected via a
>10G NIC.
>
>Thanks,
>Ali
>
>
>On 2015-12-11, 11:23 AM, "Stephan Ewen" <[hidden email]> wrote:
>
>>Hi Ali!
>>
>>I see, so the tasks 192.168.200.174 and 192.168.200.175 apparently do not
>>make progress, even do not recognize the end-of-stream point.
>>
>>I expect that the streams on 192.168.200.174 and 192.168.200.175 are
>>back-pressured to a stand-still. Since no network is involved, the reason
>>for the back pressure are probably the sinks.
>>
>>What kind of data sink are you using (in the addSink()) function?
>>Can you check if that one starts to fully block on machines
>>192.168.200.174 and 192.168.200.175 ?
>>
>>Greetings,
>>Stephan
>>
>>
>>
>>On Fri, Dec 11, 2015 at 4:50 PM, Kashmar, Ali <[hidden email]> wrote:
>>
>>> Hi Stephan,
>>>
>>> I got a request to share the image with someone and I assume it was you.
>>> You should be able to see it now. This seems to be the main issue I have
>>> at this time. I've tried running the job on the cluster with a parallelism
>>> of 16, 24, 36, and even 48. I see all the parallel pipelines working for
>>> a bit, and then some of them just stop; I’m not sure whether they’re
>>> stuck or not. Here’s another screenshot:
>>> http://postimg.org/image/gr6ogxqjj/
>>>
>>> Two things you’ll notice:
>>> 1. Pipelines on 192.168.200.174 and 192.168.200.175 stop doing anything at
>>>    some point, and only 192.168.200.173 is doing all the work.
>>> 2. Pipelines on 192.168.200.174 and 192.168.200.175 don’t have an end time
>>>    even though the job should be finished (the screenshot was taken after
>>>    the source was closed).
>>>
>>> I’m not sure whether this helps, but here are some properties from
>>> flink-conf.yaml:
>>>
>>> jobmanager.heap.mb: 8192
>>> taskmanager.heap.mb: 49152
>>> taskmanager.numberOfTaskSlots: 16
>>> parallelism.default: 1
>>>
>>> state.backend: filesystem
>>> state.backend.fs.checkpointdir: file:///tmp/flink-checkpoints
>>>
>>> taskmanager.network.numberOfBuffers: 3072
>>>
>>> recovery.mode: zookeeper
>>> recovery.zookeeper.quorum:
>>> 192.168.200.173:2181,192.168.200.174:2181,192.168.200.175:2181
>>> recovery.zookeeper.storageDir: file:///tmp/zk-recovery
>>> recovery.zookeeper.path.root: /opt/flink-0.10.0
>>>
>>> I appreciate all the help.
>>>
>>>
>>> Thanks,
>>> Ali
>>>
>>>
>>> On 2015-12-10, 10:16 AM, "Stephan Ewen" <[hidden email]> wrote:
>>>
>>> >Hi Ali!
>>> >
>>> >Seems like the Google Doc has restricted access; it tells me I have no
>>> >permission to view it...
>>> >
>>> >Stephan
>>> >
>>> >
>>> >On Wed, Dec 9, 2015 at 8:49 PM, Kashmar, Ali <[hidden email]> wrote:
>>> >
>>> >> Hi Stephan,
>>> >>
>>> >> Here’s a link to the screenshot I tried to attach earlier:
>>> >>
>>> >> https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28
>>> >>
>>> >> It looks to me like the distribution is fairly skewed across the nodes,
>>> >> even though they’re executing the same pipeline.
>>> >>
>>> >> Thanks,
>>> >> Ali
>>> >>
>>> >>
>>> >> On 2015-12-09, 12:36 PM, "Stephan Ewen" <[hidden email]> wrote:
>>> >>
>>> >> >Hi!
>>> >> >
>>> >> >The parallel socket source looks good.
>>> >> >I think you forgot to attach the screenshot, or the mailing list dropped
>>> >> >the attachment...
>>> >> >
>>> >> >Not sure if I can diagnose that without more details. The sources all do
>>> >> >the same thing. Assuming that the server distributes the data evenly
>>> >> >across all connected sockets, and that the network bandwidth ends up
>>> >> >being divided fairly, all pipelines should be similarly "eager".
>>> >> >
>>> >> >Greetings,
>>> >> >Stephan
>>> >> >
>>> >> >
>>> >> >On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <[hidden email]> wrote:
>>> >> >
>>> >> >> Hi Stephan,
>>> >> >>
>>> >> >> That was my original understanding, until I realized that I was not
>>> >> >> using a parallel socket source. I had a custom source that implemented
>>> >> >> SourceFunction, which always runs with parallelism = 1. I looked through
>>> >> >> the API, found the ParallelSourceFunction interface, and implemented
>>> >> >> that, and voila, now all 3 nodes in the cluster are actually receiving
>>> >> >> traffic on socket connections.
>>> >> >>
>>> >> >> Now that I’m running it successfully end to end, I’m trying to improve
>>> >> >> the performance. Can you take a look at the attached screenshot and
>>> >> >> tell me if the distribution of work amongst the pipelines is normal? I
>>> >> >> feel like some pipelines are a lot lazier than others, even though the
>>> >> >> cluster nodes are exactly the same.
>>> >> >>
>>> >> >> By the way, here’s the class I wrote. It would be useful to have this
>>> >> >> available in the Flink distribution:
>>> >> >>
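>>> >> >> import java.io.BufferedReader;
>>> >> >> import java.io.IOException;
>>> >> >> import java.io.InputStreamReader;
>>> >> >> import java.net.Socket;
>>> >> >>
>>> >> >> import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
>>> >> >> import org.slf4j.Logger;
>>> >> >> import org.slf4j.LoggerFactory;
>>> >> >>
>>> >> >> // Each parallel instance of this source opens its own connection to
>>> >> >> // host:port, so a parallelism of N gives N independent socket readers.
>>> >> >> public class ParallelSocketSource implements ParallelSourceFunction<String> {
>>> >> >>
>>> >> >>         private static final long serialVersionUID = -271094428915640892L;
>>> >> >>         private static final Logger LOG = LoggerFactory.getLogger(ParallelSocketSource.class);
>>> >> >>
>>> >> >>         private final String host;
>>> >> >>         private final int port;
>>> >> >>
>>> >> >>         private volatile boolean running = true;
>>> >> >>         // Kept so that cancel() can close it and unblock a pending readLine().
>>> >> >>         private transient volatile Socket socket;
>>> >> >>
>>> >> >>         public ParallelSocketSource(String host, int port) {
>>> >> >>                 this.host = host;
>>> >> >>                 this.port = port;
>>> >> >>         }
>>> >> >>
>>> >> >>         @Override
>>> >> >>         public void run(SourceContext<String> ctx) throws Exception {
>>> >> >>                 try (Socket s = new Socket(host, port);
>>> >> >>                         BufferedReader reader = new BufferedReader(
>>> >> >>                                 new InputStreamReader(s.getInputStream()))) {
>>> >> >>                         socket = s;
>>> >> >>                         String line;
>>> >> >>                         // Emit one record per line until cancelled or the server closes the connection.
>>> >> >>                         while (running && (line = reader.readLine()) != null) {
>>> >> >>                                 ctx.collect(line);
>>> >> >>                         }
>>> >> >>                 } catch (IOException ex) {
>>> >> >>                         // cancel() closing the socket also ends up here; only log unexpected failures.
>>> >> >>                         if (running) {
>>> >> >>                                 LOG.error("error reading from socket", ex);
>>> >> >>                         }
>>> >> >>                 }
>>> >> >>         }
>>> >> >>
>>> >> >>         @Override
>>> >> >>         public void cancel() {
>>> >> >>                 running = false;
>>> >> >>                 Socket s = socket;
>>> >> >>                 if (s != null) {
>>> >> >>                         try {
>>> >> >>                                 s.close();
>>> >> >>                         } catch (IOException ignored) {
>>> >> >>                                 // Best effort: the source is shutting down anyway.
>>> >> >>                         }
>>> >> >>                 }
>>> >> >>         }
>>> >> >> }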
>>> >> >>
>>> >> >> Regards,
>>> >> >> Ali
>>> >> >>
>>> >> >>
>>> >> >> On 2015-12-08, 3:35 PM, "Stephan Ewen" <[hidden email]> wrote:
>>> >> >>
>>> >> >> >Hi Ali!
>>> >> >> >
>>> >> >> >In your case, the sequence source-map-filter ... forms a pipeline.
>>> >> >> >
>>> >> >> >You mentioned that you set the parallelism to 16, so there should be
>>> >> >> >16 pipelines. These pipelines should be completely independent.
>>> >> >> >
>>> >> >> >Looking at the way the scheduler is implemented, independent pipelines
>>> >> >> >should be spread across machines. But when you execute that in
>>> >> >> >parallel, you say all 16 pipelines end up on the same machine?
>>> >> >> >
>>> >> >> >Can you share the rough code of your program with us? Or a screenshot
>>> >> >> >from the runtime dashboard that shows the program graph?
>>> >> >> >
>>> >> >> >
>>> >> >> >If your cluster is basically for that one job only, you could try
>>> >> >> >setting the number of slots to 4 for each machine. Then you have 16
>>> >> >> >slots in total and each node would run one of the 16 pipelines.
>>> >> >> >
>>> >> >> >
>>> >> >> >Greetings,
>>> >> >> >Stephan
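Stephan's workaround rests on simple slot arithmetic. As Ufuk and Stephan describe elsewhere in the thread, one slot can hold an entire pipeline, so p pipelines on TaskManagers with k slots each can be packed onto as few as ceil(p / k) machines; lowering k forces them onto more machines. A minimal sketch of that calculation, assuming one slot per pipeline and no other jobs on the cluster; `SlotArithmetic` is a hypothetical helper.

```java
// Hypothetical helper: how few TaskManagers could end up hosting all pipelines.
public class SlotArithmetic {

    /** Fewest TaskManagers with `slotsPerTm` slots each that could host `pipelines` pipelines. */
    public static int minTaskManagers(int pipelines, int slotsPerTm) {
        // Ceiling division: a partially used TaskManager still counts as one machine.
        return (pipelines + slotsPerTm - 1) / slotsPerTm;
    }

    public static void main(String[] args) {
        // 16 slots per TaskManager: all 16 pipelines could land on a single machine.
        System.out.println(minTaskManagers(16, 16));
        // 4 slots per TaskManager: the 16 pipelines need at least 4 machines.
        System.out.println(minTaskManagers(16, 4));
    }
}
```

The cluster still needs at least p slots in total for the job to be scheduled at all.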
>>> >> >> >
>>> >> >> >
>>> >> >> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <[hidden email]> wrote:
>>> >> >> >
>>> >> >> >> There is no shuffle operation in my flow. Mine actually looks like this:
>>> >> >> >>
>>> >> >> >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map -> Map, Filter)
>>> >> >> >>
>>> >> >> >>
>>> >> >> >> Maybe it’s treating this whole flow as one pipeline and assigning it to a
>>> >> >> >> slot. What I really wanted was for the custom source I built to have
>>> >> >> >> running instances on all nodes. I’m not really sure if that’s the right
>>> >> >> >> approach, but if we could add this as a feature that’d be great, since
>>> >> >> >> having more than one node running the same pipeline guarantees the
>>> >> >> >> pipeline is never offline.
>>> >> >> >>
>>> >> >> >> -Ali
>>> >> >> >>
>>> >> >> >> On 2015-12-02, 4:39 AM, "Till Rohrmann" <[hidden email]> wrote:
>>> >> >> >>
>>> >> >> >> >If I'm not mistaken, the scheduler already has a preference to spread
>>> >> >> >> >independent pipelines out across the cluster. At least it uses a queue
>>> >> >> >> >of instances from which it pops the first element when it allocates a
>>> >> >> >> >new slot. This instance is then appended to the queue again if it has
>>> >> >> >> >some resources (slots) left.
>>> >> >> >> >
>>> >> >> >> >I would assume that you have a shuffle operation involved in your job
>>> >> >> >> >such that it makes sense for the scheduler to deploy all pipelines to
>>> >> >> >> >the same machine.
>>> >> >> >> >
>>> >> >> >> >Cheers,
>>> >> >> >> >Till
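Till's description of the scheduler amounts to round-robin slot allocation over a queue of instances. The sketch below models only that description, for independent pipelines that each need one slot. It ignores any locality preferences or co-location constraints the real scheduler may apply, and `RoundRobinSlots` / `allocate` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Simplified model of the allocation strategy Till describes (hypothetical helper, not
// Flink's scheduler): pop the first instance, take one slot, re-append it if slots remain.
public class RoundRobinSlots {

    /** A TaskManager ("instance") and its number of free slots. */
    static final class Instance {
        final String host;
        int freeSlots;

        Instance(String host, int freeSlots) {
            this.host = host;
            this.freeSlots = freeSlots;
        }
    }

    /** Returns the host chosen for each of `parallelism` independent pipelines. */
    public static List<String> allocate(Map<String, Integer> slotsPerHost, int parallelism) {
        Queue<Instance> queue = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : slotsPerHost.entrySet()) {
            if (e.getValue() > 0) {
                queue.add(new Instance(e.getKey(), e.getValue()));
            }
        }
        List<String> placement = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            Instance next = queue.poll(); // first instance in the queue
            if (next == null) {
                throw new IllegalStateException("not enough free slots for parallelism " + parallelism);
            }
            next.freeSlots--;
            placement.add(next.host);
            if (next.freeSlots > 0) {
                queue.add(next); // appended again only if it still has slots left
            }
        }
        return placement;
    }

    public static void main(String[] args) {
        Map<String, Integer> cluster = new LinkedHashMap<>();
        cluster.put("192.168.200.173", 16);
        cluster.put("192.168.200.174", 16);
        cluster.put("192.168.200.175", 16);
        System.out.println(allocate(cluster, 6));
    }
}
```

With three TaskManagers of 16 slots each, this model places six pipelines on .173, .174, .175, .173, .174, .175, i.e. spread evenly rather than packed onto the first machine.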
>>> >> >> >> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <[hidden email]> wrote:
>>> >> >> >> >
>>> >> >> >> >> Slots are like "resource groups" which execute entire pipelines. They
>>> >> >> >> >> frequently have more than one operator.
>>> >> >> >> >>
>>> >> >> >> >> What you can try as a workaround is to decrease the number of slots
>>> >> >> >> >> per machine to cause the operators to be spread across more machines.
>>> >> >> >> >>
>>> >> >> >> >> If this is a crucial issue for your use case, it should be simple to
>>> >> >> >> >> add a "preference to spread out" to the scheduler...
>>> >> >> >> >>
>>> >> >> >> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <[hidden email]> wrote:
>>> >> >> >> >>
>>> >> >> >> >> > Is there a way to make a task cluster-parallelizable, i.e. make sure the
>>> >> >> >> >> > parallel instances of the task are distributed across the cluster? When I
>>> >> >> >> >> > run my Flink job with a parallelism of 16, all the parallel tasks are
>>> >> >> >> >> > assigned to the first task manager.
>>> >> >> >> >> >
>>> >> >> >> >> > - Ali
>>> >> >> >> >> >
>>> >> >> >> >> > On 2015-11-30, 2:18 PM, "Ufuk Celebi" <[hidden email]> wrote:
>>> >> >> >> >> >
>>> >> >> >> >> > >
>>> >> >> >> >> > >> On 30 Nov 2015, at 17:47, Kashmar, Ali <[hidden email]> wrote:
>>> >> >> >> >> > >> Do the parallel instances of each task get distributed across the
>>> >> >> >> >> > >> cluster or is it possible that they all run on the same node?
>>> >> >> >> >> > >
>>> >> >> >> >> > >Yes, slots are requested from all nodes of the cluster. But keep in
>>> >> >> >> >> > >mind that multiple tasks (forming a local pipeline) can be scheduled
>>> >> >> >> >> > >to the same slot (1 slot can hold many tasks).
>>> >> >> >> >> > >
>>> >> >> >> >> > >Have you seen this?
>>> >> >> >> >> > >https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/job_scheduling.html
>>> >> >> >> >> > >
>>> >> >> >> >> > >> If they can all run on the same node, what happens when that node
>>> >> >> >> >> > >> crashes? Does the job manager recreate them using the remaining
>>> >> >> >> >> > >> open slots?
>>> >> >> >> >> > >
>>> >> >> >> >> > >What happens: The job manager tries to restart the program with the
>>> >> >> >> >> > >same parallelism. Thus if you have enough free slots available in
>>> >> >> >> >> > >your cluster, this works smoothly (so yes, the remaining/available
>>> >> >> >> >> > >slots are used)
>>> >> >> >> >> > >
>>> >> >> >> >> > >With a YARN cluster the task manager containers are restarted
>>> >> >> >> >> > >automatically. In standalone mode, you have to take care of this
>>> >> >> >> >> > >yourself.
>>> >> >> >> >> > >
>>> >> >> >> >> > >
>>> >> >> >> >> > >Does this help?
>>> >> >> >> >> > >
>>> >> >> >> >> > >– Ufuk
>>> >> >> >> >> > >
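Ufuk's answer implies a simple condition for recovery: a restart with the same parallelism only succeeds if the surviving TaskManagers still offer at least that many free slots. A minimal sketch, assuming one slot per pipeline, identical TaskManagers, and no other jobs holding slots; the class and method names are hypothetical.

```java
// Hypothetical helper illustrating Ufuk's answer: a restart with unchanged parallelism
// needs enough free slots on the TaskManagers that survive the failure.
public class RestartCheck {

    /** Slots left after `failed` of `taskManagers` identical TaskManagers are lost. */
    public static int remainingSlots(int taskManagers, int slotsPerTm, int failed) {
        return (taskManagers - failed) * slotsPerTm;
    }

    /** Assumes one slot per pipeline and no other jobs occupying slots. */
    public static boolean canRestart(int parallelism, int freeSlots) {
        return parallelism <= freeSlots;
    }

    public static void main(String[] args) {
        // 3 TaskManagers with 16 slots each; one of them crashes.
        int free = remainingSlots(3, 16, 1);
        System.out.println("free slots: " + free);
        System.out.println("parallelism 16 can restart: " + canRestart(16, free));
        System.out.println("parallelism 48 can restart: " + canRestart(48, free));
    }
}
```

With 3 × 16 slots and one lost TaskManager, 32 slots remain: a job with parallelism 16 fits, while one with parallelism 48 would have to wait until the TaskManager comes back (automatically on YARN, manually in standalone mode).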
>