On Fri, Jul 1, 2016 at 11:09 PM, Vinay Patil <[hidden email]> wrote:
Hi,

According to the documentation: "Each task is executed by one thread. Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread handover and buffering, and increases overall throughput while decreasing latency."

So does this mean that the single box (see the mails below) represents a *single task*, and that this task will be executed by a single thread only?

I have an 8-node cluster (parallelism set to 56), so what is the correct way to achieve maximum CPU utilization and parallelism? Does chaining the complete stream into a single box achieve maximum parallelism?

We are processing a huge volume of data (60,000 records per second), so I want to be sure what we can change to achieve better results.

Regards,
Vinay Patil

On Fri, Jul 1, 2016 at 9:23 PM, Aljoscha Krettek <[hidden email]> wrote:
> Hi,
> yes, the window operator is stateful, which means that it will pick up
> where it left off in case of a failure and restore.
>
> You're right about the graph: chained operators are shown as one box.
>
> Cheers,
> Aljoscha
>
> On Fri, 1 Jul 2016 at 04:52 Vinay Patil <[hidden email]> wrote:
>
> > Hi,
> >
> > I just watched the video on Robust Stream Processing.
> > When we say that a window is a stateful operator, does it mean that if
> > the task manager doing the window operation fails, it will pick up from
> > the state it left off at when it comes back up? (I have not read more
> > on state for now.)
> >
> > Also, in one of our projects, when we deploy on the cluster and check the
> > job graph, everything is shown in one box. Why does this happen? Is it
> > because of the chaining of streams?
> > So the box here represents the function flow, right?
> >
> > Regards,
> > Vinay Patil

On Mon, 4 Jul 2016 at 11:13 Vinay Patil <[hidden email]> wrote:
Just an update: the task will be executed by multiple threads; my bad, I asked the wrong way. Can you please clarify the other things?

Out of 8 nodes, only 3 are being utilized, reading the data from Kafka. Does this mean that the number of Kafka partitions is set too low?

What if we use rescale or rebalance, since they distribute data evenly? Would that ensure maximum use of resources?

Regards,
Vinay Patil

On Mon, Jul 4, 2016 at 7:13 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
yes, this is true. If the number of Kafka partitions is less than the parallelism, then some of the sources might not be utilized. If you insert a rebalance after the sources, you should be able to utilize all the downstream operators equally.

Cheers,
Aljoscha
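The effect Aljoscha describes can be reproduced with a small plain-Java toy model (no Flink dependency; the class and method names are hypothetical). It uses numbers that come up later in this thread, a source parallelism of 56 with only two Kafka partitions, and compares forwarding each source's records to its own chained downstream subtask against spreading them round-robin, which is the idea behind `rebalance()`. Starting every sender at channel 0 is a simplifying assumption, not a description of Flink's actual partitioner.

```java
import java.util.Arrays;

// Toy model of rebalance(), NOT Flink's implementation. Each upstream
// (source) subtask sends its records round-robin over all downstream
// subtasks, so downstream load is even no matter which sources emit data.
final class RebalanceSketch {

    /** Without rebalance(), a chained operator only sees its own source's records. */
    static long[] forward(long[] recordsPerSource) {
        return recordsPerSource.clone();
    }

    /** With rebalance(), every source spreads its records over all downstream subtasks. */
    static long[] rebalance(long[] recordsPerSource, int downstream) {
        long[] received = new long[downstream];
        for (long emitted : recordsPerSource) {
            int channel = 0; // simplification: every sender starts at channel 0
            for (long r = 0; r < emitted; r++) {
                received[channel]++;
                channel = (channel + 1) % downstream;
            }
        }
        return received;
    }

    /** Number of downstream subtasks that received at least one record. */
    static long busy(long[] received) {
        return Arrays.stream(received).filter(n -> n > 0).count();
    }

    public static void main(String[] args) {
        long[] sources = new long[56]; // source parallelism 56 ...
        sources[0] = 5_600;            // ... but only 2 Kafka partitions,
        sources[1] = 5_600;            // so only 2 sources emit anything
        System.out.println("forward:   " + busy(forward(sources)) + " busy subtasks");   // 2
        System.out.println("rebalance: " + busy(rebalance(sources, 56)) + " busy subtasks"); // 56
    }
}
```

With forwarding, 2 of 56 downstream subtasks do all the work; with round-robin, all 56 receive 200 records each. This only evens out the downstream operators: the two sources themselves still do all of the reading.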

On Mon, 4 Jul 2016 at 16:28 Vinay Patil <[hidden email]> wrote:
Thanks,

so is operator chaining useful in terms of utilizing resources, or should we keep chaining to a minimum, say 3-4 operators, and disable chaining? I am worried because I am seeing all the operators in one box in the Flink UI.

Regards,
Vinay Patil

On Mon, Jul 4, 2016 at 5:00 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
chaining is useful to minimize communication overhead, but in your case you might benefit more from good cluster utilization. There seems to be a tradeoff, so maybe you can run some simple tests to see how it behaves for you.

Cheers,
Aljoscha
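To make the tradeoff concrete, here is a plain-Java toy (not Flink code; all names are hypothetical) that runs the same map-then-filter pipeline in two ways. In `chained`, both functions run in one thread as direct method calls. In `unchained`, the map runs in its own thread and hands every record to the filter through a bounded queue. That queue handover is the overhead chaining removes, while the extra thread is what you gain when one function is CPU-heavy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java toy contrasting a chained and an unchained map -> filter pipeline.
// It does not use Flink; it only mimics the thread handover that chaining avoids.
final class ChainingSketch {

    /** Chained: map and filter run in the caller's thread as direct calls. */
    static List<Integer> chained(List<Integer> input,
                                 Function<Integer, Integer> map,
                                 Predicate<Integer> filter) {
        List<Integer> out = new ArrayList<>();
        for (Integer x : input) {
            Integer y = map.apply(x);
            if (filter.test(y)) {
                out.add(y);
            }
        }
        return out;
    }

    /** Unchained: map runs in its own thread and hands records over through a bounded buffer. */
    static List<Integer> unchained(List<Integer> input,
                                   Function<Integer, Integer> map,
                                   Predicate<Integer> filter) {
        BlockingQueue<Optional<Integer>> buffer = new ArrayBlockingQueue<>(16);
        Thread mapper = new Thread(() -> {
            try {
                for (Integer x : input) {
                    buffer.put(Optional.of(map.apply(x)));
                }
                buffer.put(Optional.empty()); // end-of-stream marker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        mapper.start();

        List<Integer> out = new ArrayList<>();
        try {
            // The filter runs in the caller's thread, consuming from the buffer.
            for (Optional<Integer> next = buffer.take(); next.isPresent(); next = buffer.take()) {
                if (filter.test(next.get())) {
                    out.add(next.get());
                }
            }
            mapper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        Function<Integer, Integer> square = x -> x * x;
        Predicate<Integer> even = x -> x % 2 == 0;
        System.out.println(chained(input, square, even));   // [4, 16, 36, 64, 100]
        System.out.println(unchained(input, square, even)); // [4, 16, 36, 64, 100]
    }
}
```

Both variants return the same result; only the threading and buffering differ, which is why measuring both on the real job, as suggested above, is the reliable way to decide.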

On Mon, Jul 4, 2016 at 8:43 PM, Stephan Ewen <[hidden email]> wrote:
Just to be sure: each *subtask* has one thread, so for each task there are as many parallel threads (distributed across nodes) as your parallelism indicates.

For most cases, having long chains and a higher parallelism is a good choice. Cases where individual functions (MapFunction, etc.) do something very CPU-intensive are cases where you may want to not chain them, so that they get a separate thread.

If you see all operators in one box in the UI, it probably means you have only "Filter" and "Map" functions? In that case it is fine to have just one box (= Task) in the UI. The box still has parallelism via subtasks.

If you insert a "rebalance()" between the Kafka Source and the Map/Filter/etc., it makes sure that the data distribution in the Map/Filter/etc. operators has the best utilization, independent of how the data was partitioned in Kafka. You should then also see two boxes in the UI: one for the Kafka Source, one for the actual processing.
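Stephan's rule of one thread per subtask can be turned into simple arithmetic. The sketch below is plain Java with hypothetical names; it assumes Flink's default slot sharing (one slot may hold one subtask of every task in the job) and, for illustration, 7 TaskManagers with 8 slots each, i.e. one slot per core on the cluster described later in the thread.

```java
// Back-of-the-envelope arithmetic for subtasks, threads and slots.
// Assumes Flink's default slot sharing (all operators in one sharing group)
// and one slot per core; the cluster numbers are illustrative.
final class SlotSketch {

    /** Each task (one box in the web UI) runs `parallelism` subtasks, one thread each. */
    static int subtaskThreads(int[] taskParallelisms) {
        int threads = 0;
        for (int p : taskParallelisms) {
            threads += p;
        }
        return threads;
    }

    /** With default slot sharing, a slot can hold one subtask of every task,
     *  so the job needs as many slots as its highest task parallelism. */
    static int slotsNeeded(int[] taskParallelisms) {
        int max = 0;
        for (int p : taskParallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        int available = 7 * 8;     // 7 TaskManagers x 8 slots (assumed one per core)
        int[] oneBox = {56};       // everything chained into one task
        int[] twoBoxes = {56, 56}; // Kafka source | rebalance() | processing
        // prints "56 threads, 56/56 slots"
        System.out.println(subtaskThreads(oneBox) + " threads, "
                + slotsNeeded(oneBox) + "/" + available + " slots");
        // prints "112 threads, 56/56 slots"
        System.out.println(subtaskThreads(twoBoxes) + " threads, "
                + slotsNeeded(twoBoxes) + "/" + available + " slots");
    }
}
```

Splitting the single box into two tasks with `rebalance()` doubles the number of subtask threads (56 to 112) without needing more slots, because each slot then runs one source subtask and one processing subtask.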

On Mon, Jul 4, 2016 at 8:55 PM, Vinay Patil <[hidden email]> wrote:
Thanks a lot, guys, this helps me understand better.

Regards,
Vinay Patil

On Tue, 5 Jul 2016 at 15:48 Vinay Patil <[hidden email]> wrote:
Hi,

The rebalance actually distributes the data to all the task managers, and now all TMs are being utilized. You were right, I am seeing two boxes (tasks) now.

I have one question regarding the task slots.

For the source, the parallelism is set to 56. When we look at the UI and click on the source sub-task, I see 56 entries, out of which only two are getting data from Kafka (this may be because I have two Kafka partitions).

The 56 entries that I am seeing for a sub-task in the UI are the total task slots of all TMs, right?

If yes, only two slots are being utilized, so how do I ensure that enough task slots are utilized at the source? I have 7 task managers (8 cores per TM), so if only one core on each of two task managers is performing the consume operation, wouldn't that hamper performance?

Even if only two task managers are utilized, all 16 of their slots should have been used, right?

For the other sub-task, I am seeing bytes received for all 56 entries (this may be because of applying rebalance after the source).

P.S.: I am reading over a million records from Kafka, so I need to utilize enough resources [performance is the key here].

Regards,
Vinay Patil
Hi,
unfortunately, reading from a single Kafka partition cannot be split among several parallel instances of the Kafka source. So if you have only 2 partitions, your read parallelism is limited to 2. You are right that this can lead to poor performance and underutilization. The only solution I see right now is to have more partitions in Kafka so that more readers can read in parallel.

+Robert: I'm adding Robert directly because he might have more to say about this.

Cheers,
Aljoscha

On Tue, 5 Jul 2016 at 15:48 Vinay Patil <[hidden email]> wrote:
> For the source the parallelism is set to 56, now when we see on the UI and
> click on source sub-task , I see 56 entries , out of which only two are
> getting the data from Kafka (this may be because I have two kafka
> partitions)
|
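To make Aljoscha's point concrete, here is a minimal Java sketch of why only two of the 56 source subtasks receive data. It assumes a simple modulo assignment (partition `p` is read by subtask `p % parallelism`); this is a simplification for illustration, not the exact assignment code of Flink's Kafka consumer. The property it does share with the real consumer is the one Aljoscha describes: a partition is never split across subtasks. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of spreading Kafka partitions over parallel source subtasks.
 * ASSUMPTION: partition p is read by subtask (p % parallelism). This is an
 * illustration, not the exact assignment logic of Flink's Kafka consumer.
 * The property it shares with the real consumer: a partition is never split.
 */
class PartitionAssignmentSketch {

    /** Returns, for each source subtask index, the partitions that subtask reads. */
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        if (numPartitions < 0 || parallelism <= 0) {
            throw new IllegalArgumentException("need numPartitions >= 0 and parallelism > 0");
        }
        List<List<Integer>> perSubtask = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            perSubtask.add(new ArrayList<>());
        }
        // Each partition goes to exactly one subtask; it is never shared.
        for (int p = 0; p < numPartitions; p++) {
            perSubtask.get(p % parallelism).add(p);
        }
        return perSubtask;
    }

    /** Counts the source subtasks that get at least one partition to read. */
    static int busySubtasks(int numPartitions, int parallelism) {
        int busy = 0;
        for (List<Integer> partitions : assign(numPartitions, parallelism)) {
            if (!partitions.isEmpty()) {
                busy++;
            }
        }
        return busy;
    }

    public static void main(String[] args) {
        // Setup from this thread: 2 Kafka partitions, source parallelism 56.
        System.out.println("busy source subtasks: " + busySubtasks(2, 56));   // 2
        // With as many partitions as subtasks, every source subtask reads.
        System.out.println("busy source subtasks: " + busySubtasks(56, 56));  // 56
    }
}
```

With 2 partitions and a source parallelism of 56, 54 source subtasks stay idle under this model, which matches what Vinay sees in the UI; only raising the partition count lets more source subtasks read.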
Aljoscha is right. Multiple consumers in the same consumer group cannot read from the same partition. You'll need to create a Kafka topic with more partitions to get higher read parallelism.

On Wed, Jul 6, 2016 at 10:45 AM, Aljoscha Krettek <[hidden email]> wrote:
> Hi,
> unfortunately the reading of one Kafka partition cannot be split among
> several parallel instances of the Kafka source. So if you have only 2
> partitions your reading parallelism is limited to that.
|
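Following Robert's advice, one way to reason about how many partitions to create is a quick calculation of how they would spread over the source subtasks. The Java sketch below assumes the N partitions are distributed as evenly as possible over a source parallelism of P, so each subtask reads either floor(N/P) or ceil(N/P) partitions. The helper names are hypothetical (this is not a Flink or Kafka API), and the real assignment in a given Flink or Kafka version may differ.

```java
/**
 * Back-of-the-envelope helper for choosing a Kafka partition count.
 * ASSUMPTION: N partitions are spread as evenly as possible over P source
 * subtasks, so each subtask reads floor(N/P) or ceil(N/P) partitions.
 * Method names are hypothetical; this is not a Flink or Kafka API.
 */
class PartitionCountSketch {

    private static void check(int numPartitions, int sourceParallelism) {
        if (numPartitions < 0 || sourceParallelism <= 0) {
            throw new IllegalArgumentException("need numPartitions >= 0 and sourceParallelism > 0");
        }
    }

    /** Source subtasks that can read at all, since a partition is never shared. */
    static int effectiveReadParallelism(int numPartitions, int sourceParallelism) {
        check(numPartitions, sourceParallelism);
        return Math.min(numPartitions, sourceParallelism);
    }

    /** Fewest partitions any subtask reads (0 means some subtasks are idle). */
    static int minPartitionsPerSubtask(int numPartitions, int sourceParallelism) {
        check(numPartitions, sourceParallelism);
        return numPartitions / sourceParallelism; // floor(N / P)
    }

    /** Most partitions any single subtask has to read. */
    static int maxPartitionsPerSubtask(int numPartitions, int sourceParallelism) {
        check(numPartitions, sourceParallelism);
        return (numPartitions + sourceParallelism - 1) / sourceParallelism; // ceil(N / P)
    }

    /** Smallest multiple of the source parallelism that is >= the desired count. */
    static int roundUpToMultiple(int desiredPartitions, int sourceParallelism) {
        return maxPartitionsPerSubtask(desiredPartitions, sourceParallelism) * sourceParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 56; // source parallelism used in this thread
        for (int partitions : new int[] {2, 56, 60, 112}) {
            System.out.printf("partitions=%d readers=%d min/max per subtask=%d/%d%n",
                    partitions,
                    effectiveReadParallelism(partitions, parallelism),
                    minPartitionsPerSubtask(partitions, parallelism),
                    maxPartitionsPerSubtask(partitions, parallelism));
        }
        System.out.println("60 rounded up: " + roundUpToMultiple(60, parallelism)); // 112
    }
}
```

Under that assumption, 60 partitions on 56 subtasks leaves 4 subtasks reading twice as many partitions as the rest, whereas 112 partitions gives every subtask exactly 2. If the partitions carry similar traffic, a partition count that is a multiple of the source parallelism keeps the read load even.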