Data locality and scheduler

Data locality and scheduler

CPC
Hi,

I looked at some of the scheduler documentation but could not find an
answer to my question. Suppose I have a big file on a 40-node Hadoop
cluster, and since it is a big file, every node holds at least one chunk
of it. If I write a Flink job that filters the file and the job has a
parallelism of 4 (less than 40), how does data locality work? Do some
tasks read some chunks from remote nodes? Or does the scheduler keep the
parallelism at 4 but schedule the tasks across every node?
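
For concreteness, a minimal sketch of the kind of job I mean (DataSet
API, parallelism 4; the path and the filter predicate are placeholders):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FilterJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // 4 parallel tasks on a 40-node cluster

        // one big HDFS file whose chunks are spread over all 40 nodes
        DataSet<String> lines = env.readTextFile("hdfs:///data/bigfile");

        lines.filter(line -> line.contains("ERROR")) // placeholder predicate
             .writeAsText("hdfs:///data/bigfile-filtered");

        env.execute("filter job");
    }
}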

Regards

Re: Data locality and scheduler

Fabian Hueske
Hi,

Flink starts four tasks and then lazily assigns input splits to these
tasks with locality preference, so each task may consume more than one
split. This is different from Hadoop MapReduce or Spark, which schedule a
new task for each input split.
In your case, the four tasks would be scheduled to four of the 40
machines, and most of the splits would be read remotely.
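
A simplified sketch of that lazy, locality-preferring assignment (not
Flink's actual code; in Flink the logic lives in
LocatableInputSplitAssigner):

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

class LocalityPreferringAssigner {

    static final class Split {
        final String id;
        final Set<String> hosts; // nodes holding a replica of this chunk
        Split(String id, Set<String> hosts) { this.id = id; this.hosts = hosts; }
    }

    private final LinkedList<Split> unassigned;

    LocalityPreferringAssigner(List<Split> splits) {
        this.unassigned = new LinkedList<>(splits);
    }

    // Called by a source task running on 'host' whenever it finishes a
    // split and wants more input, so one task may consume many splits.
    synchronized Split nextSplit(String host) {
        // First pass: prefer a split with a replica on the requesting host.
        for (Iterator<Split> it = unassigned.iterator(); it.hasNext(); ) {
            Split s = it.next();
            if (s.hosts.contains(host)) {
                it.remove();
                return s; // local read
            }
        }
        // No local split left: hand out any remaining split (remote read).
        return unassigned.isEmpty() ? null : unassigned.removeFirst();
    }
}

With parallelism 4 on 40 nodes, only splits that happen to have a replica
on one of the four chosen hosts hit the local branch; all the others are
read remotely.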

Best, Fabian


Re: Data locality and scheduler

CPC
Hi,

But can't this behaviour cause a lot of network activity? Is there any
roadmap or plan to change it?

Re: Data locality and scheduler

Fabian Hueske
Hi,

Yes, that can cause network traffic.
AFAIK, there are no plans to change this behavior.

Best, Fabian


Re: Data locality and scheduler

Eron Wright
My understanding of Flink on YARN is that the TaskManagers are eagerly
acquired when the session is started, before any job information (which
could be mined for locality hints) is available. This makes sense because
the session may be reused for other jobs, but it leads to the poor
locality mentioned above.

A more sophisticated alternative would be to affinitize TaskManagers to a
specific job, acquiring them lazily and leveraging locality info gleaned
from the data sources and sinks. A hybrid approach could be used to
counteract the added job latency.
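
As a rough illustration, YARN's AMRMClient already accepts preferred
nodes in a container request, so a locality-aware acquisition could look
roughly like this (node names and resource sizes are made up):

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

class LocalityAwareTmRequest {
    // Hypothetical sketch: ask for a TaskManager container on the nodes
    // that hold replicas of the job's input splits, rather than anywhere.
    static ContainerRequest forSplitHosts(String[] splitHosts) {
        Resource tmResource = Resource.newInstance(4096, 2); // 4 GB, 2 vcores
        return new ContainerRequest(
                tmResource, splitHosts, /* racks */ null, Priority.newInstance(0));
        // the application master would then pass this to
        // amRMClient.addContainerRequest(...)
    }
}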

-Eron



Re: Data locality and scheduler

Fabian Hueske
The lazy split assignment design goes back to the very early days of
Flink and has not been changed since. It was initially motivated by
Flink's pipelined data exchange, which was easier to realize with a fixed
set of source tasks that request file splits than with new tasks being
spawned for every input split.

I agree that it makes sense to think about an alternative or hybrid
strategy to address the locality issue.
