Hi,
I looked at some of the scheduler documentation but could not find an answer to my question. Suppose that I have a big file on a 40-node Hadoop cluster, and since it is a big file, every node has at least one chunk of it. If I write a Flink job that filters the file and the job has a parallelism of 4 (less than 40), how does data locality work? Do some tasks read chunks from remote nodes? Or does the scheduler place tasks on every node while keeping the maximum parallelism at 4?

Regards
Hi,
Flink starts four tasks and then lazily assigns input splits to these tasks with locality preference, so each task may consume more than one split. This is different from Hadoop MapReduce or Spark, which schedule a new task for each input split. In your case, the four tasks would be scheduled to four of the 40 machines, and most of the splits would be read remotely.

Best, Fabian

(In reply to CPC <[hidden email]>, 2016-04-26 16:59 GMT+02:00)
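The lazy, locality-preferring split assignment described in this reply can be illustrated with a small simulation. This is only a sketch under stated assumptions, not Flink's actual implementation: the function name `assign_splits_lazily`, the host names, the round-robin request order, and the "one replica per split" layout are all hypothetical simplifications chosen for the example.

```python
def assign_splits_lazily(split_hosts, task_hosts):
    """Simulate lazy split assignment with locality preference.

    split_hosts: list where entry i is the set of hosts storing split i.
    task_hosts:  list where entry t is the host that source task t runs on.
    Returns (local_reads, remote_reads, splits_per_task).
    """
    unassigned = set(range(len(split_hosts)))
    local = remote = 0
    per_task = [0] * len(task_hosts)

    # Assumption: tasks finish splits at the same pace, so they request
    # their next split in round-robin order.
    while unassigned:
        for task, host in enumerate(task_hosts):
            if not unassigned:
                break
            # Prefer a split stored on the requesting task's own host.
            choice = next(
                (s for s in sorted(unassigned) if host in split_hosts[s]),
                None,
            )
            if choice is None:
                # No local split is left, so hand out a remote one.
                choice = min(unassigned)
                remote += 1
            else:
                local += 1
            unassigned.remove(choice)
            per_task[task] += 1
    return local, remote, per_task


# Scenario from the question: 40 nodes, parallelism 4.
# Assumption: one split per node, stored only on that node.
split_hosts = [{f"node{i}"} for i in range(40)]
task_hosts = [f"node{i}" for i in range(4)]
local, remote, per_task = assign_splits_lazily(split_hosts, task_hosts)
print(local, remote, per_task)  # 4 36 [10, 10, 10, 10]
```

With these assumptions only 4 of the 40 splits are read locally. With HDFS replication each split would be stored on several hosts, so the local share would be somewhat higher, but most reads would still be remote.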
Hi
But can't this behaviour cause a lot of network activity? Is there any roadmap or plan to change it?

(In reply to "Fabian Hueske" <[hidden email]>, Apr 26, 2016 7:06 PM)
Hi,
Yes, that can cause network traffic. AFAIK, there are no plans to work on this behavior.

Best, Fabian

(In reply to CPC <[hidden email]>, 2016-04-26 18:17 GMT+02:00)
My understanding of Flink on YARN is that the task managers are eagerly acquired when the session is started, before any job information (which could be mined for locality hints) is available. This makes sense because the session could be reused for other jobs. But this approach leads to poor locality as mentioned.
A more sophisticated alternative would be to dedicate TaskManagers to a specific job and acquire them lazily, using locality information gleaned from the data sources/sinks. A hybrid solution could be used to counteract the resulting job latency.

-Eron

(In reply to the message of Thu, 28 Apr 2016 11:59:28 +0200, Subject: Re: Data locality and scheduler)
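As an illustration of how locality information from a data source could guide where workers are acquired, here is a minimal sketch. It is purely hypothetical and does not describe how Flink on YARN behaves: the function `pick_hosts_by_locality`, the greedy strategy, and the sample hosts are assumptions made for the example.

```python
def pick_hosts_by_locality(split_hosts, num_hosts):
    """Greedily pick hosts that store the most not-yet-covered splits.

    split_hosts: list where entry i is the set of hosts storing split i.
    num_hosts:   number of workers (hosts) to acquire.
    Returns (chosen_hosts, number_of_splits_with_a_local_worker).
    """
    uncovered = set(range(len(split_hosts)))
    candidates = sorted({h for hosts in split_hosts for h in hosts})
    chosen = []
    for _ in range(num_hosts):
        remaining = [h for h in candidates if h not in chosen]
        if not remaining:
            break
        # Pick the host holding the most splits that no chosen host covers.
        # Ties go to the alphabetically first host.
        best = max(
            remaining,
            key=lambda h: sum(1 for s in uncovered if h in split_hosts[s]),
        )
        chosen.append(best)
        uncovered -= {s for s in uncovered if best in split_hosts[s]}
    return chosen, len(split_hosts) - len(uncovered)


# Hypothetical layout: four splits, some replicated on two hosts.
splits = [{"a", "b"}, {"a", "c"}, {"b", "c"}, {"d"}]
print(pick_hosts_by_locality(splits, 2))  # (['a', 'b'], 3)
```

A greedy choice like this is not guaranteed to be optimal, and it needs the job's input information before workers are requested, which is exactly the latency trade-off mentioned above.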
The lazy split assignment design goes back to the very early days of Flink and has not been changed since. It was initially motivated by Flink's pipelined data exchange, which was easier to realize with a fixed set of source tasks that request file splits than with new tasks being spawned for every input split. I agree that it makes sense to think about an alternative or hybrid strategy to address the locality issue.

(In reply to Eron Wright <[hidden email]>, 2016-05-01 4:56 GMT+02:00)