Hi all!
Now that we are coming to the next release, I wanted to make sure we finalize the decision on that point, because it would be nice not to break the behavior of the system afterwards.

Right now, when tasks are chained together, the system always copies the elements between different tasks in the same chain.

I think this policy was established under the assumption that copies cost nothing, based on our own test examples, which mainly use immutable types like Strings, boxed primitives, etc.

In practice, a lot of data types are actually quite expensive to copy.

For example, a rather common data type in the event analysis of web sources is the JSON object. Flink treats this as a generic type. Depending on its concrete implementation, Kryo may have to perform a serialization copy, which means encoding into bytes (JSON encoding, charset encoding) and decoding again.

This has a massive impact on the out-of-the-box performance of the system. Given that, I was wondering whether we should set the default policy to "not copying".

That is basically the behavior of the batch API, and so far there has never been an issue with it (people running into the trap of overwritten mutable elements).

What do you think?

Stephan
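A minimal, self-contained Java sketch of the two policies Stephan describes follows. It does not reproduce Flink's actual chaining code: `ChainedOutput`, its `copyOnEmit` flag, and `JsonLikeRecord` are hypothetical names chosen for illustration, and the `copier` function stands in for whatever serializer-based copy the runtime would perform (for a Kryo-handled generic type, possibly a full encode/decode round trip).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a mutable, expensive-to-copy type such as a JSON object.
final class JsonLikeRecord {
    final Map<String, Object> fields = new HashMap<>();

    // Assumes the field values are immutable (e.g. Strings), so copying the map is enough.
    JsonLikeRecord deepCopy() {
        JsonLikeRecord copy = new JsonLikeRecord();
        copy.fields.putAll(fields);
        return copy;
    }
}

// Hypothetical model of the hand-off between two tasks in the same chain.
final class ChainedOutput<T> {
    private final Consumer<T> downstream;   // the next function in the chain
    private final UnaryOperator<T> copier;  // stands in for a serializer-based copy
    private final boolean copyOnEmit;       // true = current default, false = proposed default
    int copiesMade = 0;

    ChainedOutput(Consumer<T> downstream, UnaryOperator<T> copier, boolean copyOnEmit) {
        this.downstream = downstream;
        this.copier = copier;
        this.copyOnEmit = copyOnEmit;
    }

    void collect(T record) {
        if (copyOnEmit) {
            copiesMade++;
            downstream.accept(copier.apply(record)); // pays for one full copy per record
        } else {
            downstream.accept(record);               // passes the same reference through
        }
    }
}
```

With `copyOnEmit = true`, every record crossing the chain boundary costs one copy and the downstream function receives a distinct instance; with `false`, it receives the very object the upstream emitted. The second mode is cheaper, but it is only safe if nobody mutates a record after handing it on.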
+1 for disabling copying by default
On 10/02/2015 05:53 PM, Stephan Ewen wrote: [...]
In reply to this post by Stephan Ewen
It seems like I'm one of the few people who run into the mutable-elements trap in the Batch API from time to time. At the moment I always clone when I'm not 100% sure, to avoid hunting bugs later. So far I was happy to learn that this is not a problem in Streaming, but that's just me.

When working with groupBy and partition functions, it's easy to forget that there is one instance of the function class per operator, not per partition. So if you write your code assuming that each partition is a separate object, reducing at the operator level becomes really annoying. Since the mapping between partitions and operators is usually hidden, this makes debugging harder, especially in cases where the test data produces a single partition per operator and the real deployment does not.

*To summarize:*
I'm not against reusing objects as long as there is something that helps ease the pitfalls. This could be coding guidelines, debugging tools, or best practices.

On Fri, Oct 2, 2015 at 5:53 PM, Stephan Ewen wrote: [...]
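Martin's pitfall can be reproduced without Flink in a short Java sketch. The `GroupFunction` interface and the `runGroups` driver are hypothetical stand-ins for the runtime, which feeds many groups through one function instance per parallel operator instance. `LeakySum` keeps its running total in a field, so its output depends on how many groups happen to share that instance.

```java
import java.util.ArrayList;
import java.util.List;

final class PerInstanceStateDemo {

    // Hypothetical group function: called once per group on a shared instance.
    interface GroupFunction {
        int reduceGroup(List<Integer> group);
    }

    // Buggy: the sum lives in a field, so it carries over from one group to the next.
    static final class LeakySum implements GroupFunction {
        private int sum = 0;

        @Override
        public int reduceGroup(List<Integer> group) {
            for (int v : group) sum += v;
            return sum;
        }
    }

    // Correct: all per-group state is local to the call.
    static final class LocalSum implements GroupFunction {
        @Override
        public int reduceGroup(List<Integer> group) {
            int sum = 0;
            for (int v : group) sum += v;
            return sum;
        }
    }

    // Stand-in for one parallel operator instance processing several groups in turn.
    static List<Integer> runGroups(GroupFunction fn, List<List<Integer>> groups) {
        List<Integer> results = new ArrayList<>();
        for (List<Integer> g : groups) results.add(fn.reduceGroup(g));
        return results;
    }
}
```

With the input `[[1, 2], [10]]`, `LeakySum` returns `[3, 13]` while `LocalSum` returns `[3, 10]`. With a single group such as `[[10]]`, both return `[10]`, which is why a test whose data yields one group per instance can pass while the real deployment does not.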
@Martin:
I think you were a user of the Batch API before we made the non-reuse mode the default mode. Now, when you use a GroupReduceFunction, a MapPartitionFunction, or similar, you need not do any cloning or copying. All functions that receive groups will always get fresh elements.

This chaining issue concerns only MapFunctions (and FlatMapFunctions) in which users keep lists to remember elements across invocations of the MapFunction.

On Fri, Oct 2, 2015 at 6:27 PM, Martin Neumann wrote: [...]
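The case Stephan narrows the issue down to, a map-style function that keeps references across invocations, can be sketched in Java as follows. All names are hypothetical. The upstream in `run` reuses a single mutable object and hands it over without copying, which is what a chained predecessor could do once copying is off; the downstream `RememberingMapper` either stores the reference directly or copies it first.

```java
import java.util.ArrayList;
import java.util.List;

final class BufferingTrapDemo {

    // Hypothetical mutable record type.
    static final class MutableEvent {
        long id;

        MutableEvent copy() {
            MutableEvent c = new MutableEvent();
            c.id = id;
            return c;
        }
    }

    // Downstream map-style function that remembers elements across invocations.
    static final class RememberingMapper {
        private final boolean copyBeforeStoring;
        final List<MutableEvent> seen = new ArrayList<>();

        RememberingMapper(boolean copyBeforeStoring) {
            this.copyBeforeStoring = copyBeforeStoring;
        }

        void map(MutableEvent e) {
            // Keeping the reference is only safe if nobody mutates e afterwards.
            seen.add(copyBeforeStoring ? e.copy() : e);
        }
    }

    // Upstream that reuses one object and passes it on without copying,
    // then reports the ids the downstream function ended up remembering.
    static List<Long> run(RememberingMapper mapper, int n) {
        MutableEvent reused = new MutableEvent();
        for (long i = 0; i < n; i++) {
            reused.id = i;
            mapper.map(reused);
        }
        List<Long> ids = new ArrayList<>();
        for (MutableEvent e : mapper.seen) ids.add(e.id);
        return ids;
    }
}
```

`run(new RememberingMapper(false), 3)` yields `[2, 2, 2]` because all three list entries point to the same object, whereas `run(new RememberingMapper(true), 3)` yields `[0, 1, 2]`. Copying only where a function decides to retain an element is one way to keep the cost limited to the functions that actually need it.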
In reply to this post by Matthias J. Sax-2
+1 Good idea. I think we can save quite a few CPU cycles by not copying records.

> That is basically the behavior of the batch API, and there has so far never been an issue with that (people running into the trap of overwritten mutable elements).

As far as I know, this is only the case for chained operators?

On Fri, Oct 2, 2015 at 6:15 PM, Matthias J. Sax wrote: [...]
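Maximilian's question touches on why copying matters only inside a chain: between non-chained tasks, records are serialized into buffers and the receiver deserializes them into its own objects, so sender and receiver never share an instance. The Java sketch below uses `java.io` object serialization purely as a stand-in; Flink uses its own type serializers for this, and `NetworkBoundaryDemo`, `Event`, and `shipOverNetwork` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class NetworkBoundaryDemo {

    // Hypothetical mutable record; Serializable only so the java.io stand-in works.
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        List<String> tags = new ArrayList<>();
    }

    // Stand-in for shipping a record between non-chained tasks: the sender
    // writes bytes, and the receiver builds a brand-new object from them.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T shipOverNetwork(T record) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(record); // sender side: object -> bytes
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject(); // receiver side: bytes -> new object
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

Mutating the sent object after the hand-off leaves the received one unchanged, so this boundary already isolates the two tasks. Inside a chain, records are passed as plain method-call arguments, so without an explicit copy the two functions share the object.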
Do we know what kind of impact the non-reuse policy has? Maybe the serialization overhead is subsumed by other effects.

But in general I'm OK with changing the default to non-copying. We just have to document this feature properly.

On Oct 2, 2015 6:31 PM, "Maximilian Michels" wrote: [...]
Hey guys,
Have we disabled the default input copying after all? I don't remember seeing a Jira or PR for this (maybe I just missed it).

And if not, do we want this in the 0.10 release?

Cheers,
Gyula

On Fri, Oct 2, 2015 at 7:57 PM, Till Rohrmann wrote: [...]
I don't recall that the default policy was changed.
If we change it, it would be a good idea to change it for 0.10, or for 1.0 at the latest.

One thing I realized is that, to get predictable behavior with chaining, we should not do the special-case parallelism-1 chaining (meaning that shuffle operations get chained when both sender and receiver have parallelism 1). This causes different chaining behavior at different parallelisms, which can easily become a source of confusion when debugging a program. Parallelism 1 with repartitioning operators is probably mostly a debug setup anyway.

On Sat, Oct 24, 2015 at 6:35 PM, Gyula Fóra wrote: [...]
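The parallelism-1 special case can be made concrete with a simplified Java sketch of a chaining check. The `Edge` and `Partitioning` types and the `isChainable` method are hypothetical; Flink's real check considers further conditions, such as the operators' chaining strategies. The point is only that, with the special case enabled, the same repartitioning edge is chained at parallelism 1 but not at parallelism 4, so whether records are handed over by reference (and thus whether the copy-or-reuse policy applies) can differ between a local debug run and a deployment.

```java
final class ChainingRuleSketch {

    // Hypothetical partitioning kinds on an edge between two operators.
    enum Partitioning { FORWARD, SHUFFLE, HASH, REBALANCE }

    // Hypothetical, simplified description of an edge.
    static final class Edge {
        final int senderParallelism;
        final int receiverParallelism;
        final Partitioning partitioning;

        Edge(int senderParallelism, int receiverParallelism, Partitioning partitioning) {
            this.senderParallelism = senderParallelism;
            this.receiverParallelism = receiverParallelism;
            this.partitioning = partitioning;
        }
    }

    // Simplified chaining rule. allowParallelismOneSpecialCase models the
    // behavior proposed for removal in the message above.
    static boolean isChainable(Edge e, boolean allowParallelismOneSpecialCase) {
        boolean sameParallelism = e.senderParallelism == e.receiverParallelism;
        if (e.partitioning == Partitioning.FORWARD && sameParallelism) {
            return true; // the regular case: a one-to-one forward edge
        }
        // Special case: any repartitioning between two parallelism-1 operators
        // degenerates to a forward edge, so it could be chained as well.
        return allowParallelismOneSpecialCase
                && e.senderParallelism == 1
                && e.receiverParallelism == 1;
    }
}
```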